11import { db } from '@sim/db'
2- import { chat , userStats , workflow } from '@sim/db/schema'
3- import { eq , sql } from 'drizzle-orm'
2+ import { chat , workflow } from '@sim/db/schema'
3+ import { eq } from 'drizzle-orm'
44import { type NextRequest , NextResponse } from 'next/server'
55import { v4 as uuidv4 } from 'uuid'
66import { checkServerSideUsageLimits } from '@/lib/billing'
@@ -16,7 +16,7 @@ import { TriggerUtils } from '@/lib/workflows/triggers'
1616import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants'
1717import { getBlock } from '@/blocks'
1818import { Executor } from '@/executor'
19- import type { BlockLog , ExecutionResult } from '@/executor/types'
19+ import type { BlockLog , ExecutionResult , StreamingExecution } from '@/executor/types'
2020import { Serializer } from '@/serializer'
2121import { mergeSubblockState } from '@/stores/workflows/server-utils'
2222import type { WorkflowState } from '@/stores/workflows/workflow/types'
@@ -548,6 +548,7 @@ export async function executeWorkflowForChat(
548548 const stream = new ReadableStream ( {
549549 async start ( controller ) {
550550 const encoder = new TextEncoder ( )
551+ let executionResultForLogging : ExecutionResult | null = null
551552
552553 try {
553554 const streamedContent = new Map < string , string > ( )
@@ -603,6 +604,7 @@ export async function executeWorkflowForChat(
603604 endedAt : new Date ( ) . toISOString ( ) ,
604605 totalDurationMs : 0 ,
605606 error : { message : errorMessage } ,
607+ traceSpans : [ ] ,
606608 } )
607609 sessionCompleted = true
608610 }
@@ -644,16 +646,24 @@ export async function executeWorkflowForChat(
644646 // Set up logging on the executor
645647 loggingSession . setupExecutor ( executor )
646648
647- let result
649+ let result : ExecutionResult | StreamingExecution | undefined
648650 try {
649651 result = await executor . execute ( workflowId , startBlockId )
650652 } catch ( error : any ) {
651653 logger . error ( `[${ requestId } ] Chat workflow execution failed:` , error )
652654 if ( ! sessionCompleted ) {
655+ const executionResult = error ?. executionResult || {
656+ success : false ,
657+ output : { } ,
658+ logs : [ ] ,
659+ }
660+ const { traceSpans } = buildTraceSpans ( executionResult )
661+
653662 await loggingSession . safeCompleteWithError ( {
654663 endedAt : new Date ( ) . toISOString ( ) ,
655664 totalDurationMs : 0 ,
656665 error : { message : error . message || 'Chat workflow execution failed' } ,
666+ traceSpans,
657667 } )
658668 sessionCompleted = true
659669 }
@@ -677,31 +687,25 @@ export async function executeWorkflowForChat(
677687 ? ( result . execution as ExecutionResult )
678688 : ( result as ExecutionResult )
679689
680- if ( executionResult ?. logs ) {
681- // Update streamed content and apply tokenization - process regardless of overall success
682- // This ensures partial successes (some agents succeed, some fail) still return results
690+ executionResultForLogging = executionResult
683691
684- // Add newlines between different agent outputs for better readability
692+ if ( executionResult ?. logs ) {
685693 const processedOutputs = new Set < string > ( )
686694 executionResult . logs . forEach ( ( log : BlockLog ) => {
687695 if ( streamedContent . has ( log . blockId ) ) {
688696 const content = streamedContent . get ( log . blockId )
689697 if ( log . output && content ) {
690- // Add newline separation between different outputs (but not before the first one)
691698 const separator = processedOutputs . size > 0 ? '\n\n' : ''
692699 log . output . content = separator + content
693700 processedOutputs . add ( log . blockId )
694701 }
695702 }
696703 } )
697704
698- // Also process non-streamed outputs from selected blocks (like function blocks)
699- // This uses the same logic as the chat panel to ensure identical behavior
700705 const nonStreamingLogs = executionResult . logs . filter (
701706 ( log : BlockLog ) => ! streamedContent . has ( log . blockId )
702707 )
703708
704- // Extract the exact same functions used by the chat panel
705709 const extractBlockIdFromOutputId = ( outputId : string ) : string => {
706710 return outputId . includes ( '_' ) ? outputId . split ( '_' ) [ 0 ] : outputId . split ( '.' ) [ 0 ]
707711 }
@@ -719,21 +723,18 @@ export async function executeWorkflowForChat(
719723 try {
720724 return JSON . parse ( output . content )
721725 } catch ( e ) {
722- // Fallback to original structure if parsing fails
723726 return output
724727 }
725728 }
726729
727730 return output
728731 }
729732
730- // Filter outputs that have matching logs (exactly like chat panel)
731733 const outputsToRender = selectedOutputIds . filter ( ( outputId ) => {
732734 const blockIdForOutput = extractBlockIdFromOutputId ( outputId )
733735 return nonStreamingLogs . some ( ( log ) => log . blockId === blockIdForOutput )
734736 } )
735737
736- // Process each selected output (exactly like chat panel)
737738 for ( const outputId of outputsToRender ) {
738739 const blockIdForOutput = extractBlockIdFromOutputId ( outputId )
739740 const path = extractPathFromOutputId ( outputId , blockIdForOutput )
@@ -743,7 +744,6 @@ export async function executeWorkflowForChat(
743744 let outputValue : any = log . output
744745
745746 if ( path ) {
746- // Parse JSON content safely (exactly like chat panel)
747747 outputValue = parseOutputContentSafely ( outputValue )
748748
749749 const pathParts = path . split ( '.' )
@@ -758,16 +758,13 @@ export async function executeWorkflowForChat(
758758 }
759759
760760 if ( outputValue !== undefined ) {
761- // Add newline separation between different outputs
762761 const separator = processedOutputs . size > 0 ? '\n\n' : ''
763762
764- // Format the output exactly like the chat panel
765763 const formattedOutput =
766764 typeof outputValue === 'string'
767765 ? outputValue
768766 : JSON . stringify ( outputValue , null , 2 )
769767
770- // Update the log content
771768 if ( ! log . output . content ) {
772769 log . output . content = separator + formattedOutput
773770 } else {
@@ -778,7 +775,6 @@ export async function executeWorkflowForChat(
778775 }
779776 }
780777
781- // Process all logs for streaming tokenization
782778 const processedCount = processStreamingBlockLogs ( executionResult . logs , streamedContent )
783779 logger . info ( `Processed ${ processedCount } blocks for streaming tokenization` )
784780
@@ -793,23 +789,6 @@ export async function executeWorkflowForChat(
793789 }
794790 ; ( enrichedResult . metadata as any ) . conversationId = conversationId
795791 }
796- // Use the executionId created at the beginning of this function
797- logger . debug ( `Using execution ID for deployed chat: ${ executionId } ` )
798-
799- if ( executionResult . success ) {
800- try {
801- await db
802- . update ( userStats )
803- . set ( {
804- totalChatExecutions : sql `total_chat_executions + 1` ,
805- lastActive : new Date ( ) ,
806- } )
807- . where ( eq ( userStats . userId , deployment . userId ) )
808- logger . debug ( `Updated user stats for deployed chat: ${ deployment . userId } ` )
809- } catch ( error ) {
810- logger . error ( `Failed to update user stats for deployed chat:` , error )
811- }
812- }
813792 }
814793
815794 if ( ! ( result && typeof result === 'object' && 'stream' in result ) ) {
@@ -833,30 +812,35 @@ export async function executeWorkflowForChat(
833812
834813 controller . close ( )
835814 } catch ( error : any ) {
836- // Handle any errors that occur in the stream
837- logger . error ( `[${ requestId } ] Stream error:` , error )
815+ logger . error ( `[${ requestId } ] Chat execution streaming error:` , error )
838816
839- // Send error event to client
840- const encoder = new TextEncoder ( )
841- controller . enqueue (
842- encoder . encode (
843- `data: ${ JSON . stringify ( {
844- event : 'error' ,
845- error : error . message || 'An unexpected error occurred' ,
846- } ) } \n\n`
847- )
848- )
849-
850- // Try to complete the logging session with error if not already completed
851817 if ( ! sessionCompleted && loggingSession ) {
818+ const executionResult = executionResultForLogging ||
819+ ( error ?. executionResult as ExecutionResult | undefined ) || {
820+ success : false ,
821+ output : { } ,
822+ logs : [ ] ,
823+ }
824+ const { traceSpans } = buildTraceSpans ( executionResult )
825+
852826 await loggingSession . safeCompleteWithError ( {
853827 endedAt : new Date ( ) . toISOString ( ) ,
854828 totalDurationMs : 0 ,
855829 error : { message : error . message || 'Stream processing error' } ,
830+ traceSpans,
856831 } )
857832 sessionCompleted = true
858833 }
859834
835+ controller . enqueue (
836+ encoder . encode (
837+ `data: ${ JSON . stringify ( {
838+ event : 'error' ,
839+ error : error . message || 'Stream processing error' ,
840+ } ) } \n\n`
841+ )
842+ )
843+
860844 controller . close ( )
861845 }
862846 } ,
0 commit comments