Skip to content

Commit fdac431

Browse files
authored
fix(chat): update stream to respect all output select objects (#2729)
1 parent a54fcbc commit fdac431

File tree

2 files changed

+72
-16
lines changed

2 files changed

+72
-16
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/chat.tsx

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ export function Chat() {
473473
/**
474474
* Processes streaming response from workflow execution
475475
* Reads the stream chunk by chunk and updates the message content in real-time
476+
* When the final event arrives, extracts any additional selected outputs (model, tokens, toolCalls)
476477
* @param stream - ReadableStream containing the workflow execution response
477478
* @param responseMessageId - ID of the message to update with streamed content
478479
*/
@@ -529,6 +530,35 @@ export function Chat() {
529530
return
530531
}
531532

533+
if (
534+
selectedOutputs.length > 0 &&
535+
'logs' in result &&
536+
Array.isArray(result.logs) &&
537+
activeWorkflowId
538+
) {
539+
const additionalOutputs: string[] = []
540+
541+
for (const outputId of selectedOutputs) {
542+
const blockId = extractBlockIdFromOutputId(outputId)
543+
const path = extractPathFromOutputId(outputId, blockId)
544+
545+
if (path === 'content') continue
546+
547+
const outputValue = extractOutputFromLogs(result.logs as BlockLog[], outputId)
548+
if (outputValue !== undefined) {
549+
const formattedValue =
550+
typeof outputValue === 'string' ? outputValue : JSON.stringify(outputValue)
551+
if (formattedValue) {
552+
additionalOutputs.push(`**${path}:** ${formattedValue}`)
553+
}
554+
}
555+
}
556+
557+
if (additionalOutputs.length > 0) {
558+
appendMessageContent(responseMessageId, `\n\n${additionalOutputs.join('\n\n')}`)
559+
}
560+
}
561+
532562
finalizeMessageStream(responseMessageId)
533563
} else if (contentChunk) {
534564
accumulatedContent += contentChunk
@@ -552,7 +582,7 @@ export function Chat() {
552582
focusInput(100)
553583
}
554584
},
555-
[appendMessageContent, finalizeMessageStream, focusInput]
585+
[appendMessageContent, finalizeMessageStream, focusInput, selectedOutputs, activeWorkflowId]
556586
)
557587

