使用Python实现Whisper实时音频流识别而不保存文件
2024.08.29 17:32浏览量:9简介:本文介绍如何结合Python中的音频处理库和OpenAI的Whisper API,实现实时录音并直接进行语音识别,无需先将音频保存到文件。我们将利用`sounddevice`库进行音频捕获,并通过`io.BytesIO`在内存中处理音频数据。
千帆应用开发平台“智能体Pro”全新上线 限时免费体验
面向慢思考场景,支持低代码配置的方式创建“智能体Pro”应用
引言
随着AI技术的快速发展,语音识别技术已经成为许多应用的重要组成部分。OpenAI的Whisper是一个强大的自动语音识别(ASR)模型,支持多种语言。然而,官方API通常要求传入文件作为输入,这在实时应用中可能不够高效。本文将展示如何使用Python实现一个实时录音系统,该系统直接将音频流发送到Whisper API进行识别,而无需先保存到磁盘。
技术栈
- Python:编程语言
- sounddevice:用于音频捕获
- numpy:处理音频数据
- io.BytesIO:在内存中存储音频数据
- requests 或 httpx:发送HTTP请求到Whisper API
- OpenAI Whisper API:自动语音识别服务
步骤 1: 安装必要的库
首先,你需要安装sounddevice
和numpy
。此外,为了发送HTTP请求,可以选择安装requests
或httpx
。
pip install sounddevice numpy requests
# 或者使用 httpx
pip install httpx
步骤 2: 捕获音频流
我们将使用sounddevice
库来捕获音频流。以下是一个简单的音频捕获脚本,它会捕获音频数据并将其存储在numpy
数组中。
import sounddevice as sd
import numpy as np
def capture_audio(duration=5, fs=44100, channels=1):
print('正在录音...')
myrecording = sd.rec(int(duration * fs), samplerate=fs, channels=channels, dtype='int16', blocking=True)
sd.wait() # 等待录音完成
print('录音完成!')
return myrecording
# 示例:捕获5秒音频
recording = capture_audio(5)
步骤 3: 实时处理音频流
为了实时处理音频并发送到Whisper API,我们需要修改上面的函数,以便能够持续捕获音频并分批处理。这里,我们使用io.BytesIO
来在内存中模拟文件操作。
```python
import io
from threading import Thread
import requests
假设这是你的Whisper API密钥
WHISPER_API_KEY = ‘YOUR_API_KEY’
WHISPER_API_URL = ‘https://api.openai.com/v1/audio/transcribe‘
def send_audio_to_whisper(audio_data):
headers = {
‘Authorization’: f’Bearer {WHISPER_API_KEY}’,
‘Content-Type’: ‘audio/wav; codecs=pcm_s16le’,
}
files = {‘file’: (‘audio.wav’, audio_data, ‘audio/wav; codecs=pcm_s16le’)}
response = requests.post(WHISPER_API_URL, headers=headers, files=files)
return response.json()
def capture_and_process_audio(duration=5, chunk_size=4096, fs=44100, channels=1):
def capture_thread():
nonlocal stream, finished
while not finished:
data = sd.rec(chunk_size, samplerate=fs, channels=channels, dtype=’int16’)
if data.size > 0:
audio_stream.write(data.tobytes())
audio_stream = io.BytesIO()
sd.default.samplerate = fs
sd.default.channels = channels
sd.default.dtype = 'int16'
stream = sd.InputStream(callback=lambda x, y, z, frames, time, status: None)
finished = False
capture_thread = Thread(target=capture_thread)
capture_thread.start()
# 模拟录音时长
import time
time.sleep(duration)
finished = True
capture_thread.join()\n stream.stop

发表评论
登录后可评论,请前往 登录 或 注册