Skip to content

Commit b7562a6

Browse files
authored
handle decoder errors and tts prewarm (#1587)
- added a `tts.prewarm` method to start the connection pool early; we should do this automatically in v1 — currently just the stubs
- in `agent_output`, combined the streaming and non-streaming synthesis paths
- fixed a bug in `AudioStreamDecoder` where it could fail on close
- deprecated ElevenLabs' `optimize_stream_latency` option
1 parent b92a506 commit b7562a6

File tree

19 files changed

+164
-99
lines changed

19 files changed

+164
-99
lines changed

.changeset/blue-ants-heal.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"livekit-plugins-elevenlabs": patch
3+
"livekit-plugins-cartesia": patch
4+
"livekit-plugins-deepgram": patch
5+
"livekit-agents": patch
6+
---
7+
8+
added a tts.prewarm method to start the connection pool early.

.changeset/eleven-brooms-watch.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"livekit-agents": patch
3+
---
4+
5+
fixed a bug in AudioStreamDecoder where it could fail on close
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"livekit-plugins-elevenlabs": patch
3+
---
4+
5+
deprecated ElevenLabs' `optimize_stream_latency` option

examples/voice-pipeline-agent/function_calling_weather.py

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
import asyncio
12
import logging
2-
import random
33
from typing import Annotated
44

55
import aiohttp
@@ -11,8 +11,9 @@
1111
WorkerOptions,
1212
cli,
1313
llm,
14+
metrics,
1415
)
15-
from livekit.agents.pipeline import AgentCallContext, VoicePipelineAgent
16+
from livekit.agents.pipeline import VoicePipelineAgent
1617
from livekit.plugins import deepgram, openai, silero
1718

1819
load_dotenv()
@@ -50,25 +51,21 @@ async def get_weather(
5051
# that it might take awhile:
5152
# Option 1: you can use .say filler message immediately after the call is triggered
5253
# Option 2: you can prompt the agent to return a text response when it's making a function call
53-
agent = AgentCallContext.get_current().agent
54-
55-
if (
56-
not agent.chat_ctx.messages
57-
or agent.chat_ctx.messages[-1].role != "assistant"
58-
):
59-
# skip if assistant already said something
60-
filler_messages = [
61-
"Let me check the weather in {location} for you.",
62-
"Let me see what the weather is like in {location} right now.",
63-
# LLM will complete this sentence if it is added to the end of the chat context
64-
"The current weather in {location} is ",
65-
]
66-
message = random.choice(filler_messages).format(location=location)
67-
logger.info(f"saying filler message: {message}")
68-
69-
# NOTE: set add_to_chat_ctx=True will add the message to the end
70-
# of the chat context of the function call for answer synthesis
71-
speech_handle = await agent.say(message, add_to_chat_ctx=True) # noqa: F841
54+
55+
# uncomment for option 1
56+
# agent = AgentCallContext.get_current().agent
57+
# filler_messages = [
58+
# "Let me check the weather in {location} for you.",
59+
# "Let me see what the weather is like in {location} right now.",
60+
# # LLM will complete this sentence if it is added to the end of the chat context
61+
# "The current weather in {location} is ",
62+
# ]
63+
# message = random.choice(filler_messages).format(location=location)
64+
# logger.info(f"saying filler message: {message}")
65+
66+
# NOTE: set add_to_chat_ctx=True will add the message to the end
67+
# of the chat context of the function call for answer synthesis
68+
# speech_handle = await agent.say(message, add_to_chat_ctx=True) # noqa: F841
7269

7370
logger.info(f"getting weather for {latitude}, {longitude}")
7471
url = f"https://api.open-meteo.com/v1/forecast?latitude={latitude}&longitude={longitude}&current=temperature_2m"
@@ -82,13 +79,17 @@ async def get_weather(
8279
"temperature": data["current"]["temperature_2m"],
8380
"temperature_unit": "Celsius",
8481
}
85-
logger.info(f"weather data: {weather_data}")
8682
else:
8783
raise Exception(
8884
f"Failed to get weather data, status code: {response.status}"
8985
)
9086

87+
# artificially delay the function call for testing
88+
await asyncio.sleep(2)
89+
logger.info(f"weather data: {weather_data}")
90+
9191
# (optional) To wait for the speech to finish before giving results of the function call
92+
# without waiting, the new speech result will be queued and played after current speech is finished
9293
# await speech_handle.join()
9394
return weather_data
9495

@@ -106,26 +107,37 @@ async def entrypoint(ctx: JobContext):
106107
"You are a weather assistant created by LiveKit. Your interface with users will be voice. "
107108
"You will provide weather information for a given location. "
108109
# when using option 1, you can suppress from the agent with prompt
109-
"do not return any text while calling the function."
110-
# uncomment this to use option 2
111-
# "when performing function calls, let user know that you are checking the weather."
110+
# "do not return any text while calling the function."
111+
# option 2 - using LLM to generate text for the function call
112+
"when performing function calls, let user know that you are checking the weather."
112113
),
113114
role="system",
114115
)
115116
participant = await ctx.wait_for_participant()
116117
agent = VoicePipelineAgent(
117118
vad=ctx.proc.userdata["vad"],
118119
stt=deepgram.STT(),
119-
llm=openai.LLM(model="gpt-4o-mini"),
120+
llm=openai.LLM(model="gpt-4o"),
120121
tts=openai.TTS(),
121122
fnc_ctx=fnc_ctx,
122123
chat_ctx=initial_chat_ctx,
123124
)
124125

