Skip to content

如何在 Python 中使用 PyAudio 持续向 WebSocket 接口上传音频(ServerVAD 模式)? #5

@qq839875761

Description

@qq839875761
import time
import asyncio
import base64
import os
import signal
import wave
from io import BytesIO
from typing import Optional

import pyaudio

from rtclient import RTLowLevelClient
from rtclient.models import (
    InputAudioBufferAppendMessage,
    ServerVAD,
    SessionUpdateMessage,
    SessionUpdateParams,
    InputVideoFrameAppendMessage
)

# Process-wide shutdown flag. Created inside with_zhipu(), set by
# handle_shutdown(), and polled by the audio-send and receive loops.
shutdown_event: Optional[asyncio.Event] = None

# Wall-clock time in milliseconds captured at import; send_video() uses it as
# the timestamp of the first video frame.
base_timestamp = int(time.time() * 1000)
VIDEO_INTERVAL = 500  # milliseconds between video frames (one frame every 500 ms)

def encode_image_to_base64(image_path: str) -> Optional[str]:
    """Read an image file and return its contents base64-encoded.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The file contents as a base64 string, or ``None`` when the file
        cannot be opened or read (the error is printed, not raised).
    """
    try:
        # Keep the try body to the file access, the only part expected to fail.
        with open(image_path, 'rb') as image_file:
            raw_bytes = image_file.read()
    except (OSError, TypeError, ValueError) as e:
        # Best-effort contract: report the problem and let the caller decide
        # what to do with a missing image.
        print(f"图片文件处理错误: {str(e)}")
        return None
    return base64.b64encode(raw_bytes).decode('utf-8')

async def send_video(client: RTLowLevelClient, image_file_path):
    """Send one still image to the server as two consecutive video frames.

    The image is base64-encoded once and sent twice, ``VIDEO_INTERVAL``
    milliseconds apart. Frame timestamps start at ``base_timestamp`` and
    advance by ``VIDEO_INTERVAL`` per frame.

    Args:
        client: Connected realtime client used to send the frames.
        image_file_path: Path of the image file to send.
    """
    image_base64 = encode_image_to_base64(image_file_path)
    if image_base64 is None:
        # The encoder already printed the read error; sending a frame with no
        # payload would only produce a server-side error.
        return

    video_timestamp = base_timestamp
    for _ in range(2):  # two frames in total
        video_message = InputVideoFrameAppendMessage(
            video_frame=image_base64,
            client_timestamp=video_timestamp
        )
        await client.send(video_message)
        video_timestamp += VIDEO_INTERVAL
        await asyncio.sleep(VIDEO_INTERVAL / 1000)

def handle_shutdown(sig=None, frame=None):
    """Signal handler: announce the shutdown and set the global shutdown event.

    Does nothing when the shutdown event has not been created yet.
    """
    if not shutdown_event:
        return
    print("\n正在关闭程序...")
    shutdown_event.set()

async def send_audio(client: RTLowLevelClient):
    """Capture microphone audio and stream it to the server until shutdown.

    Audio is read in 32 ms chunks of 16 kHz, 16-bit mono PCM. Each chunk is
    wrapped in its own WAV container, base64-encoded and sent as an
    ``InputAudioBufferAppendMessage``. The loop ends when ``shutdown_event``
    is set or a send fails. Errors are printed rather than raised, and the
    audio stream and PyAudio instance are released on exit.

    Args:
        client: Connected realtime client used to send the audio chunks.
    """
    # Bound before the try so the finally clause can test them even when
    # PyAudio initialisation or opening the stream fails.
    p = None
    stream = None
    try:
        p = pyaudio.PyAudio()

        # Audio stream parameters
        sample_format = pyaudio.paInt16  # 16-bit samples
        channels = 1  # mono
        rate = 16000  # 16 kHz sample rate
        step_ms = 32  # duration of each chunk in milliseconds
        step_samples = int(rate * step_ms / 1000)  # samples per chunk
        bytes_per_sample = 2  # 16-bit depth -> 2 bytes per sample

        stream = p.open(format=sample_format,
                        channels=channels,
                        rate=rate,
                        input=True,
                        frames_per_buffer=step_samples)

        print("开始捕获麦克风音频...")

        while not shutdown_event.is_set():
            # stream.read blocks until a full chunk has been captured. Running
            # it in a worker thread keeps the event loop free for the receive
            # and video tasks while the microphone buffer fills.
            # NOTE(review): if this task is cancelled mid-read, the finally
            # clause closes the stream while the worker may still be reading.
            frame_bytes = await asyncio.to_thread(
                stream.read, step_samples, exception_on_overflow=False
            )

            # Wrap the raw PCM chunk in a WAV container
            wav_io = BytesIO()
            with wave.open(wav_io, 'wb') as wav_out:
                wav_out.setnchannels(channels)
                wav_out.setsampwidth(bytes_per_sample)
                wav_out.setframerate(rate)
                wav_out.writeframes(frame_bytes)

            base64_data = base64.b64encode(wav_io.getvalue()).decode('utf-8')
            message = InputAudioBufferAppendMessage(
                audio=base64_data,
                # Epoch milliseconds, the same clock base_timestamp uses for
                # video frames (the event-loop clock has an arbitrary origin).
                client_timestamp=int(time.time() * 1000)
            )

            try:
                await client.send(message)
            except Exception as e:
                print(f"发送失败: {e}")
                break

    except Exception as e:
        print(f"音频处理失败: {e}")
    finally:
        if stream is not None:
            stream.stop_stream()
            stream.close()
        if p is not None:
            p.terminate()

