Skip to content

Commit 1e825a1

Browse files
committed
add back on_playback_started
1 parent 7139879 commit 1e825a1

File tree

3 files changed

+27
-31
lines changed

3 files changed

+27
-31
lines changed

livekit-agents/livekit/agents/voice/generation.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,12 @@ async def _audio_forwarding_task(
360360

361361
try:
362362
audio_output.resume()
363+
364+
@audio_output.on("playback_started")
365+
def _on_playback_started(ev: io.PlaybackStartedEvent) -> None:
366+
if not out.first_frame_fut.done():
367+
out.first_frame_fut.set_result(ev.created_at)
368+
363369
async for frame in tts_output:
364370
out.audio.append(frame)
365371

@@ -381,9 +387,6 @@ async def _audio_forwarding_task(
381387
else:
382388
await audio_output.capture_frame(frame)
383389

384-
if not out.first_frame_fut.done():
385-
out.first_frame_fut.set_result(time.time())
386-
387390
if resampler:
388391
for frame in resampler.flush():
389392
await audio_output.capture_frame(frame)

livekit-agents/livekit/agents/voice/io.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,18 @@ class PlaybackFinishedEvent:
126126
When None, the transcript is not synchronized with the playback"""
127127

128128

129+
@dataclass
130+
class PlaybackStartedEvent:
131+
created_at: float
132+
"""The timestamp (time.time())when the playback started"""
133+
134+
129135
@dataclass
130136
class AudioOutputCapabilities:
131137
pause: bool
132138

133139

134-
class AudioOutput(ABC, rtc.EventEmitter[Literal["playback_finished"]]):
140+
class AudioOutput(ABC, rtc.EventEmitter[Literal["playback_finished", "playback_started"]]):
135141
def __init__(
136142
self,
137143
*,
@@ -167,6 +173,9 @@ def __init__(
167173
synchronized_transcript=ev.synchronized_transcript,
168174
),
169175
)
176+
self.next_in_chain.on(
177+
"playback_started", lambda ev: self.on_playback_started(created_at=ev.created_at)
178+
)
170179

171180
@property
172181
def label(self) -> str:
@@ -176,6 +185,9 @@ def label(self) -> str:
176185
def next_in_chain(self) -> AudioOutput | None:
177186
return self.__next_in_chain
178187

188+
def on_playback_started(self, *, created_at: float) -> None:
189+
self.emit("playback_started", PlaybackStartedEvent(created_at=created_at))
190+
179191
def on_playback_finished(
180192
self,
181193
*,

livekit-agents/livekit/agents/voice/room_io/_output.py

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import time
45

56
from livekit import rtc
67

@@ -17,10 +18,6 @@
1718
from ..transcription import find_micro_track_id
1819

1920

20-
class _InterruptedError(Exception):
21-
pass
22-
23-
2421
class _ParticipantAudioOutput(io.AudioOutput):
2522
def __init__(
2623
self,
@@ -60,8 +57,7 @@ def __init__(
6057

6158
self._playback_enabled = asyncio.Event()
6259
self._playback_enabled.set()
63-
64-
self._first_frame_fut: asyncio.Future[None] | None = None
60+
self._first_frame_event = asyncio.Event()
6561

6662
async def _publish_track(self) -> None:
6763
async with self._lock:
@@ -103,21 +99,9 @@ async def capture_frame(self, frame: rtc.AudioFrame) -> None:
10399
await self._flush_task
104100

105101
for f in self._audio_bstream.push(frame.data):
106-
if self._pushed_duration == 0:
107-
self._first_frame_fut = asyncio.Future[None]()
108-
109102
await self._audio_buf.send(f)
110103
self._pushed_duration += f.duration
111104

112-
# wait for the first frame to be captured
113-
if self._first_frame_fut and not self._first_frame_fut.done():
114-
try:
115-
await self._first_frame_fut
116-
except _InterruptedError:
117-
continue
118-
finally:
119-
self._first_frame_fut = None
120-
121105
def flush(self) -> None:
122106
super().flush()
123107

@@ -150,6 +134,7 @@ def pause(self) -> None:
150134
def resume(self) -> None:
151135
super().resume()
152136
self._playback_enabled.set()
137+
self._first_frame_event.clear()
153138

154139
async def _wait_for_playout(self) -> None:
155140
wait_for_interruption = asyncio.create_task(self._interrupted_event.wait())
@@ -185,9 +170,7 @@ async def _wait_buffered_audio() -> None:
185170

186171
self._pushed_duration = 0
187172
self._interrupted_event.clear()
188-
if self._first_frame_fut and not self._first_frame_fut.done():
189-
self._first_frame_fut.set_exception(_InterruptedError())
190-
self._first_frame_fut = None
173+
self._first_frame_event.clear()
191174
self.on_playback_finished(playback_position=pushed_duration, interrupted=interrupted)
192175

193176
async def _forward_audio(self) -> None:
@@ -198,18 +181,16 @@ async def _forward_audio(self) -> None:
198181
# TODO(long): save the frames in the queue and play them later
199182
# TODO(long): ignore frames from previous syllable
200183

201-
if self._interrupted_event.is_set() or (
202-
self._pushed_duration == 0 and not self._first_frame_fut
203-
):
184+
if self._interrupted_event.is_set() or self._pushed_duration == 0:
204185
if self._interrupted_event.is_set() and self._flush_task:
205186
await self._flush_task
206187

207188
# ignore frames if interrupted
208189
continue
209190

210-
if self._first_frame_fut and not self._first_frame_fut.done():
211-
self._first_frame_fut.set_result(None)
212-
self._first_frame_fut = None
191+
if not self._first_frame_event.is_set():
192+
self._first_frame_event.set()
193+
self.on_playback_started(created_at=time.time())
213194
await self._audio_source.capture_frame(frame)
214195

215196
def _on_reconnected(self) -> None:

0 commit comments

Comments
 (0)