fix(core): consolidate StreamChunkCallback, remove dual-extractor CAUSING TTS garbling#6690
Conversation
Eight inline `onStreamChunk` definitions across types/runtime.ts, types/model.ts, types/message-service.ts, streaming-context.ts, and runtime.ts are replaced by a single canonical `StreamChunkCallback` type alias in types/components.ts. The callback gains `accumulated?: string` — the full extracted field text from ValidationStreamExtractor.

WHY: handleMessage previously ran two independent XML extractors (ValidationStreamExtractor via dynamicPromptExecFromState and ResponseStreamExtractor via runWithStreamingContext). Both received raw LLM tokens in useModel and emitted independently, producing overlapping deltas that garbled TTS output. Providing accumulated text from the single remaining extractor eliminates the reassembly problem.

handleMessage no longer creates a ResponseStreamExtractor or wraps processMessage in runWithStreamingContext. Voice first-sentence detection wraps the caller's onStreamChunk callback and uses `accumulated` when available, falling back to a local buffer.

Docs updated: streaming-responses guide, types-reference, messaging.mdx extractor table.

Made-with: Cursor
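The consolidated contract described above can be sketched as follows. The alias name, parameter list, and `accumulated` semantics come from this PR; the two emit helpers are hypothetical illustrations of the extractor-side and raw-token call sites, not code from the repository.

```typescript
// The canonical callback shape. When `accumulated` is present it is the full
// extracted field text so far, so consumers never reassemble it from deltas.
type StreamChunkCallback = (
  chunk: string,
  messageId?: string,
  accumulated?: string,
) => void | Promise<void>;

// Hypothetical extractor-side emit: a single source of truth for accumulated text.
function emitExtracted(
  cb: StreamChunkCallback,
  deltas: string[],
  messageId: string,
): string {
  let full = "";
  for (const d of deltas) {
    full += d;
    cb(d, messageId, full); // extractor supplies authoritative accumulated text
  }
  return full;
}

// Hypothetical raw-token path: explicitly passes `undefined` for accumulated,
// signalling that consumers must buffer locally if they need the full text.
function emitRaw(cb: StreamChunkCallback, tokens: string[]): void {
  for (const t of tokens) cb(t, undefined, undefined);
}
```

A consumer can thus branch on `accumulated !== undefined` to decide whether the text it sees is authoritative or must be buffered locally.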
🚥 Pre-merge checks: ✅ 2 passed, ❌ 1 inconclusive.
Claude finished @odilitime's task in 2m 7s

PR Review: StreamChunkCallback Consolidation
Summary

This PR consolidates eight inline `onStreamChunk` definitions into a single canonical `StreamChunkCallback` type.

Issues Found

1. Cross-language type inconsistency. Locations: the Python and Rust SDK callback definitions.
The TypeScript signature now includes:

```ts
(chunk: string, messageId?: string, accumulated?: string) => void | Promise<void>
```

But Python and Rust still have the old 2-parameter signature:

```python
# Python (both locations)
StreamChunkCallback = Callable[[str, str | None], Awaitable[None]]
```

```rust
// Rust
Box<dyn Fn(&str, Option<&str>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>
```

Fix: update the Python and Rust definitions to add the third `accumulated` parameter.
2. Test coverage. The test change only updates the callback's type signature. Fix: add tests for the new `accumulated` behavior.

3. Documentation. The new parameter's JSDoc:

```ts
/**
 * @param accumulated - Full extracted field text so far.
 * @since 2.0.0-alpha.109
 */
```

What's Good
Verdict

The core TypeScript changes are solid and the TTS fix is architecturally correct. However, the cross-language type inconsistency is a breaking change for Python/Rust consumers that needs addressing before merge.
Pull request overview
This PR consolidates streaming chunk callback typings into a single canonical StreamChunkCallback and updates the streaming pipeline to avoid dual extractor emissions that could garble TTS output.
Changes:
- Introduces `StreamChunkCallback` (with optional `accumulated`) and replaces multiple inline callback signatures across runtime/model/message-service/streaming-context.
- Extends `ValidationStreamExtractor` to surface `accumulated` text alongside deltas and threads it through streaming call sites.
- Removes `DefaultMessageService`'s secondary `ResponseStreamExtractor` + `runWithStreamingContext` wrapper and instead performs first-sentence voice detection by wrapping the caller's `onStreamChunk`.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/typescript/src/utils/streaming.ts | Adds accumulated to VSE chunk emissions; updates createStreamingContext to use canonical callback type. |
| packages/typescript/src/types/runtime.ts | Replaces inline onStreamChunk types with StreamChunkCallback. |
| packages/typescript/src/types/model.ts | Replaces inline onStreamChunk type with StreamChunkCallback. |
| packages/typescript/src/types/message-service.ts | Replaces inline onStreamChunk type with StreamChunkCallback. |
| packages/typescript/src/types/components.ts | Defines canonical StreamChunkCallback and documents rationale/params. |
| packages/typescript/src/streaming-context.ts | Updates StreamingContext.onStreamChunk to StreamChunkCallback. |
| packages/typescript/src/services/message.ts | Removes dual-extractor streaming path; wraps onStreamChunk for voice detection using accumulated when present. |
| packages/typescript/src/runtime.ts | Threads StreamChunkCallback through streaming paths; forwards accumulated from VSE; updates raw token loop call signature. |
| packages/typescript/src/__tests__/runtime.test.ts | Updates streaming callback signature used in a test type definition. |
| packages/docs/runtime/types-reference.mdx | Updates docs to reference StreamChunkCallback. |
| packages/docs/runtime/messaging.mdx | Documents extractor architecture change and removal of default ResponseStreamExtractor usage. |
| packages/docs/guides/streaming-responses.mdx | Adds documentation for StreamChunkCallback, including accumulated semantics. |
packages/typescript/src/runtime.ts (outdated)

```diff
- onChunk: (chunk) => {
-   options.onStreamChunk?.(chunk, streamMessageId);
+ onChunk: (chunk, _field, accumulated) => {
+   options.onStreamChunk?.(chunk, streamMessageId, accumulated);
```
Forwarding `options.onStreamChunk` from the extractor is not awaited or error-handled. Since `StreamChunkCallback` can return a Promise, a rejection here will become an unhandled promise rejection (and a synchronous throw will bubble out of the extractor). Consider wrapping the call in `Promise.resolve(...).catch(...)` (and/or signaling the extractor error) to prevent process-level unhandled rejection behavior during streaming.
```diff
- options.onStreamChunk?.(chunk, streamMessageId, accumulated);
+ if (!options.onStreamChunk) return;
+ Promise.resolve(
+   options.onStreamChunk(chunk, streamMessageId, accumulated),
+ ).catch((err) => {
+   // Prevent unhandled promise rejections from user callbacks
+   console.error("Error in onStreamChunk callback:", err);
+ });
```
Dismissed: The code snippet does not include the implementation at line 4084 where options.onStreamChunk is called, making it impossible to verify if the fix was applied
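The await-versus-fire-and-forget distinction this thread discusses can be shown in isolation. `pump` below is a hypothetical emit loop, not code from this PR; it illustrates why returning or awaiting the callback's promise preserves ordering and surfaces rejections, whereas discarding it turns them into process-level unhandled rejections.

```typescript
type OnChunk = (chunk: string) => void | Promise<void>;

// Hypothetical emit loop: awaiting the callback's return value means an async
// consumer applies backpressure, and any rejection propagates to this caller
// instead of becoming an unhandled rejection elsewhere.
async function pump(chunks: string[], onChunk: OnChunk): Promise<void> {
  for (const c of chunks) {
    await onChunk(c); // awaiting `void` is a harmless no-op for sync callbacks
  }
}
```

With fire-and-forget (`onChunk(c);` without `await`), a rejecting async callback would bypass the extractor's error path entirely, which is exactly the failure mode the comment above warns about.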
```diff
@@ -190,7 +306,7 @@ export class DefaultMessageService implements IMessageService {
       String(runtime.getSetting("MAX_MULTISTEP_ITERATIONS") ?? "6"),
       10,
     ),
-    onStreamChunk: options?.onStreamChunk,
+    onStreamChunk: wrappedOnStreamChunk,
     shouldRespondModel: resolvedShouldRespondModel,
```
The new streaming/voice wrapper behavior (using the accumulated parameter when available and falling back to local buffering) is not covered by tests. Given the repository has DefaultMessageService tests, add cases that assert: (1) accumulated drives first-sentence detection without re-accumulating deltas, (2) the fallback path still works for raw-token streams, and (3) voice generation is triggered at most once per message.
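The three cases might be sketched like this. `wrapVoiceDetection` and its sentence regex are hypothetical stand-ins for the service's real wrapper (actual tests would exercise `DefaultMessageService`); they exist only to make the requested assertions concrete.

```typescript
// Hypothetical miniature of the voice wrapper: prefer `accumulated`, fall
// back to a local buffer for raw-token streams, and trigger voice at most once.
function wrapVoiceDetection(onFirstSentence: (s: string) => void) {
  let fallback = "";
  let sent = false;
  return (chunk: string, accumulated?: string) => {
    // (1) accumulated drives detection without re-accumulating deltas;
    // (2) otherwise a local buffer reassembles raw tokens.
    const text = accumulated !== undefined ? accumulated : (fallback += chunk);
    if (sent) return; // (3) voice generation fires at most once per message
    const m = text.match(/^.*?[.!?](?=\s|$)/);
    if (m) {
      sent = true;
      onFirstSentence(m[0]);
    }
  };
}
```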
Could not auto-fix (wrong file or repeated failures); manual review recommended.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF.
```diff
 onStreamChunk: async (
   chunk: string,
   msgId?: string,
   accumulated?: string,
 ) => {
   if (extractor.done) return;
   const textToStream = extractor.push(chunk);
   if (textToStream) {
     retryState.appendText(textToStream);
-    await onStreamChunk(textToStream, msgId);
+    await onStreamChunk(textToStream, msgId, accumulated);
   }
```
Forwarded `accumulated` may be inconsistent with `textToStream` for non-passthrough extractors

`accumulated` comes from the upstream caller (e.g., ValidationStreamExtractor's full field text) but `textToStream` goes through `extractor.push(chunk)`, which could transform or filter the chunk. Currently MarkableExtractor is a pure passthrough so `textToStream === chunk` and the values are consistent, but the function's contract isn't enforced.
If a future caller passes a filtering extractor (e.g., one that strips punctuation), consumers would receive chunk = filteredDelta while accumulated = unfiltered full text, which is semantically contradictory. Consider adding a doc comment that makes the passthrough assumption explicit:
```ts
/**
 * NOTE: `accumulated` from the upstream source is forwarded unchanged.
 * This is only semantically correct when `extractor` is a passthrough
 * (i.e., extractor.push(chunk) === chunk). MarkableExtractor satisfies
 * this invariant; other extractors may not.
 */
onStreamChunk: async (
  chunk: string,
  msgId?: string,
  accumulated?: string,
) => {
```
Fixed in aa5621e.
```ts
let firstSentenceSent = false;
let firstSentenceText = "";
let streamTextFallback = "";
const userOnStreamChunk = options?.onStreamChunk;
const wrappedOnStreamChunk: StreamChunkCallback | undefined =
  userOnStreamChunk
    ? async (chunk, messageId, accumulated) => {
        let streamText: string;
        if (accumulated !== undefined) {
          streamText = accumulated;
        } else {
          streamTextFallback += chunk;
          streamText = streamTextFallback;
        }
```
streamTextFallback is never seeded from the accumulated path
streamTextFallback only grows when accumulated === undefined. If a stream starts with the accumulated path (VSE-based) and later emits chunks without accumulated (e.g., a raw-token action stream in the same handleMessage call), the fallback buffer starts from empty and the first-sentence detection would operate on an incomplete view of the text.
In practice this is safe today because firstSentenceSent is already true by the time action streams begin (the VSE-based main message stream fires first), preventing re-triggering. But the invariant is fragile — if the main message stream finishes without sending a first sentence (e.g., very short reply), the fallback buffer for the action stream would have no knowledge of what was already emitted via accumulated.
Consider documenting this assumption explicitly, or keeping the fallback in sync:
```ts
// If we have accumulated text, also sync streamTextFallback so the
// fallback path has accurate state if the stream source later changes.
if (accumulated !== undefined) {
  streamTextFallback = accumulated;
  streamText = accumulated;
} else {
  streamTextFallback += chunk;
  streamText = streamTextFallback;
}
```
Fixed in aa5621e.
@coderabbitai review

✅ Actions performed: review triggered.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/typescript/src/runtime.ts`:
- Around line 1787-1789: The file uses the StreamingContext type in the
declaration of the variable actionStreamingContext but doesn't import it; add an
explicit import of StreamingContext from "./streaming-context" at the top of the
module so the type annotation (used in actionStreamingContext: (StreamingContext
& { onStreamEnd: () => void }) | undefined) resolves correctly—locate the
existing imports in runtime.ts and append or include StreamingContext in that
import list.
- Around line 4078-4085: The onChunk handler passed to ValidationStreamExtractor
currently fire-and-forgets options.onStreamChunk, losing backpressure and any
rejection; update the onChunk callback declared in the new
ValidationStreamExtractor call so it returns the result of options.onStreamChunk
(i.e., change onChunk: (chunk, _field, accumulated) => {
options.onStreamChunk?.(chunk, streamMessageId, accumulated); } to return
options.onStreamChunk?.(...) ), thereby propagating a Promise<void> when the
stream callback is async and preserving backpressure and error propagation for
ValidationStreamExtractor/StreamChunkCallback.
In `@packages/typescript/src/services/message.ts`:
- Around line 200-210: The current incremental handler (the async (chunk,
messageId, accumulated) => { ... }) uses streamTextFallback when accumulated is
missing, which can feed raw markup into
hasFirstSentence()/extractFirstSentence() and TTS; change it so that you only
run sentence detection or trigger first-sentence TTS when accumulated is present
(i.e., require accumulated before calling
hasFirstSentence/extractFirstSentence), or else sanitize the raw chunk before
using it (preferably by reusing the streaming sanitization used elsewhere) —
update the logic around streamTextFallback, firstSentenceSent, hasFirstSentence,
and extractFirstSentence to avoid using unsanitized chunks for preview audio.
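The gating this prompt asks for can be sketched as below. The function name and markup example are hypothetical; the sketch only illustrates the rule that preview-audio sentence detection runs on extractor-provided `accumulated` text and never on raw deltas that may still contain markup.

```typescript
// Hypothetical: raw deltas may still contain XML markup (e.g. "<response>Hi."),
// so first-sentence detection for preview TTS is gated on `accumulated`,
// which is already-extracted field text.
function firstSentenceForTTS(accumulated?: string): string | undefined {
  if (accumulated === undefined) return undefined; // raw-token stream: skip preview
  const m = accumulated.match(/^.*?[.!?](?=\s|$)/);
  return m ? m[0] : undefined;
}
```

The alternative the prompt mentions — sanitizing raw chunks before use — would replace the early `return undefined` with a call into the streaming sanitization used elsewhere in the codebase.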
Claude finished @odilitime's task in 1m 38s

PR Review: StreamChunkCallback Consolidation
Summary

This PR correctly fixes the dual-extractor TTS garbling bug by removing the secondary extractor path. Recent commits (709c8d0 to 851edbd) addressed several review comments:

Issues

1. Cross-language type inconsistency: the TypeScript signature gained a third `accumulated` parameter, but the Python and Rust definitions still use the old 2-parameter shape. Fix: update them to match.
2. Test coverage: the test change only updates the type signature; the new `accumulated` behavior is untested.
3. `createStreamingContext` forwards `accumulated` unchanged, which assumes a passthrough extractor.

What's Good

Verdict

The TypeScript changes are solid. The cross-language type inconsistency in Python/Rust needs addressing before merge to maintain API consistency across the SDK.
- Explains reasoning for dismissed issues inline in code
Claude finished @odilitime's task in 1m 35s

PR Review: StreamChunkCallback Consolidation
Summary

This PR correctly fixes the dual-extractor TTS garbling bug by removing the secondary extractor path. Verified fixes from previous review rounds:

Remaining Issues

1. Python signature still has two parameters while TypeScript now has three:

```python
StreamChunkCallback = Callable[[str, str | None], Awaitable[None]]
```

Fix: add the third `accumulated` parameter.

2. Rust signature likewise:

```rust
pub type StreamChunkCallback =
    Box<dyn Fn(&str, Option<&str>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
```

Fix: add the third `accumulated` parameter.

3. Tests only update the type signature (runtime.test.ts:778-785); no tests verify the new `accumulated` behavior.
What's Good
Verdict

Safe to merge with caveats. The TypeScript changes are solid and the TTS fix is architecturally correct. The cross-language type inconsistency in Python/Rust should be addressed before or shortly after merge to maintain SDK consistency.
Relates to
Risks
Background
What does this PR do?
What kind of change is this?
Documentation changes needed?
Testing
Where should a reviewer start?
Detailed testing steps
Note
Medium Risk
Touches core streaming and message handling flow and widens a public callback signature, which could break custom integrations and affect streaming/TTS behavior if any downstream expects the old `(chunk, messageId)` contract.

Overview
- Fixes garbled streaming/TTS by removing `DefaultMessageService`'s extra `ResponseStreamExtractor`/`runWithStreamingContext` path so only the `ValidationStreamExtractor` pipeline emits chunks.
- Consolidates all streaming chunk callbacks into a single exported `StreamChunkCallback` type and extends it to `(chunk, messageId?, accumulated?)`, with `ValidationStreamExtractor` now providing authoritative per-field `accumulated` text and raw token streams explicitly passing `undefined`.
- Updates action streaming to maintain its own filtered accumulation, propagates the new signature through runtime/context/model/message-service types and tests, and refreshes docs to describe the new callback contract and architecture change.
Written by Cursor Bugbot for commit aa5621e.
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Documentation
Greptile Summary
This PR eliminates a dual-extractor race condition that caused TTS output garbling. Previously, `DefaultMessageService.handleMessage` wrapped `processMessage` in `runWithStreamingContext` with a `ResponseStreamExtractor`, while `dynamicPromptExecFromState` inside `processMessage` independently ran a `ValidationStreamExtractor`. Both extractors received the same raw LLM tokens from `useModel` (`paramsChunk` + `ctxChunk`), so consumers received two independent, overlapping delta streams — producing unintelligible TTS.

The fix removes the `ResponseStreamExtractor` + `runWithStreamingContext` layer entirely from `handleMessage`. A single canonical pipeline remains: VSE → MarkableExtractor → `wrappedOnStreamChunk`. Voice first-sentence detection moves into `wrappedOnStreamChunk`, which uses the new `accumulated` parameter (the authoritative full-field text from VSE) instead of re-assembling from deltas. Consolidating `StreamChunkCallback` into one type across eight previously-inconsistent call sites is a clean, forward-looking improvement.

Key changes:
- `StreamChunkCallback` in types/components.ts gains `accumulated?: string` and widens the return to `void | Promise<void>`, replacing 8 inline duplicate signatures.
- `ValidationStreamExtractor.emitFieldContent` now passes `content` as `accumulated` in its `onChunk` call.
- `handleMessage` no longer creates `ResponseStreamExtractor` or calls `runWithStreamingContext`; voice detection wraps the user callback directly.
- `createStreamingContext` and `actionStreamingContext` in runtime.ts forward `accumulated` unchanged through their respective filters.
- Raw token streams via `useModel` correctly pass `accumulated = undefined`.

Minor observations:
- `createStreamingContext` forwards `accumulated` from the upstream source without transforming it, which is only semantically correct when the inner extractor is a passthrough. Currently always `MarkableExtractor` (safe), but the assumption is undocumented.
- The `streamTextFallback` fallback buffer is never seeded from `accumulated`, so if a stream transitions from a VSE-based source to a raw-token source within one `handleMessage` call, the fallback path starts from empty. In practice `firstSentenceSent` is already `true` by that point, so voice won't re-trigger, but the invariant is fragile.

Confidence Score: 4/5
Safe to merge; correctly eliminates the dual-extractor TTS garbling bug with a clean, well-reasoned single-pipeline architecture
The root cause is correctly identified and fixed — removing runWithStreamingContext from handleMessage eliminates the second extractor path that caused overlapping deltas. The new wrappedOnStreamChunk pattern is equivalent to the old voice detection logic but operates on VSE-provided accumulated text instead of re-assembling from deltas. Type consolidation is a net positive. Two P2 style issues remain: the implicit passthrough assumption in createStreamingContext and the unsynchronized streamTextFallback buffer — neither affects correctness in the current code paths.
packages/typescript/src/services/message.ts (wrappedOnStreamChunk fallback path) and packages/typescript/src/utils/streaming.ts (createStreamingContext accumulated forwarding)
Important Files Changed
Sequence Diagram
```mermaid
sequenceDiagram
    participant U as User / Client
    participant HM as handleMessage
    participant PM as processMessage
    participant DPE as dynamicPromptExecFromState
    participant VSE as ValidationStreamExtractor
    participant ME as MarkableExtractor
    participant WC as wrappedOnStreamChunk
    U->>HM: handleMessage(options.onStreamChunk)
    Note over HM: Creates wrappedOnStreamChunk<br/>(voice detection wrapper)
    HM->>PM: processMessage(opts.onStreamChunk=wrapped)
    Note over PM: createStreamingContext(MarkableExtractor,<br/>wrappedOnStreamChunk, responseId)
    PM->>DPE: dynamicPromptExecFromState(onStreamChunk=streamingCtx)
    Note over DPE: Creates ValidationStreamExtractor<br/>modelParams.onStreamChunk = chunk→VSE.push(chunk)
    DPE->>VSE: raw LLM token (paramsChunk)
    VSE->>ME: onChunk(delta, field, accumulated)
    ME->>WC: passthrough delta + accumulated
    WC->>WC: voice first-sentence detection<br/>(uses accumulated if available)
    WC-->>U: userOnStreamChunk(delta, msgId, accumulated)
    Note over HM: OLD (removed): runWithStreamingContext(RSE)<br/>caused ctxChunk to ALSO fire on same token → garbling
```

Reviews (1): Last reviewed commit: "fix(core): consolidate StreamChunkCallba..."