async def receive_messages(client: RTLowLevelClient):
    try:
        while not client.closed:
            if shutdown_event.is_set():
                print("正在停止消息接收...")
                break

            try:
                message = await asyncio.wait_for(client.recv(), timeout=1.0)
                if message is None:
                    continue

                msg_type = message.type if hasattr(message, 'type') else message.get('type')
                if msg_type is None:
                    print("收到未知类型的消息:", message)
                    continue

                match msg_type:
                    case "session.created":
                        print("会话创建消息")
                        print(f"  Session Id: {message.session.id}")

                    case "error":
                        print("错误消息")
                        print(f"  Error: {message.error}")

                    case "session.updated":
                        print("会话更新消息")
                        print(f"updated session: {message.session}")

                    case "input_audio_buffer.speech_started":
                        print("语音开始消息")

                    case "input_audio_buffer.speech_stopped":
                        print("语音结束消息")
                    case "input_audio_buffer.committed":
                        print("输入音频缓冲区提交消息")

                    case "conversation.item.created":
                        print("会话项目创建消息")

                    case "conversation.item.input_audio_transcription.completed":
                        print("输入音频转写完成消息")
                        print(f"  Transcript: {message.transcript}")

                    case "response.created":
                        print("响应创建消息")
                        print(f"  Response Id: {message.response.id}")

                    case "response.done":
                        print("响应完成消息")
                        if hasattr(message, 'response'):
                            print(f"  Response Id: {message.response.id}")
                            print(f"  Status: {message.response.status}")

                    case "response.audio.delta":
                        print("模型音频增量消息")
                        print(f"  Response Id: {message.response_id}")
                        if message.delta:
                            print(f"  Delta Length: {len(message.delta)}")
                        else:
                            print("  Delta: None")

                    case "response.audio_transcript.delta":
                        print("模型音频文本增量消息")
                        print(f"  Response Id: {message.response_id}")
                        print(f"  Delta: {message.delta if message.delta else 'None'}")

                    case "response.function_call_arguments.done":
                        print("函数调用参数完成消息")
                        print(f"  Response Id: {message.response_id}")
                        print(f"  Arguments: {message.arguments if message.arguments else 'None'}")
                    case "response.audio.done":
                        print("模型音频完成消息")
                    case "response.audio_transcript.done":
                        print("模型音频文本完成消息")
                    case "heartbeat":
                        print("心跳消息")

                    case _:
                        print(f"未处理的消息类型: {msg_type}")
                        print(message)
            except TimeoutError:
                continue
            except Exception as e:
                if not shutdown_event.is_set():
                    print(f"接收消息时发生错误: {[e]}")
                break
    finally:
        if not client.closed:
            await client.close()
            print("WebSocket连接已关闭")

def get_env_var(var_name: str) -> str:
    """Return the value of a required environment variable.

    Args:
        var_name: Name of the environment variable to look up.

    Returns:
        The variable's non-empty value.

    Raises:
        OSError: If the variable is unset or set to an empty string.
    """
    value = os.environ.get(var_name)
    if value:
        return value
    raise OSError(f"环境变量 '{var_name}' 未设置或为空。")

async def with_zhipu(image_path, api_key: Optional[str] = None):
    """Run one realtime session: configure it, then stream audio and video.

    Creates the global shutdown event, installs SIGINT/SIGTERM handlers,
    connects to the realtime endpoint, sends a session update that enables
    server-side VAD with WAV input and PCM output, and then runs the audio
    sender, video sender and message receiver concurrently until they finish.

    Args:
        image_path: Path of the image file sent as video frames.
        api_key: API key for the ``Authorization`` header. When omitted it is
            read from the environment instead of being hard-coded in source.

    Raises:
        OSError: If ``api_key`` is omitted and the environment variable is
            unset or empty.
    """
    global shutdown_event
    shutdown_event = asyncio.Event()

    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, handle_shutdown)

    if api_key is None:
        # NOTE(review): the variable name ZHIPU_API_KEY is an assumption;
        # confirm it matches how the key is provisioned in your environment.
        api_key = get_env_var("ZHIPU_API_KEY")

    try:
        async with RTLowLevelClient(url="wss://open.bigmodel.cn/api/paas/v4/realtime",
                                    headers={"Authorization": f"Bearer {api_key}"}) as client:
            if shutdown_event.is_set():
                return

            session_message = SessionUpdateMessage(
                session=SessionUpdateParams(
                    input_audio_format="wav",
                    output_audio_format="pcm",
                    modalities={"audio", "text"},
                    turn_detection=ServerVAD(),
                    beta_fields={
                        "chat_mode": "video_passive",
                        "tts_source": "e2e",
                        "auto_search": False
                    },
                    tools=[]
                )
            )
            await client.send(session_message)

            # A shutdown signal may have arrived while the update was sent.
            if shutdown_event.is_set():
                return

            tasks = [
                asyncio.create_task(send_audio(client)),
                asyncio.create_task(send_video(client, image_path)),
                asyncio.create_task(receive_messages(client)),
            ]

            try:
                await asyncio.gather(*tasks)
            except Exception as e:
                print(f"任务执行出错: {e}")
                # gather leaves the other tasks running when one fails, so
                # cancel whatever is still pending and wait for it to unwind.
                for task in tasks:
                    if not task.done():
                        task.cancel()
                        try:
                            await task
                        except asyncio.CancelledError:
                            pass
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        if shutdown_event.is_set():
            print("程序已完成退出")

if __name__ == "__main__":
    # Script entry point: run one realtime session with a sample image and
    # always report that the program has exited.
    image_path = 'programmer.jpg'

    try:
        asyncio.run(with_zhipu(image_path))
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as exc:
        print(f"程序执行出错: {exc}")
    finally:
        print("程序已退出")

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions