Skip to content

Commit d089090

Browse files
sauhardjainclaudeakavi
authored
Use AgentSendText(interruptible=False) for uninterruptible output (#147)
* Use AgentSendText(interruptible=False) for uninterruptible output * Clarify uninterruptible ack-back debug logs * Apply pre-commit formatting and revert version bump * Checkpoint Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Handle interuptible -> uninterruptible * Remove * Clean up * Lint * Refactor to centralize uninterruptibility computation into _get_processed_history * Apply ruff formatting Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Skip deduped ack-backs in run loop to prevent ghost event re-firing When _get_processed_history deduplicates a late AgentTextSent ack-back (already pre-committed as uninterruptible text), processed_history[-1] no longer corresponds to the raw event. This caused _process_input_event to construct a bogus event from the wrong processed entry, re-triggering run/cancel filters and producing repeated UserTurnStarted/UserTurnEnded. Fix: return None from _process_input_event when the raw AgentTextSent was consumed by dedup, and skip it in the run loop. --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Arjun Mariputtana Kavi <arjun.kavi@cartesia.ai>
1 parent 180b0e0 commit d089090

File tree

6 files changed

+698
-30
lines changed

6 files changed

+698
-30
lines changed

line/_harness_types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class DTMFOutput(BaseModel):
7878
class MessageOutput(BaseModel):
7979
type: Literal["message"] = "message"
8080
content: str
81+
interruptible: bool = True
8182

8283

8384
class ToolCallOutput(BaseModel):

line/events.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def _generate_event_id() -> str:
2121
class AgentSendText(BaseModel):
2222
type: Literal["agent_send_text"] = "agent_send_text"
2323
text: str
24+
interruptible: bool = True
2425

2526

2627
class AgentToolCalled(BaseModel):

line/llm_agent/llm_agent.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ async def process(
175175
async for output in self._handoff_target(env, event):
176176
self.history._append_local(output)
177177
yield output
178+
# Keep turn timing consistent across all process paths, including handoffs.
178179
yield LogMetric(name="agent_turn_ms", value=(time.perf_counter() - turn_start_time) * 1000)
179180
return
180181

line/voice_agent_app.py

Lines changed: 173 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import os
1717
import re
1818
import traceback
19-
from typing import Any, AsyncIterable, Awaitable, Callable, Dict, List, Optional
19+
from typing import Any, AsyncIterable, Awaitable, Callable, Dict, List, Optional, Tuple
2020
from urllib.parse import urlencode
2121

2222
from dotenv import load_dotenv
@@ -282,9 +282,7 @@ def __init__(self, websocket: WebSocket, agent_spec: AgentSpec, env: AgentEnv):
282282
self.env = env
283283
self.shutdown_event = asyncio.Event()
284284
self.history: List[InputEvent] = []
285-
self.emitted_agent_text: str = (
286-
"" # Buffer for all AgentSendText content (for whitespace interpolation)
287-
)
285+
self.emitted_agent_text: List[Tuple[str, bool]] = [] # (content, interruptible)
288286

289287
self.agent_callable, self.run_filter, self.cancel_filter = self._prepare_agent(agent_spec)
290288
self.agent_task: Optional[asyncio.Task] = None
@@ -358,6 +356,8 @@ async def run(self):
358356
# Convert and process the input message
359357
event = self._convert_input_message(input_msg)
360358
ev, self.history = self._process_input_event(self.history, event)
359+
if ev is None:
360+
continue
361361
await self._handle_event(TurnEnv(), ev)
362362

363363
except WebSocketDisconnect:
@@ -404,9 +404,8 @@ async def _start_agent_task(self, turn_env: TurnEnv, event: InputEvent) -> None:
404404
async def runner():
405405
try:
406406
async for output in self.agent_callable(turn_env, event):
407-
# Buffer AgentSendText content for whitespace interpolation
408407
if isinstance(output, AgentSendText):
409-
self.emitted_agent_text += output.text
408+
self.emitted_agent_text.append((output.text, output.interruptible))
410409
mapped = self._map_output_event(output)
411410

412411
if self.shutdown_event.is_set():
@@ -495,11 +494,14 @@ def _turn_content(
495494

496495
def _process_input_event(
497496
self, history: List[InputEvent], raw_event: InputEvent
498-
) -> tuple[InputEvent, List[InputEvent]]:
497+
) -> tuple[Optional[InputEvent], List[InputEvent]]:
499498
"""Create an InputEvent including history from an InputEvent (with history=None).
500499
501500
The raw history is updated with the new event, but the history passed to
502501
the InputEvent is processed to restore whitespace in AgentTextSent events.
502+
503+
Returns None for the event when an AgentTextSent ack-back is consumed by
504+
deduplication (already pre-committed as uninterruptible text).
503505
"""
504506
raw_history = history + [raw_event]
505507
# Process history to restore whitespace before passing to agent
@@ -508,6 +510,10 @@ def _process_input_event(
508510
# Extract base data excluding history (we'll set it explicitly)
509511
base_data = {k: v for k, v in processed_event.model_dump().items() if k != "history"}
510512
if type(processed_event) is not type(raw_event):
513+
if isinstance(raw_event, AgentTextSent):
514+
# Ack-back was consumed by dedup — skip it
515+
logger.debug(f'Ack-back "{raw_event.content}" consumed by dedup (already pre-committed)')
516+
return None, raw_history
511517
logger.warning(
512518
f"Processed event type {type(processed_event).__name__} "
513519
f"differs from raw event type {type(raw_event).__name__}"
@@ -589,8 +595,11 @@ def _truncate_dict_for_ws(
589595
def _map_output_event(self, event: OutputEvent) -> OutputMessage:
590596
"""Convert OutputEvent to websocket OutputMessage."""
591597
if isinstance(event, AgentSendText):
592-
logger.info(f'<- 🤖🗣️ Agent said: "{event.text}"')
593-
return MessageOutput(content=event.text)
598+
if event.interruptible:
599+
logger.info(f'<- 🤖🗣️ Agent said: "{event.text}"')
600+
else:
601+
logger.info(f'<- 🤖🔒 Agent said (uninterruptible): "{event.text}"')
602+
return MessageOutput(content=event.text, interruptible=event.interruptible)
594603
if isinstance(event, AgentSendDtmf):
595604
logger.info(f"<- 🤖🔔 Agent DTMF sent: {event.button}")
596605
return DTMFOutput(button=event.button)
@@ -649,41 +658,73 @@ def _map_output_event(self, event: OutputEvent) -> OutputMessage:
649658
return ErrorOutput(content=f"Unhandled output event type: {type(event).__name__}")
650659

651660

652-
def _get_processed_history(pending_text: str, history: List[InputEvent]) -> List[InputEvent]:
661+
def _get_processed_history(
662+
emitted_chunks: List[Tuple[str, bool]],
663+
history: List[InputEvent],
664+
) -> List[InputEvent]:
653665
"""
654-
Process history to reinterpolate whitespace into AgentTextSent events.
655-
656-
The TTS system strips whitespace when confirming what was spoken. This method
657-
uses the buffered AgentSendText content to restore proper whitespace formatting
658-
in the history passed to the agent's process method.
666+
Process history to:
667+
1. Restore whitespace in AgentTextSent events (TTS strips it)
668+
2. Pre-commit uninterruptible text before user events
669+
3. Deduplicate late ack-backs for pre-committed text
659670
660671
Args:
661-
pending_text: Accumulated text from AgentSendText events (with whitespace)
672+
emitted_chunks: List of (text, interruptible) from AgentSendText events
662673
history: Raw history containing AgentTextSent with stripped whitespace
663674
664675
Returns:
665-
Processed history with whitespace restored in AgentTextSent events
676+
Processed history with whitespace restored and uninterruptible text
677+
correctly ordered before user events
666678
"""
679+
full_emitted = "".join(text for text, _ in emitted_chunks)
680+
681+
# Build chunk boundaries: (start, end, interruptible)
682+
chunk_boundaries: List[Tuple[int, int, bool]] = []
683+
pos = 0
684+
for text, interruptible in emitted_chunks:
685+
chunk_boundaries.append((pos, pos + len(text), interruptible))
686+
pos += len(text)
687+
667688
processed_events: List[InputEvent] = []
668689
committed_text_buffer = ""
690+
pending_text = full_emitted
691+
pre_committed_dedup = "" # pre-committed text awaiting ack-back consumption
692+
669693
for event in history:
670694
if isinstance(event, AgentTextSent):
671-
committed_text_buffer += event.content
695+
content = event.content
696+
# Consume against pre-committed text to avoid double-counting
697+
if pre_committed_dedup:
698+
_, remaining_content, remaining_pre = _consume_expected_ack_back_prefix(
699+
content, pre_committed_dedup
700+
)
701+
pre_committed_dedup = remaining_pre
702+
content = remaining_content
703+
if content:
704+
committed_text_buffer += content
672705
else:
673706
committed_text, committed_text_buffer, pending_text = _parse_committed(
674707
committed_text_buffer, pending_text
675708
)
709+
710+
# Check if we need to pre-commit uninterruptible text
711+
pre_commit = _compute_uninterruptible_precommit(
712+
len(full_emitted) - len(pending_text), chunk_boundaries, full_emitted
713+
)
714+
if pre_commit:
715+
committed_text = (committed_text or "") + pre_commit
716+
pending_text = pending_text[len(pre_commit) :]
717+
pre_committed_dedup += pre_commit
718+
676719
if committed_text:
677720
processed_events.append(AgentTextSent(content=committed_text))
678721
if isinstance(event, (AgentTurnEnded, CallEnded)) and committed_text_buffer:
679722
logger.warning(
680723
f"Unexpected committed text buffer at end of turn/call: '{committed_text_buffer}'"
681724
)
682-
# this is a hack to basically "throw up our hands" when we see the end of an agent turn"
683-
# we commit all buffered text immediately, even if it doesn't match perfectly with
684-
# pending_text. Since we can't _exactly_ know what text was dropped by Sonic's wordstamps
685-
# (and therefore what text `AgentTextSent` events contain) we will inevitably have mismatches
686-
# between our pending text and the committed text we get from `AgentTextSent` events.
725+
# Commit all buffered text at turn/call boundaries even if it
726+
# doesn't align perfectly with pending_text — inevitable mismatches
727+
# from TTS wordstamp drops.
687728
processed_events.append(AgentTextSent(content=committed_text_buffer))
688729
committed_text_buffer = ""
689730
processed_events.append(event)
@@ -694,6 +735,56 @@ def _get_processed_history(pending_text: str, history: List[InputEvent]) -> List
694735
return processed_events
695736

696737

738+
def _compute_uninterruptible_precommit(
739+
consumed_pos: int,
740+
chunk_boundaries: List[Tuple[int, int, bool]],
741+
full_emitted: str,
742+
) -> str:
743+
"""Compute text to pre-commit at a user event boundary.
744+
745+
We only pre-commit when consumed_pos falls strictly inside a chunk
746+
(start < consumed_pos < end), meaning we have ack-back evidence that
747+
the chunk was actively being spoken. This prevents retroactive
748+
pre-commits of chunks from future turns whose text hasn't been
749+
acknowledged yet.
750+
751+
If the current chunk is uninterruptible, we pre-commit the remainder
752+
of it plus any consecutive uninterruptible chunks that follow.
753+
754+
Returns:
755+
The text to pre-commit (empty string if nothing to pre-commit).
756+
"""
757+
if not chunk_boundaries:
758+
return ""
759+
760+
# Find the chunk strictly containing consumed_pos
761+
current_idx = None
762+
for i, (start, end, _) in enumerate(chunk_boundaries):
763+
if start < consumed_pos < end:
764+
current_idx = i
765+
break
766+
767+
if current_idx is None:
768+
return ""
769+
770+
_, current_end, current_interruptible = chunk_boundaries[current_idx]
771+
if current_interruptible:
772+
return ""
773+
774+
# Current chunk is uninterruptible — pre-commit the rest of it
775+
precommit_end = current_end
776+
777+
# Also pre-commit consecutive uninterruptible chunks that follow
778+
for i in range(current_idx + 1, len(chunk_boundaries)):
779+
_, end, interruptible = chunk_boundaries[i]
780+
if not interruptible:
781+
precommit_end = end
782+
else:
783+
break
784+
785+
return full_emitted[consumed_pos:precommit_end]
786+
787+
697788
def _parse_committed(committed_buffer_text: str, pending_text: str) -> tuple[str, str, str]:
698789
"""
699790
Parse committed text by aligning it character-by-character against pending_text
@@ -777,6 +868,65 @@ def _parse_committed(committed_buffer_text: str, pending_text: str) -> tuple[str
777868
return committed_str, remaining_committed, remaining_pending
778869

779870

871+
def _consume_expected_ack_back_prefix(committed_text: str, pending_text: str) -> tuple[int, str, str]:
872+
"""Consume the longest strict prefix of committed_text that matches pending_text.
873+
874+
This is stricter than _parse_committed: it never skips arbitrary pending text
875+
to find a later match. It only tolerates:
876+
- characters stripped by the harness in pending_text (e.g. whitespace/emoji)
877+
- full stops inserted by TTS in committed_text
878+
879+
Returns:
880+
(consumed_chars, remaining_committed, remaining_pending)
881+
"""
882+
if not committed_text or not pending_text:
883+
return 0, committed_text, pending_text
884+
885+
i = 0 # pointer into committed_text
886+
j = 0 # pointer into pending_text
887+
matched = False # whether at least one real character pair matched
888+
889+
while i < len(committed_text) and j < len(pending_text):
890+
c = committed_text[i]
891+
p = pending_text[j]
892+
893+
if c == p:
894+
matched = True
895+
i += 1
896+
j += 1
897+
elif _is_stripped_by_harness(p):
898+
j += 1
899+
elif c in FULL_STOP_CHARS:
900+
i += 1
901+
else:
902+
break
903+
904+
# Skip any trailing TTS-inserted full stops in remaining committed text
905+
# (mirrors the same sweep in _parse_committed).
906+
while i < len(committed_text) and committed_text[i] in FULL_STOP_CHARS:
907+
i += 1
908+
909+
if not matched:
910+
# No real character matched; full stops and whitespace alone are not
911+
# sufficient to count as a prefix match.
912+
return 0, committed_text, pending_text
913+
914+
# Only accept if at least one side was fully consumed (true prefix
915+
# relationship). A partial match where both sides have leftover chars
916+
# is a coincidental overlap (e.g. "bye" vs stale "bcd" matching only
917+
# the leading 'b') and must be rejected to prevent transcript corruption.
918+
if i < len(committed_text) and j < len(pending_text):
919+
return 0, committed_text, pending_text
920+
921+
if i == len(committed_text):
922+
# If committed text is fully consumed, drop trailing pending chars that
923+
# the harness would strip anyway so they don't get stuck forever.
924+
while j < len(pending_text) and _is_stripped_by_harness(pending_text[j]):
925+
j += 1
926+
927+
return i, committed_text[i:], pending_text[j:]
928+
929+
780930
# Regex to match strings consisting entirely of emoji characters
781931
EMOJI_REGEX = re.compile(
782932
r"^["

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ target-version = 'py39'
6868

6969
[tool.pytest.ini_options]
7070
asyncio_mode = "auto"
71+
norecursedirs = ["line/llm_agent/scripts", ".claude"]
7172

7273
[tool.ruff.lint]
7374
select = ["E", "F", "I", "B", "C4"]

0 commit comments

Comments
 (0)