558588
/**
@@ -564,7 +594,6 @@ export function Chat() {
564594
if (!result || !activeWorkflowId) return
565595
if (typeof result !== 'object') return
566596

567-
// Handle streaming response
568597
if ('stream' in result && result.stream instanceof ReadableStream) {
569598
const responseMessageId = crypto.randomUUID()
570599
addMessage({
@@ -578,7 +607,6 @@ export function Chat() {
578607
return
579608
}
580609

581-
// Handle success with logs
582610
if ('success' in result && result.success && 'logs' in result && Array.isArray(result.logs)) {
583611
selectedOutputs
584612
.map((outputId) => extractOutputFromLogs(result.logs as BlockLog[], outputId))
@@ -596,7 +624,6 @@ export function Chat() {
596624
return
597625
}
598626

599-
// Handle error response
600627
if ('success' in result && !result.success) {
601628
const errorMessage =
602629
'error' in result && typeof result.error === 'string'
@@ -622,7 +649,6 @@ export function Chat() {
622649

623650
const sentMessage = chatMessage.trim()
624651

625-
// Update prompt history (only if new unique message)
626652
if (sentMessage && promptHistory[promptHistory.length - 1] !== sentMessage) {
627653
setPromptHistory((prev) => [...prev, sentMessage])
628654
}
@@ -631,10 +657,8 @@ export function Chat() {
631657
const conversationId = getConversationId(activeWorkflowId)
632658

633659
try {
634-
// Process file attachments
635660
const attachmentsWithData = await processFileAttachments(chatFiles)
636661

637-
// Add user message
638662
const messageContent =
639663
sentMessage || (chatFiles.length > 0 ? `Uploaded ${chatFiles.length} file(s)` : '')
640664
addMessage({
@@ -644,7 +668,6 @@ export function Chat() {
644668
attachments: attachmentsWithData,
645669
})
646670

647-
// Prepare workflow input
648671
const workflowInput: {
649672
input: string
650673
conversationId: string
@@ -667,13 +690,11 @@ export function Chat() {
667690
}
668691
}
669692

670-
// Clear input and files
671693
setChatMessage('')
672694
clearFiles()
673695
clearErrors()
674696
focusInput(10)
675697

676-
// Execute workflow
677698
const result = await handleRunWorkflow(workflowInput)
678699
handleWorkflowResponse(result)
679700
} catch (error) {

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,7 @@ export function useWorkflowExecution() {
885885

886886
const activeBlocksSet = new Set<string>()
887887
const streamedContent = new Map<string, string>()
888+
const accumulatedBlockLogs: BlockLog[] = []
888889

889890
// Execute the workflow
890891
try {
@@ -933,14 +934,30 @@ export function useWorkflowExecution() {
933934

934935
// Edges already tracked in onBlockStarted, no need to track again
935936

937+
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
938+
const endedAt = new Date().toISOString()
939+
940+
// Accumulate block log for the execution result
941+
accumulatedBlockLogs.push({
942+
blockId: data.blockId,
943+
blockName: data.blockName || 'Unknown Block',
944+
blockType: data.blockType || 'unknown',
945+
input: data.input || {},
946+
output: data.output,
947+
success: true,
948+
durationMs: data.durationMs,
949+
startedAt,
950+
endedAt,
951+
})
952+
936953
// Add to console
937954
addConsole({
938955
input: data.input || {},
939956
output: data.output,
940957
success: true,
941958
durationMs: data.durationMs,
942-
startedAt: new Date(Date.now() - data.durationMs).toISOString(),
943-
endedAt: new Date().toISOString(),
959+
startedAt,
960+
endedAt,
944961
workflowId: activeWorkflowId,
945962
blockId: data.blockId,
946963
executionId: executionId || uuidv4(),
@@ -967,15 +984,33 @@ export function useWorkflowExecution() {
967984

968985
// Track failed block execution in run path
969986
setBlockRunStatus(data.blockId, 'error')
987+
988+
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
989+
const endedAt = new Date().toISOString()
990+
991+
// Accumulate block error log for the execution result
992+
accumulatedBlockLogs.push({
993+
blockId: data.blockId,
994+
blockName: data.blockName || 'Unknown Block',
995+
blockType: data.blockType || 'unknown',
996+
input: data.input || {},
997+
output: {},
998+
success: false,
999+
error: data.error,
1000+
durationMs: data.durationMs,
1001+
startedAt,
1002+
endedAt,
1003+
})
1004+
9701005
// Add error to console
9711006
addConsole({
9721007
input: data.input || {},
9731008
output: {},
9741009
success: false,
9751010
error: data.error,
9761011
durationMs: data.durationMs,
977-
startedAt: new Date(Date.now() - data.durationMs).toISOString(),
978-
endedAt: new Date().toISOString(),
1012+
startedAt,
1013+
endedAt,
9791014
workflowId: activeWorkflowId,
9801015
blockId: data.blockId,
9811016
executionId: executionId || uuidv4(),
@@ -1029,7 +1064,7 @@ export function useWorkflowExecution() {
10291064
startTime: data.startTime,
10301065
endTime: data.endTime,
10311066
},
1032-
logs: [],
1067+
logs: accumulatedBlockLogs,
10331068
}
10341069
},
10351070

@@ -1041,7 +1076,7 @@ export function useWorkflowExecution() {
10411076
metadata: {
10421077
duration: data.duration,
10431078
},
1044-
logs: [],
1079+
logs: accumulatedBlockLogs,
10451080
}
10461081

10471082
// Only add workflow-level error if no blocks have executed yet

0 commit comments

Comments
 (0)