Commit 23afc46: cleanup provider code

1 parent: 03c0adc

20 files changed: +767 additions, -525 deletions

apps/sim/executor/handlers/agent/agent-handler.ts

Lines changed: 36 additions & 42 deletions
@@ -68,7 +68,20 @@ export class AgentBlockHandler implements BlockHandler {
       filteredInputs
     )

-    await this.persistResponseToMemory(ctx, filteredInputs, result)
+    if (this.isStreamingExecution(result)) {
+      if (filteredInputs.memoryType && filteredInputs.memoryType !== 'none') {
+        return this.wrapStreamForMemoryPersistence(
+          ctx,
+          filteredInputs,
+          result as StreamingExecution
+        )
+      }
+      return result
+    }
+
+    if (filteredInputs.memoryType && filteredInputs.memoryType !== 'none') {
+      await this.persistResponseToMemory(ctx, filteredInputs, result as BlockOutput)
+    }

     return result
   }
@@ -1035,28 +1048,14 @@ export class AgentBlockHandler implements BlockHandler {
   private async handleStreamingResponse(
     response: Response,
     block: SerializedBlock,
-    ctx?: ExecutionContext,
-    inputs?: AgentInputs
+    _ctx?: ExecutionContext,
+    _inputs?: AgentInputs
   ): Promise<StreamingExecution> {
     const executionDataHeader = response.headers.get('X-Execution-Data')

     if (executionDataHeader) {
       try {
         const executionData = JSON.parse(executionDataHeader)
-
-        if (ctx && inputs && executionData.output?.content) {
-          const assistantMessage: Message = {
-            role: 'assistant',
-            content: executionData.output.content,
-          }
-
-          memoryService
-            .persistMemoryMessage(ctx, inputs, assistantMessage)
-            .catch((error) =>
-              logger.error('Failed to persist streaming response to memory:', error)
-            )
-        }
-
         return {
           stream: response.body!,
           execution: {
@@ -1156,45 +1155,40 @@ export class AgentBlockHandler implements BlockHandler {
     }
   }

+  private wrapStreamForMemoryPersistence(
+    ctx: ExecutionContext,
+    inputs: AgentInputs,
+    streamingExec: StreamingExecution
+  ): StreamingExecution {
+    return {
+      stream: memoryService.wrapStreamForPersistence(
+        streamingExec.stream,
+        ctx,
+        inputs,
+        streamingExec.execution?.output
+      ),
+      execution: streamingExec.execution,
+    }
+  }
+
   private async persistResponseToMemory(
     ctx: ExecutionContext,
     inputs: AgentInputs,
-    result: BlockOutput | StreamingExecution
+    result: BlockOutput
   ): Promise<void> {
-    // Only persist if memoryType is configured
-    if (!inputs.memoryType || inputs.memoryType === 'none') {
+    const content = (result as any)?.content
+    if (!content || typeof content !== 'string') {
       return
     }

     try {
-      // Don't persist streaming responses here - they're handled separately
-      if (this.isStreamingExecution(result)) {
-        return
-      }
-
-      // Extract content from regular response
-      const blockOutput = result as any
-      const content = blockOutput?.content
-
-      if (!content || typeof content !== 'string') {
-        return
-      }
-
-      const assistantMessage: Message = {
-        role: 'assistant',
-        content,
-      }
-
-      await memoryService.persistMemoryMessage(ctx, inputs, assistantMessage)
+      await memoryService.persistMemoryMessage(ctx, inputs, { role: 'assistant', content })
       logger.debug('Persisted assistant response to memory', {
         workflowId: ctx.workflowId,
-        memoryType: inputs.memoryType,
         conversationId: inputs.conversationId,
       })
     } catch (error) {
       logger.error('Failed to persist response to memory:', error)
-      // Don't throw - memory persistence failure shouldn't break workflow execution
     }
   }

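Note: the new dispatch in execute() leans on an isStreamingExecution type guard that is defined elsewhere in agent-handler.ts and does not appear in this diff. A plausible shape for it, assuming StreamingExecution is distinguished by carrying a ReadableStream (a hypothetical sketch, not the repository's actual code):

// Hypothetical sketch of the guard the dispatch above depends on; the real
// implementation lives elsewhere in agent-handler.ts.
private isStreamingExecution(
  result: BlockOutput | StreamingExecution
): result is StreamingExecution {
  return (
    typeof result === 'object' &&
    result !== null &&
    'stream' in result &&
    (result as StreamingExecution).stream instanceof ReadableStream
  )
}
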
apps/sim/executor/handlers/agent/memory.ts

Lines changed: 41 additions & 0 deletions
@@ -652,6 +652,47 @@ export class Memory {
       }
     }
   }
+
+  /**
+   * Wraps a streaming response to persist the assistant message when complete.
+   * Works model-agnostically by accumulating raw text chunks.
+   */
+  wrapStreamForPersistence(
+    stream: ReadableStream<Uint8Array>,
+    ctx: ExecutionContext,
+    inputs: AgentInputs,
+    executionOutput?: { content?: string }
+  ): ReadableStream<Uint8Array> {
+    let accumulatedContent = ''
+    const decoder = new TextDecoder()
+
+    const transformStream = new TransformStream<Uint8Array, Uint8Array>({
+      transform: (chunk, controller) => {
+        controller.enqueue(chunk)
+        accumulatedContent += decoder.decode(chunk, { stream: true })
+      },
+
+      flush: () => {
+        const finalContent = executionOutput?.content || accumulatedContent
+
+        if (finalContent?.trim()) {
+          this.persistMemoryMessage(ctx, inputs, { role: 'assistant', content: finalContent })
+            .then(() => {
+              logger.debug('Persisted streaming response to memory', {
+                workflowId: ctx.workflowId,
+                conversationId: inputs.conversationId,
+                contentLength: finalContent.length,
+              })
+            })
+            .catch((error) => {
+              logger.error('Failed to persist streaming response to memory:', error)
+            })
+        }
+      },
+    })

+    return stream.pipeThrough(transformStream)
+  }
 }

 export const memoryService = new Memory()

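The pattern above is a general way to observe a byte stream without disturbing its consumer: pipe it through a TransformStream that re-enqueues every chunk, accumulate the decoded text on the side, and act in flush() once the source is exhausted. A minimal self-contained sketch of the same technique (standard Web Streams API only; tapStream and onComplete are illustrative names, not part of this commit):

// Tap a ReadableStream<Uint8Array>: chunks pass through unchanged while the
// decoded text accumulates; onComplete fires after the last chunk is read.
function tapStream(
  source: ReadableStream<Uint8Array>,
  onComplete: (text: string) => void
): ReadableStream<Uint8Array> {
  const decoder = new TextDecoder()
  let text = ''
  return source.pipeThrough(
    new TransformStream<Uint8Array, Uint8Array>({
      transform(chunk, controller) {
        controller.enqueue(chunk) // the downstream consumer sees the chunk as-is
        text += decoder.decode(chunk, { stream: true })
      },
      flush() {
        text += decoder.decode() // flush any buffered partial multi-byte sequence
        onComplete(text)
      },
    })
  )
}
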
apps/sim/package.json

Lines changed: 1 addition & 0 deletions
@@ -37,6 +37,7 @@
     "@browserbasehq/stagehand": "^3.0.5",
     "@cerebras/cerebras_cloud_sdk": "^1.23.0",
     "@e2b/code-interpreter": "^2.0.0",
+    "@google/genai": "1.34.0",
     "@hookform/resolvers": "^4.1.3",
     "@opentelemetry/api": "^1.9.0",
     "@opentelemetry/exporter-jaeger": "2.1.0",

apps/sim/providers/anthropic/index.ts

Lines changed: 90 additions & 31 deletions
@@ -14,7 +14,11 @@ import type {
   ProviderResponse,
   TimeSegment,
 } from '@/providers/types'
-import { prepareToolExecution, prepareToolsWithUsageControl } from '@/providers/utils'
+import {
+  calculateCost,
+  prepareToolExecution,
+  prepareToolsWithUsageControl,
+} from '@/providers/utils'
 import { executeTool } from '@/tools'

 const logger = createLogger('AnthropicProvider')
@@ -255,28 +259,49 @@ ${fieldDescriptions}
       const providerStartTime = Date.now()
       const providerStartTimeISO = new Date(providerStartTime).toISOString()

-      // Create a streaming request
       const streamResponse: any = await anthropic.messages.create({
         ...payload,
         stream: true,
       })

-      // Start collecting token usage
-      const tokenUsage = {
-        prompt: 0,
-        completion: 0,
-        total: 0,
-      }
-
-      // Create a StreamingExecution response with a readable stream
       const streamingResult = {
-        stream: createReadableStreamFromAnthropicStream(streamResponse),
+        stream: createReadableStreamFromAnthropicStream(streamResponse, (content, usage) => {
+          streamingResult.execution.output.content = content
+          streamingResult.execution.output.tokens = {
+            prompt: usage.input_tokens,
+            completion: usage.output_tokens,
+            total: usage.input_tokens + usage.output_tokens,
+          }
+
+          const costResult = calculateCost(request.model, usage.input_tokens, usage.output_tokens)
+          streamingResult.execution.output.cost = {
+            input: costResult.input,
+            output: costResult.output,
+            total: costResult.total,
+          }
+
+          const streamEndTime = Date.now()
+          const streamEndTimeISO = new Date(streamEndTime).toISOString()
+
+          if (streamingResult.execution.output.providerTiming) {
+            streamingResult.execution.output.providerTiming.endTime = streamEndTimeISO
+            streamingResult.execution.output.providerTiming.duration =
+              streamEndTime - providerStartTime
+
+            if (streamingResult.execution.output.providerTiming.timeSegments?.[0]) {
+              streamingResult.execution.output.providerTiming.timeSegments[0].endTime =
+                streamEndTime
+              streamingResult.execution.output.providerTiming.timeSegments[0].duration =
+                streamEndTime - providerStartTime
+            }
+          }
+        }),
         execution: {
           success: true,
           output: {
-            content: '', // Will be filled by streaming content in chat component
+            content: '',
             model: request.model,
-            tokens: tokenUsage,
+            tokens: { prompt: 0, completion: 0, total: 0 },
             toolCalls: undefined,
             providerTiming: {
               startTime: providerStartTimeISO,
@@ -292,14 +317,13 @@ ${fieldDescriptions}
             },
           ],
         },
-        // Estimate token cost based on typical Claude pricing
         cost: {
           total: 0.0,
           input: 0.0,
           output: 0.0,
         },
       },
-      logs: [], // No block logs for direct streaming
+      logs: [],
       metadata: {
         startTime: providerStartTimeISO,
         endTime: new Date().toISOString(),
@@ -309,7 +333,6 @@ ${fieldDescriptions}
         },
       }

-      // Return the streaming execution object
       return streamingResult as StreamingExecution
     }

@@ -688,6 +711,17 @@ ${fieldDescriptions}
         (currentResponse.usage?.input_tokens || 0) + (currentResponse.usage?.output_tokens || 0),
     }

+    const initialCost = calculateCost(
+      request.model,
+      currentResponse.usage?.input_tokens || 0,
+      currentResponse.usage?.output_tokens || 0
+    )
+    const cost = {
+      input: initialCost.input,
+      output: initialCost.output,
+      total: initialCost.total,
+    }
+
     const toolCalls = []
     const toolResults = []
     const currentMessages = [...messages]
@@ -899,12 +933,21 @@ ${fieldDescriptions}
         content = textContent
       }

-      // Update token counts
+      // Update token counts and cost
       if (currentResponse.usage) {
         tokens.prompt += currentResponse.usage.input_tokens || 0
         tokens.completion += currentResponse.usage.output_tokens || 0
         tokens.total +=
           (currentResponse.usage.input_tokens || 0) + (currentResponse.usage.output_tokens || 0)
+
+        const iterationCost = calculateCost(
+          request.model,
+          currentResponse.usage.input_tokens || 0,
+          currentResponse.usage.output_tokens || 0
+        )
+        cost.input += iterationCost.input
+        cost.output += iterationCost.output
+        cost.total += iterationCost.total
       }

       iterationCount++
@@ -931,31 +974,47 @@ ${fieldDescriptions}
     const providerEndTimeISO = new Date(providerEndTime).toISOString()
     const totalDuration = providerEndTime - providerStartTime

-    // After all tool processing complete, if streaming was requested, use streaming for the final response
     if (request.stream) {
       logger.info('Using streaming for final Anthropic response after tool processing')

-      // When streaming after tool calls with forced tools, make sure tool_choice is removed
-      // This prevents the API from trying to force tool usage again in the final streaming response
       const streamingPayload = {
         ...payload,
         messages: currentMessages,
-        // For Anthropic, omit tool_choice entirely rather than setting it to 'none'
         stream: true,
+        tool_choice: undefined,
       }

-      // Remove the tool_choice parameter as Anthropic doesn't accept 'none' as a string value
-      streamingPayload.tool_choice = undefined
-
       const streamResponse: any = await anthropic.messages.create(streamingPayload)

-      // Create a StreamingExecution response with all collected data
       const streamingResult = {
-        stream: createReadableStreamFromAnthropicStream(streamResponse),
+        stream: createReadableStreamFromAnthropicStream(streamResponse, (content, usage) => {
+          streamingResult.execution.output.content = content
+          streamingResult.execution.output.tokens = {
+            prompt: tokens.prompt + usage.input_tokens,
+            completion: tokens.completion + usage.output_tokens,
+            total: tokens.total + usage.input_tokens + usage.output_tokens,
+          }
+
+          const streamCost = calculateCost(request.model, usage.input_tokens, usage.output_tokens)
+          streamingResult.execution.output.cost = {
+            input: cost.input + streamCost.input,
+            output: cost.output + streamCost.output,
+            total: cost.total + streamCost.total,
+          }
+
+          const streamEndTime = Date.now()
+          const streamEndTimeISO = new Date(streamEndTime).toISOString()
+
+          if (streamingResult.execution.output.providerTiming) {
+            streamingResult.execution.output.providerTiming.endTime = streamEndTimeISO
+            streamingResult.execution.output.providerTiming.duration =
+              streamEndTime - providerStartTime
+          }
+        }),
         execution: {
           success: true,
           output: {
-            content: '', // Will be filled by the callback
+            content: '',
             model: request.model || 'claude-3-7-sonnet-20250219',
             tokens: {
               prompt: tokens.prompt,
@@ -980,12 +1039,12 @@ ${fieldDescriptions}
               timeSegments: timeSegments,
             },
             cost: {
-              total: (tokens.total || 0) * 0.0001, // Estimate cost based on tokens
-              input: (tokens.prompt || 0) * 0.0001,
-              output: (tokens.completion || 0) * 0.0001,
+              input: cost.input,
+              output: cost.output,
+              total: cost.total,
             },
           },
-          logs: [], // No block logs at provider level
+          logs: [],
           metadata: {
             startTime: providerStartTimeISO,
             endTime: new Date().toISOString(),