Skip to content

Commit 8841e9b

Browse files
fix(subflow-validation): validate subflow fields correctly + surface serialization errors in the logs correctly (#1299)
* fix(subflow-validation): validate subflow fields correctly + surface serialiazation errors in the logs correctly * remove comments
1 parent 3d4b9f0 commit 8841e9b

File tree

4 files changed

+221
-29
lines changed

4 files changed

+221
-29
lines changed

apps/sim/app/api/workflows/[id]/log/route.ts

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,42 +24,48 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
2424
const body = await request.json()
2525
const { logs, executionId, result } = body
2626

27-
// If result is provided, use logging system for full tool call extraction
2827
if (result) {
2928
logger.info(`[${requestId}] Persisting execution result for workflow: ${id}`, {
3029
executionId,
3130
success: result.success,
3231
})
3332

34-
// Check if this execution is from chat using only the explicit source flag
3533
const isChatExecution = result.metadata?.source === 'chat'
3634

37-
// Also log to logging system
3835
const triggerType = isChatExecution ? 'chat' : 'manual'
3936
const loggingSession = new LoggingSession(id, executionId, triggerType, requestId)
4037

38+
const userId = validation.workflow.userId
39+
const workspaceId = validation.workflow.workspaceId || ''
40+
4141
await loggingSession.safeStart({
42-
userId: '', // TODO: Get from session
43-
workspaceId: '', // TODO: Get from workflow
42+
userId,
43+
workspaceId,
4444
variables: {},
4545
})
4646

47-
// Build trace spans from execution logs
48-
const { traceSpans } = buildTraceSpans(result)
49-
50-
await loggingSession.safeComplete({
51-
endedAt: new Date().toISOString(),
52-
totalDurationMs: result.metadata?.duration || 0,
53-
finalOutput: result.output || {},
54-
traceSpans,
55-
})
47+
if (result.success === false) {
48+
const message = result.error || 'Workflow execution failed'
49+
await loggingSession.safeCompleteWithError({
50+
endedAt: new Date().toISOString(),
51+
totalDurationMs: result.metadata?.duration || 0,
52+
error: { message },
53+
})
54+
} else {
55+
const { traceSpans } = buildTraceSpans(result)
56+
await loggingSession.safeComplete({
57+
endedAt: new Date().toISOString(),
58+
totalDurationMs: result.metadata?.duration || 0,
59+
finalOutput: result.output || {},
60+
traceSpans,
61+
})
62+
}
5663

5764
return createSuccessResponse({
5865
message: 'Execution logs persisted successfully',
5966
})
6067
}
6168

62-
// Fall back to the original log format if 'result' isn't provided
6369
if (!logs || !Array.isArray(logs) || logs.length === 0) {
6470
logger.warn(`[${requestId}] No logs provided for workflow: ${id}`)
6571
return createErrorResponse('No logs provided', 400)

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { getBlock } from '@/blocks'
77
import type { BlockOutput } from '@/blocks/types'
88
import { Executor } from '@/executor'
99
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
10-
import { Serializer } from '@/serializer'
10+
import { Serializer, WorkflowValidationError } from '@/serializer'
1111
import type { SerializedWorkflow } from '@/serializer/types'
1212
import { useExecutionStore } from '@/stores/execution/store'
1313
import { useConsoleStore } from '@/stores/panel/console/store'
@@ -16,6 +16,7 @@ import { useEnvironmentStore } from '@/stores/settings/environment/store'
1616
import { useGeneralStore } from '@/stores/settings/general/store'
1717
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
1818
import { mergeSubblockState } from '@/stores/workflows/utils'
19+
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
1920
import { useCurrentWorkflow } from './use-current-workflow'
2021

2122
const logger = createLogger('useWorkflowExecution')
@@ -488,7 +489,7 @@ export function useWorkflowExecution() {
488489
}
489490
return result
490491
} catch (error: any) {
491-
const errorResult = handleExecutionError(error)
492+
const errorResult = handleExecutionError(error, { executionId })
492493
persistLogs(executionId, errorResult).catch((err) => {
493494
logger.error('Error persisting logs:', { error: err })
494495
})
@@ -518,12 +519,7 @@ export function useWorkflowExecution() {
518519
executionId?: string
519520
): Promise<ExecutionResult | StreamingExecution> => {
520521
// Use currentWorkflow but check if we're in diff mode
521-
const {
522-
blocks: workflowBlocks,
523-
edges: workflowEdges,
524-
loops: workflowLoops,
525-
parallels: workflowParallels,
526-
} = currentWorkflow
522+
const { blocks: workflowBlocks, edges: workflowEdges } = currentWorkflow
527523

528524
// Filter out blocks without type (these are layout-only blocks)
529525
const validBlocks = Object.entries(workflowBlocks).reduce(
@@ -646,12 +642,17 @@ export function useWorkflowExecution() {
646642
(edge) => !triggerBlockIds.includes(edge.source) && !triggerBlockIds.includes(edge.target)
647643
)
648644

649-
// Create serialized workflow with filtered blocks and edges
645+
// Derive subflows from the current filtered graph to avoid stale state
646+
const runtimeLoops = generateLoopBlocks(filteredStates)
647+
const runtimeParallels = generateParallelBlocks(filteredStates)
648+
649+
// Create serialized workflow with validation enabled
650650
const workflow = new Serializer().serializeWorkflow(
651651
filteredStates,
652652
filteredEdges,
653-
workflowLoops,
654-
workflowParallels
653+
runtimeLoops,
654+
runtimeParallels,
655+
true
655656
)
656657

657658
// If this is a chat execution, get the selected outputs
@@ -690,7 +691,7 @@ export function useWorkflowExecution() {
690691
return newExecutor.execute(activeWorkflowId || '')
691692
}
692693

693-
const handleExecutionError = (error: any) => {
694+
const handleExecutionError = (error: any, options?: { executionId?: string }) => {
694695
let errorMessage = 'Unknown error'
695696
if (error instanceof Error) {
696697
errorMessage = error.message || `Error: ${String(error)}`
@@ -723,6 +724,36 @@ export function useWorkflowExecution() {
723724
errorMessage = 'API request failed - no specific error details available'
724725
}
725726

727+
// If we failed before creating an executor (e.g., serializer validation), add a console entry
728+
if (!executor) {
729+
try {
730+
// Prefer attributing to specific subflow if we have a structured error
731+
let blockId = 'serialization'
732+
let blockName = 'Serialization'
733+
let blockType = 'serializer'
734+
if (error instanceof WorkflowValidationError) {
735+
blockId = error.blockId || blockId
736+
blockName = error.blockName || blockName
737+
blockType = error.blockType || blockType
738+
}
739+
740+
useConsoleStore.getState().addConsole({
741+
input: {},
742+
output: {},
743+
success: false,
744+
error: errorMessage,
745+
durationMs: 0,
746+
startedAt: new Date().toISOString(),
747+
endedAt: new Date().toISOString(),
748+
workflowId: activeWorkflowId || '',
749+
blockId,
750+
executionId: options?.executionId,
751+
blockName,
752+
blockType,
753+
})
754+
} catch {}
755+
}
756+
726757
const errorResult: ExecutionResult = {
727758
success: false,
728759
output: {},

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,51 @@ export class LoggingSession {
190190
return true
191191
} catch (error) {
192192
if (this.requestId) {
193-
logger.error(`[${this.requestId}] Logging start failed:`, error)
193+
logger.warn(
194+
`[${this.requestId}] Logging start failed - falling back to minimal session:`,
195+
error
196+
)
197+
}
198+
199+
// Fallback: create a minimal logging session without full workflow state
200+
try {
201+
const { userId, workspaceId, variables, triggerData } = params
202+
this.trigger = createTriggerObject(this.triggerType, triggerData)
203+
this.environment = createEnvironmentObject(
204+
this.workflowId,
205+
this.executionId,
206+
userId,
207+
workspaceId,
208+
variables
209+
)
210+
// Minimal workflow state when normalized data is unavailable
211+
this.workflowState = {
212+
blocks: {},
213+
edges: [],
214+
loops: {},
215+
parallels: {},
216+
} as unknown as WorkflowState
217+
218+
await executionLogger.startWorkflowExecution({
219+
workflowId: this.workflowId,
220+
executionId: this.executionId,
221+
trigger: this.trigger,
222+
environment: this.environment,
223+
workflowState: this.workflowState,
224+
})
225+
226+
if (this.requestId) {
227+
logger.debug(
228+
`[${this.requestId}] Started minimal logging for execution ${this.executionId}`
229+
)
230+
}
231+
return true
232+
} catch (fallbackError) {
233+
if (this.requestId) {
234+
logger.error(`[${this.requestId}] Minimal logging start also failed:`, fallbackError)
235+
}
236+
return false
194237
}
195-
return false
196238
}
197239
}
198240

apps/sim/serializer/index.ts

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,21 @@ import { getTool } from '@/tools/utils'
88

99
const logger = createLogger('Serializer')
1010

11+
/**
12+
* Structured validation error for pre-execution workflow validation
13+
*/
14+
export class WorkflowValidationError extends Error {
15+
constructor(
16+
message: string,
17+
public blockId?: string,
18+
public blockType?: string,
19+
public blockName?: string
20+
) {
21+
super(message)
22+
this.name = 'WorkflowValidationError'
23+
}
24+
}
25+
1126
/**
1227
* Helper function to check if a subblock should be included in serialization based on current mode
1328
*/
@@ -29,6 +44,11 @@ export class Serializer {
2944
parallels?: Record<string, Parallel>,
3045
validateRequired = false
3146
): SerializedWorkflow {
47+
// Validate subflow requirements (loops/parallels) before serialization if requested
48+
if (validateRequired) {
49+
this.validateSubflowsBeforeExecution(blocks, loops || {}, parallels || {})
50+
}
51+
3252
return {
3353
version: '1.0',
3454
blocks: Object.values(blocks).map((block) => this.serializeBlock(block, validateRequired)),
@@ -43,6 +63,99 @@ export class Serializer {
4363
}
4464
}
4565

66+
/**
67+
* Validate loop and parallel subflows for required inputs when running in "each/collection" modes
68+
*/
69+
private validateSubflowsBeforeExecution(
70+
blocks: Record<string, BlockState>,
71+
loops: Record<string, Loop>,
72+
parallels: Record<string, Parallel>
73+
): void {
74+
// Validate loops in forEach mode
75+
Object.values(loops || {}).forEach((loop) => {
76+
if (!loop) return
77+
if (loop.loopType === 'forEach') {
78+
const items = (loop as any).forEachItems
79+
80+
const hasNonEmptyCollection = (() => {
81+
if (items === undefined || items === null) return false
82+
if (Array.isArray(items)) return items.length > 0
83+
if (typeof items === 'object') return Object.keys(items).length > 0
84+
if (typeof items === 'string') {
85+
const trimmed = items.trim()
86+
if (trimmed.length === 0) return false
87+
// If it looks like JSON, parse to confirm non-empty [] / {}
88+
if (trimmed.startsWith('[') || trimmed.startsWith('{')) {
89+
try {
90+
const parsed = JSON.parse(trimmed)
91+
if (Array.isArray(parsed)) return parsed.length > 0
92+
if (parsed && typeof parsed === 'object') return Object.keys(parsed).length > 0
93+
} catch {
94+
// Non-JSON or invalid JSON string – allow non-empty string (could be a reference like <start.items>)
95+
return true
96+
}
97+
}
98+
// Non-JSON string – allow (may be a variable reference/expression)
99+
return true
100+
}
101+
return false
102+
})()
103+
104+
if (!hasNonEmptyCollection) {
105+
const blockName = blocks[loop.id]?.name || 'Loop'
106+
const error = new WorkflowValidationError(
107+
`${blockName} requires a collection for forEach mode. Provide a non-empty array/object or a variable reference.`,
108+
loop.id,
109+
'loop',
110+
blockName
111+
)
112+
throw error
113+
}
114+
}
115+
})
116+
117+
// Validate parallels in collection mode
118+
Object.values(parallels || {}).forEach((parallel) => {
119+
if (!parallel) return
120+
if ((parallel as any).parallelType === 'collection') {
121+
const distribution = (parallel as any).distribution
122+
123+
const hasNonEmptyDistribution = (() => {
124+
if (distribution === undefined || distribution === null) return false
125+
if (Array.isArray(distribution)) return distribution.length > 0
126+
if (typeof distribution === 'object') return Object.keys(distribution).length > 0
127+
if (typeof distribution === 'string') {
128+
const trimmed = distribution.trim()
129+
if (trimmed.length === 0) return false
130+
// If it looks like JSON, parse to confirm non-empty [] / {}
131+
if (trimmed.startsWith('[') || trimmed.startsWith('{')) {
132+
try {
133+
const parsed = JSON.parse(trimmed)
134+
if (Array.isArray(parsed)) return parsed.length > 0
135+
if (parsed && typeof parsed === 'object') return Object.keys(parsed).length > 0
136+
} catch {
137+
return true
138+
}
139+
}
140+
return true
141+
}
142+
return false
143+
})()
144+
145+
if (!hasNonEmptyDistribution) {
146+
const blockName = blocks[parallel.id]?.name || 'Parallel'
147+
const error = new WorkflowValidationError(
148+
`${blockName} requires a collection for collection mode. Provide a non-empty array/object or a variable reference.`,
149+
parallel.id,
150+
'parallel',
151+
blockName
152+
)
153+
throw error
154+
}
155+
}
156+
})
157+
}
158+
46159
private serializeBlock(block: BlockState, validateRequired = false): SerializedBlock {
47160
// Special handling for subflow blocks (loops, parallels, etc.)
48161
if (block.type === 'loop' || block.type === 'parallel') {

0 commit comments

Comments
 (0)