
Commit a70f2a6 (parent 4e5b834)

fix(executor): streaming after tool calls (#1963)

* Provider changes
* Fix lint

File tree: 11 files changed, +155 -27 lines

apps/sim/executor/execution/block-executor.ts

Lines changed: 133 additions & 5 deletions
@@ -21,6 +21,7 @@ import type {
   ExecutionContext,
   NormalizedBlockOutput,
 } from '@/executor/types'
+import { streamingResponseFormatProcessor } from '@/executor/utils'
 import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
 import type { VariableResolver } from '@/executor/variables/resolver'
 import type { SerializedBlock } from '@/serializer/types'
@@ -100,11 +101,14 @@ export class BlockExecutor {
     const streamingExec = output as { stream: ReadableStream; execution: any }

     if (ctx.onStream) {
-      try {
-        await ctx.onStream(streamingExec)
-      } catch (error) {
-        logger.error('Error in onStream callback', { blockId: node.id, error })
-      }
+      await this.handleStreamingExecution(
+        ctx,
+        node,
+        block,
+        streamingExec,
+        resolvedInputs,
+        ctx.selectedOutputs ?? []
+      )
     }

     normalizedOutput = this.normalizeOutput(
@@ -446,4 +450,128 @@ export class BlockExecutor {
       }
     }
   }
+
+  private async handleStreamingExecution(
+    ctx: ExecutionContext,
+    node: DAGNode,
+    block: SerializedBlock,
+    streamingExec: { stream: ReadableStream; execution: any },
+    resolvedInputs: Record<string, any>,
+    selectedOutputs: string[]
+  ): Promise<void> {
+    const blockId = node.id
+
+    const responseFormat =
+      resolvedInputs?.responseFormat ??
+      (block.config?.params as Record<string, any> | undefined)?.responseFormat ??
+      (block.config as Record<string, any> | undefined)?.responseFormat
+
+    const stream = streamingExec.stream
+    if (typeof stream.tee !== 'function') {
+      await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
+      return
+    }
+
+    const [clientStream, executorStream] = stream.tee()
+
+    const processedClientStream = streamingResponseFormatProcessor.processStream(
+      clientStream,
+      blockId,
+      selectedOutputs,
+      responseFormat
+    )
+
+    const clientStreamingExec = {
+      ...streamingExec,
+      stream: processedClientStream,
+    }
+
+    const executorConsumption = this.consumeExecutorStream(
+      executorStream,
+      streamingExec,
+      blockId,
+      responseFormat
+    )
+
+    const clientConsumption = (async () => {
+      try {
+        await ctx.onStream?.(clientStreamingExec)
+      } catch (error) {
+        logger.error('Error in onStream callback', { blockId, error })
+      }
+    })()
+
+    await Promise.all([clientConsumption, executorConsumption])
+  }
+
+  private async forwardStream(
+    ctx: ExecutionContext,
+    blockId: string,
+    streamingExec: { stream: ReadableStream; execution: any },
+    stream: ReadableStream,
+    responseFormat: any,
+    selectedOutputs: string[]
+  ): Promise<void> {
+    const processedStream = streamingResponseFormatProcessor.processStream(
+      stream,
+      blockId,
+      selectedOutputs,
+      responseFormat
+    )
+
+    try {
+      await ctx.onStream?.({
+        ...streamingExec,
+        stream: processedStream,
+      })
+    } catch (error) {
+      logger.error('Error in onStream callback', { blockId, error })
+    }
+  }
+
+  private async consumeExecutorStream(
+    stream: ReadableStream,
+    streamingExec: { execution: any },
+    blockId: string,
+    responseFormat: any
+  ): Promise<void> {
+    const reader = stream.getReader()
+    const decoder = new TextDecoder()
+    let fullContent = ''
+
+    try {
+      while (true) {
+        const { done, value } = await reader.read()
+        if (done) break
+        fullContent += decoder.decode(value, { stream: true })
+      }
+    } catch (error) {
+      logger.error('Error reading executor stream for block', { blockId, error })
+    } finally {
+      try {
+        reader.releaseLock()
+      } catch {}
+    }
+
+    if (!fullContent) {
+      return
+    }
+
+    const executionOutput = streamingExec.execution?.output
+    if (!executionOutput || typeof executionOutput !== 'object') {
+      return
+    }
+
+    if (responseFormat) {
+      try {
+        const parsed = JSON.parse(fullContent.trim())
+        Object.assign(executionOutput, parsed)
+        return
+      } catch (error) {
+        logger.warn('Failed to parse streamed content for response format', { blockId, error })
+      }
+    }
+
+    executionOutput.content = fullContent
+  }
 }
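The heart of the executor change is `ReadableStream.tee()`: the provider stream is split into two independent branches, one handed to the client via `ctx.onStream` and one drained by the executor so the block's `execution.output` still receives the full text (parsed as JSON when a `responseFormat` is set). A minimal standalone sketch of that pattern, with illustrative helper names that are not from the repo:

```ts
// Drain a branch into a string, mirroring consumeExecutorStream above.
async function drain(stream: ReadableStream<Uint8Array>): Promise<string> {
  const reader = stream.getReader()
  const decoder = new TextDecoder()
  let text = ''
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      text += decoder.decode(value, { stream: true })
    }
  } finally {
    reader.releaseLock()
  }
  return text
}

async function demo(): Promise<void> {
  const encoder = new TextEncoder()
  // Stand-in for a provider's streamed completion.
  const source = new ReadableStream<Uint8Array>({
    start(controller) {
      controller.enqueue(encoder.encode('{"answer": '))
      controller.enqueue(encoder.encode('"42"}'))
      controller.close()
    },
  })

  // tee() yields two branches that replay the same chunks independently.
  const [clientBranch, executorBranch] = source.tee()

  // Consume both branches concurrently, like the Promise.all in the diff.
  const [forwarded, captured] = await Promise.all([
    drain(clientBranch), // real code: handed to ctx.onStream
    drain(executorBranch), // real code: becomes execution.output content / parsed JSON
  ])

  console.log(forwarded === captured) // true
}

demo()
```

Both branches are consumed concurrently for a reason: `tee()` buffers chunks for the slower branch, so a branch that is never read would force the whole stream to accumulate in memory.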

apps/sim/providers/anthropic/index.ts

Lines changed: 3 additions & 3 deletions
@@ -985,9 +985,9 @@ ${fieldDescriptions}
     const providerEndTimeISO = new Date(providerEndTime).toISOString()
     const totalDuration = providerEndTime - providerStartTime

-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final Anthropic response after tool calls')
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
+      logger.info('Using streaming for final Anthropic response after tool processing')

       // When streaming after tool calls with forced tools, make sure tool_choice is removed
       // This prevents the API from trying to force tool usage again in the final streaming response
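Each provider hunk in this commit makes the same one-line change: the final-response branch no longer requires `iterationCount > 0`. A hedged sketch of the behavioral difference, assuming `iterationCount` stays at 0 when the model answers without calling any tools:

```ts
// Old guard vs. new guard (assumption: iterationCount only advances when the
// tool-call loop actually runs).
function shouldStreamFinalOld(requestStream: boolean, iterationCount: number): boolean {
  return requestStream && iterationCount > 0
}

function shouldStreamFinalNew(requestStream: boolean): boolean {
  return requestStream
}

// A streaming request that triggered no tool calls:
console.log(shouldStreamFinalOld(true, 0)) // false: fell back to a non-streamed response
console.log(shouldStreamFinalNew(true)) // true: the final response streams either way
```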

apps/sim/providers/azure-openai/index.ts

Lines changed: 3 additions & 3 deletions
@@ -523,9 +523,9 @@ export const azureOpenAIProvider: ProviderConfig = {
       iterationCount++
     }

-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')

       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents Azure OpenAI API from trying to force tool usage again in the final streaming response

apps/sim/providers/cerebras/index.ts

Lines changed: 2 additions & 2 deletions
@@ -455,8 +455,8 @@ export const cerebrasProvider: ProviderConfig = {
     const totalDuration = providerEndTime - providerStartTime

     // POST-TOOL-STREAMING: stream after tool calls if requested
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final Cerebras response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final Cerebras response after tool processing')

       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents the API from trying to force tool usage again in the final streaming response

apps/sim/providers/deepseek/index.ts

Lines changed: 2 additions & 2 deletions
@@ -457,8 +457,8 @@ export const deepseekProvider: ProviderConfig = {
     const totalDuration = providerEndTime - providerStartTime

     // POST-TOOL STREAMING: stream final response after tool calls if requested
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final DeepSeek response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final DeepSeek response after tool processing')

       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents the API from trying to force tool usage again in the final streaming response

apps/sim/providers/groq/index.ts

Lines changed: 2 additions & 2 deletions
@@ -374,8 +374,8 @@ export const groqProvider: ProviderConfig = {
     }

     // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final Groq response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final Groq response after tool processing')

       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents the API from trying to force tool usage again in the final streaming response

apps/sim/providers/mistral/index.ts

Lines changed: 2 additions & 2 deletions
@@ -447,8 +447,8 @@ export const mistralProvider: ProviderConfig = {
       iterationCount++
     }

-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')

       const streamingPayload = {
         ...payload,

apps/sim/providers/ollama/index.ts

Lines changed: 2 additions & 2 deletions
@@ -529,8 +529,8 @@ export const ollamaProvider: ProviderConfig = {
     }

     // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')

       const streamingPayload = {
         ...payload,

apps/sim/providers/openai/index.ts

Lines changed: 3 additions & 3 deletions
@@ -504,9 +504,9 @@ export const openaiProvider: ProviderConfig = {
       iterationCount++
     }

-    // After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
-    if (request.stream && iterationCount > 0) {
-      logger.info('Using streaming for final response after tool calls')
+    // After all tool processing complete, if streaming was requested, use streaming for the final response
+    if (request.stream) {
+      logger.info('Using streaming for final response after tool processing')

       // When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
       // This prevents OpenAI API from trying to force tool usage again in the final streaming response
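The unchanged comments around this hunk describe resetting a forced `tool_choice` before the final streaming call; that code sits outside the diff. A hedged sketch of what such a reset could look like against the OpenAI chat-completions payload shape (the helper and the tool name are hypothetical):

```ts
// Hypothetical helper: relax a forced tool_choice so the final streamed
// response is a text answer rather than another forced tool call.
type ToolChoice = 'auto' | 'none' | 'required' | { type: 'function'; function: { name: string } }

function relaxToolChoice(payload: { tool_choice?: ToolChoice }): void {
  if (payload.tool_choice && payload.tool_choice !== 'auto') {
    payload.tool_choice = 'auto'
  }
}

// Usage: a payload that forced a specific tool during the loop.
const payload: { tool_choice?: ToolChoice } = {
  tool_choice: { type: 'function', function: { name: 'get_weather' } }, // hypothetical tool
}
relaxToolChoice(payload)
console.log(payload.tool_choice) // 'auto'
```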

apps/sim/providers/openrouter/index.ts

Lines changed: 1 addition & 1 deletion
@@ -381,7 +381,7 @@ export const openRouterProvider: ProviderConfig = {
       iterationCount++
     }

-    if (request.stream && iterationCount > 0) {
+    if (request.stream) {
       const streamingPayload = {
         ...payload,
         messages: currentMessages,
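The mistral, ollama, and openrouter hunks rebuild a `streamingPayload` for the final call instead of toggling `tool_choice`. A self-contained sketch under stated assumptions (only `...payload` and `messages: currentMessages` appear in the hunks; the message shape and the `stream` flag are inferred):

```ts
type Message = { role: 'system' | 'user' | 'assistant' | 'tool'; content: string }

// Assumption: the lines omitted from the diff set stream: true on this payload.
function buildStreamingPayload(
  payload: Record<string, unknown>,
  currentMessages: Message[]
): Record<string, unknown> {
  return {
    ...payload,
    // currentMessages accumulated the assistant's tool calls and each tool
    // result during the loop, so the final streamed answer can use them.
    messages: currentMessages,
    stream: true,
  }
}
```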