126+
usage_collector = metrics.UsageCollector()
127+
128+
@agent.on("metrics_collected")
129+
def _on_metrics_collected(mtrcs: metrics.AgentMetrics):
130+
metrics.log_metrics(mtrcs)
131+
usage_collector.collect(mtrcs)
132+
133+
async def log_usage():
134+
summary = usage_collector.get_summary()
135+
logger.info(f"Usage: ${summary}")
136+
125137
# Start the assistant. This will automatically publish a microphone track and listen to the participant.
126138
agent.start(ctx.room, participant)
127139
await agent.say(
128-
"Hello from the weather station. Would you like to know the weather? If so, tell me your location."
140+
"Hello from the weather station. Tell me your location to check the weather."
129141
)
130142

131143

livekit-agents/livekit/agents/cli/log.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
"watchfiles",
2020
"anthropic",
2121
"websockets.client",
22+
"botocore",
23+
"aiobotocore",
2224
]
2325

2426

livekit-agents/livekit/agents/pipeline/agent_output.py

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,16 @@ async def _synthesize_task(self, handle: SynthesisHandle) -> None:
173173
if isinstance(transcript_source, Awaitable):
174174
transcript_source = await transcript_source
175175

176+
tts_stream: AsyncIterable[str] | None = None
176177
if isinstance(tts_source, str):
177-
co = self._str_synthesis_task(tts_source, transcript_source, handle)
178+
# wrap in async iterator
179+
async def string_to_stream(text: str):
180+
yield text
181+
182+
tts_stream = string_to_stream(tts_source)
178183
else:
179-
co = self._stream_synthesis_task(tts_source, transcript_source, handle)
184+
tts_stream = tts_source
185+
co = self._stream_synthesis_task(tts_stream, transcript_source, handle)
180186

