forked from livekit/agents
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmulti-user-transcriber.py
More file actions
145 lines (108 loc) · 4.39 KB
/
multi-user-transcriber.py
File metadata and controls
145 lines (108 loc) · 4.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import asyncio
import logging
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
Agent,
AgentServer,
AgentSession,
AutoSubscribe,
JobContext,
JobProcess,
StopResponse,
cli,
llm,
room_io,
utils,
)
from livekit.plugins import deepgram, silero
# Load environment variables from a .env file via python-dotenv before any
# plugin is constructed (presumably the LiveKit / Deepgram credentials —
# confirm against the deployment setup).
load_dotenv()
# Module-level logger used by the classes below for transcripts and
# session lifecycle messages.
logger = logging.getLogger("transcriber")
# This example demonstrates how to transcribe audio from multiple remote participants.
# It creates agent sessions for each participant and transcribes their audio.
class Transcriber(Agent):
    """Agent that logs what a single participant says and never answers.

    Each instance is tied to one participant identity, which is used to
    prefix the transcript lines written to the module logger.
    """

    def __init__(self, *, participant_identity: str):
        """Create a transcriber for one participant.

        Args:
            participant_identity: Identity of the participant whose speech
                this agent transcribes; stored for use in log output.
        """
        speech_to_text = deepgram.STT()
        # The instructions string is a placeholder: every turn is stopped in
        # on_user_turn_completed before a reply would be produced.
        super().__init__(instructions="not-needed", stt=speech_to_text)
        self.participant_identity = participant_identity

    async def on_user_turn_completed(self, chat_ctx: llm.ChatContext, new_message: llm.ChatMessage):
        """Log the finished user turn, then abort the response.

        Args:
            chat_ctx: Chat context for the turn (unused here).
            new_message: Message holding the user's transcribed text.

        Raises:
            StopResponse: Always, after logging (presumably so the framework
                skips generating a reply — confirm against livekit-agents docs).
        """
        logger.info(f"{self.participant_identity} -> {new_message.text_content}")
        raise StopResponse()
class MultiUserTranscriber:
    """Run one transcription ``AgentSession`` per remote participant of a room.

    A session is started when a participant connects and drained/closed when
    that participant disconnects. Starting and closing happen in background
    tasks that are tracked in ``_tasks`` so ``aclose`` can cancel whatever is
    still in flight.
    """

    def __init__(self, ctx: JobContext):
        """Bind the transcriber to a job.

        Args:
            ctx: Job context supplying the room and the process userdata
                (the VAD is read from ``ctx.proc.userdata["vad"]``).
        """
        self.ctx = ctx
        # Started sessions, keyed by participant identity.
        self._sessions: dict[str, AgentSession] = {}
        # In-flight start/close tasks; holding them here keeps a strong
        # reference until they finish.
        self._tasks: set[asyncio.Task] = set()

    def start(self):
        """Subscribe to the room's participant connect/disconnect events."""
        self.ctx.room.on("participant_connected", self.on_participant_connected)
        self.ctx.room.on("participant_disconnected", self.on_participant_disconnected)

    async def aclose(self):
        """Unsubscribe from room events, cancel pending tasks, close all sessions."""
        # Unsubscribe first so no new start/close task can be scheduled while
        # the shutdown below is in progress.
        self.ctx.room.off("participant_connected", self.on_participant_connected)
        self.ctx.room.off("participant_disconnected", self.on_participant_disconnected)

        await utils.aio.cancel_and_wait(*self._tasks)

        # Snapshot and clear so no closed session stays reachable through
        # ``_sessions`` once shutdown has begun.
        sessions = list(self._sessions.values())
        self._sessions.clear()
        await asyncio.gather(*[self._close_session(session) for session in sessions])

    def on_participant_connected(self, participant: rtc.RemoteParticipant):
        """Start a session for ``participant`` in the background.

        Does nothing when a session is already stored for this identity.
        """
        identity = participant.identity
        if identity in self._sessions:
            return

        logger.info(f"starting session for {identity}")
        task = asyncio.create_task(self._start_session(participant))
        self._tasks.add(task)

        def on_task_done(task: asyncio.Task):
            self._tasks.discard(task)
            # A cancelled start (e.g. during aclose) produced no session.
            if task.cancelled():
                return
            # Report a failed start here instead of letting ``task.result()``
            # raise inside the event loop's callback machinery.
            if (exc := task.exception()) is not None:
                logger.error(f"failed to start session for {identity}", exc_info=exc)
                return
            self._sessions[identity] = task.result()

        task.add_done_callback(on_task_done)

    def on_participant_disconnected(self, participant: rtc.RemoteParticipant):
        """Close the session of ``participant`` in the background, if one exists."""
        # Default to None: there is no entry when the session failed to start
        # or its start task has not completed yet.
        # NOTE(review): a session whose start task is still in flight is not in
        # ``_sessions`` yet, so it is only closed later by ``aclose``.
        session = self._sessions.pop(participant.identity, None)
        if session is None:
            return

        logger.info(f"closing session for {participant.identity}")
        task = asyncio.create_task(self._close_session(session))
        self._tasks.add(task)
        task.add_done_callback(lambda _: self._tasks.discard(task))

    async def _start_session(self, participant: rtc.RemoteParticipant) -> AgentSession:
        """Create and start the transcription session for ``participant``.

        Returns:
            The already stored session for this identity if there is one,
            otherwise a newly started ``AgentSession``.
        """
        if participant.identity in self._sessions:
            return self._sessions[participant.identity]

        session = AgentSession(
            vad=self.ctx.proc.userdata["vad"],
        )
        await session.start(
            agent=Transcriber(
                participant_identity=participant.identity,
            ),
            room=self.ctx.room,
            room_options=room_io.RoomOptions(
                audio_input=True,
                text_output=True,
                audio_output=False,
                participant_identity=participant.identity,
                # text input is not supported for multiple room participants
                # if needed, register the text stream handler by yourself
                # and route the text to different sessions based on the participant identity
                text_input=False,
            ),
        )
        return session

    async def _close_session(self, sess: AgentSession) -> None:
        """Drain ``sess`` and then close it."""
        await sess.drain()
        await sess.aclose()
# Agent server instance: the entrypoint below is registered on it via
# ``server.rtc_session()`` and it is handed to ``cli.run_app`` at the bottom.
server = AgentServer()
@server.rtc_session()
async def entrypoint(ctx: JobContext):
    """Job entrypoint: transcribe every remote participant of the room.

    Args:
        ctx: Job context used to connect to the room and to register the
            shutdown callback.
    """
    multi_transcriber = MultiUserTranscriber(ctx)
    # Subscribe to participant events before connecting to the room.
    multi_transcriber.start()

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # Participants that are already in the room are handled like new arrivals.
    for existing in ctx.room.remote_participants.values():
        multi_transcriber.on_participant_connected(existing)

    async def _shutdown():
        # Close every session and pending task when the job shuts down.
        await multi_transcriber.aclose()

    ctx.add_shutdown_callback(_shutdown)
def prewarm(proc: JobProcess):
    """Load the Silero VAD and store it in the process userdata.

    Args:
        proc: Job process whose ``userdata`` mapping receives the VAD under
            the ``"vad"`` key, where the per-participant sessions read it.
    """
    vad_model = silero.VAD.load()
    proc.userdata["vad"] = vad_model


# Install prewarm as the server's setup function.
server.setup_fnc = prewarm
# Script entry point: hand the configured server to the LiveKit agents CLI.
if __name__ == "__main__":
    cli.run_app(server)