181187
synth = asyncio.create_task(co)
182188
synth.add_done_callback(lambda _: handle._buf_ch.close())
@@ -205,41 +211,6 @@ async def _read_transcript_task(
205211
if inspect.isasyncgen(transcript_source):
206212
await transcript_source.aclose()
207213

208-
@utils.log_exceptions(logger=logger)
209-
async def _str_synthesis_task(
210-
self,
211-
tts_text: str,
212-
transcript_source: AsyncIterable[str] | str,
213-
handle: SynthesisHandle,
214-
) -> None:
215-
"""synthesize speech from a string"""
216-
read_transcript_atask: asyncio.Task | None = None
217-
218-
first_frame = True
219-
tts_stream = handle._tts.synthesize(tts_text)
220-
try:
221-
async for audio in tts_stream:
222-
if first_frame:
223-
first_frame = False
224-
read_transcript_atask = asyncio.create_task(
225-
self._read_transcript_task(transcript_source, handle)
226-
)
227-
228-
handle._buf_ch.send_nowait(audio.frame)
229-
if not handle.tts_forwarder.closed:
230-
handle.tts_forwarder.push_audio(audio.frame)
231-
232-
if not handle.tts_forwarder.closed:
233-
handle.tts_forwarder.mark_audio_segment_end()
234-
235-
if read_transcript_atask is not None:
236-
await read_transcript_atask
237-
finally:
238-
await tts_stream.aclose()
239-
240-
if read_transcript_atask is not None:
241-
await utils.aio.gracefully_cancel(read_transcript_atask)
242-
243214
@utils.log_exceptions(logger=logger)
244215
async def _stream_synthesis_task(
245216
self,

livekit-agents/livekit/agents/tts/fallback_adapter.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ def stream(
138138
conn_options=conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
139139
)
140140

141+
def prewarm(self) -> None:
142+
if self._tts_instances:
143+
self._tts_instances[0].prewarm()
144+
141145
async def aclose(self) -> None:
142146
for tts_status in self._status:
143147
if tts_status.recovering_task is not None:

livekit-agents/livekit/agents/tts/stream_adapter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ def stream(
5555
sentence_tokenizer=self._sentence_tokenizer,
5656
)
5757

58+
def prewarm(self) -> None:
59+
self._tts.prewarm()
60+
5861

5962
class StreamAdapterWrapper(SynthesizeStream):
6063
def __init__(

livekit-agents/livekit/agents/tts/tts.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ def stream(
9898
"streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
9999
)
100100

101+
def prewarm(self) -> None:
102+
"""Pre-warm connection to the TTS service"""
103+
pass
104+
101105
async def aclose(self) -> None: ...
102106

103107
async def __aenter__(self) -> TTS:

livekit-agents/livekit/agents/utils/codecs/decoder.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from concurrent.futures import ThreadPoolExecutor
1818
from typing import AsyncIterator, Optional
1919

20+
from livekit.agents.log import logger
2021
from livekit.agents.utils import aio
2122

2223
try:
@@ -39,14 +40,14 @@ def __init__(self):
3940
self._buffer = io.BytesIO()
4041
self._lock = threading.Lock()
4142
self._data_available = threading.Condition(self._lock)
42-
self._eof = False # EOF flag to signal no more writes
43+
self._eof = False
4344

4445
def write(self, data: bytes):
4546
"""Write data to the buffer from a writer thread."""
46-
with self._data_available: # Lock and notify readers
47-
self._buffer.seek(0, io.SEEK_END) # Move to the end
47+
with self._data_available:
48+
self._buffer.seek(0, io.SEEK_END)
4849
self._buffer.write(data)
49-
self._data_available.notify_all() # Notify waiting readers
50+
self._data_available.notify_all()
5051

5152
def read(self, size: int = -1) -> bytes:
5253
"""Read data from the buffer in a reader thread."""
@@ -56,21 +57,21 @@ def read(self, size: int = -1) -> bytes:
5657

5758
with self._data_available:
5859
while True:
59-
self._buffer.seek(0) # Rewind for reading
60+
if self._buffer.closed:
61+
return b""
62+
# always read from beginning
63+
self._buffer.seek(0)
6064
data = self._buffer.read(size)
6165

62-
# If data is available, return it
6366
if data:
64-
# Shrink the buffer to remove already-read data
67+
# shrink the buffer to remove already-read data
6568
remaining = self._buffer.read()
6669
self._buffer = io.BytesIO(remaining)
6770
return data
6871

69-
# If EOF is signaled and no data remains, return EOF
7072
if self._eof:
7173
return b""
7274

73-
# Wait for more data
7475
self._data_available.wait()
7576

7677
def end_input(self):
@@ -129,15 +130,15 @@ def end_input(self):
129130
self._input_buf.end_input()
130131

131132
def _decode_loop(self):
132-
container = av.open(self._input_buf)
133-
audio_stream = next(s for s in container.streams if s.type == "audio")
134-
resampler = av.AudioResampler(
135-
# convert to signed 16-bit little endian
136-
format="s16",
137-
layout=self._layout,
138-
rate=self._sample_rate,
139-
)
140133
try:
134+
container = av.open(self._input_buf)
135+
audio_stream = next(s for s in container.streams if s.type == "audio")
136+
resampler = av.AudioResampler(
137+
# convert to signed 16-bit little endian
138+
format="s16",
139+
layout=self._layout,
140+
rate=self._sample_rate,
141+
)
141142
# TODO: handle error where audio stream isn't found
142143
if not audio_stream:
143144
return
@@ -157,6 +158,8 @@ def _decode_loop(self):
157158
),
158159
)
159160
)
161+
except Exception:
162+
logger.exception("error decoding audio")
160163
finally:
161164
self._output_ch.close()
162165

@@ -175,8 +178,9 @@ async def aclose(self):
175178
self._closed = True
176179
self.end_input()
177180
self._input_buf.close()
178-
# wait for decode loop to finish
181+
# wait for decode loop to finish, only if anything's been pushed
179182
try:
180-
await self._output_ch.recv()
183+
if self._started:
184+
await self._output_ch.recv()
181185
except aio.ChanClosed:
182186
pass

0 commit comments

Comments
 (0)