Skip to content

Commit c252e88

Browse files
Pbonmars-20031006priyanshu.solankiSg312icecrasher321
authored
improvement(logs): fixed logs for parallel and loop execution flow (#2468)
* fixed logs for parallel and loop execution flow * Fix array check for collection * fixed for empty loop and paralle blocks and showing input on dashboard * extracted utility functions * fixed the refrencing errors and making sure it propogates to the console * fix parallel * fix tests' --------- Co-authored-by: priyanshu.solanki <[email protected]> Co-authored-by: Siddharth Ganesan <[email protected]> Co-authored-by: Vikhyath Mondreti <[email protected]>
1 parent b0748c8 commit c252e88

File tree

10 files changed

+377
-114
lines changed

10 files changed

+377
-114
lines changed

apps/sim/executor/constants.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ export const DEFAULTS = {
128128
BLOCK_TITLE: 'Untitled Block',
129129
WORKFLOW_NAME: 'Workflow',
130130
MAX_LOOP_ITERATIONS: 1000,
131+
MAX_FOREACH_ITEMS: 1000,
132+
MAX_PARALLEL_BRANCHES: 20,
131133
MAX_WORKFLOW_DEPTH: 10,
132134
EXECUTION_TIME: 0,
133135
TOKENS: {

apps/sim/executor/dag/builder.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { LoopConstructor } from '@/executor/dag/construction/loops'
44
import { NodeConstructor } from '@/executor/dag/construction/nodes'
55
import { PathConstructor } from '@/executor/dag/construction/paths'
66
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
7+
import { buildSentinelStartId, extractBaseBlockId } from '@/executor/utils/subflow-utils'
78
import type {
89
SerializedBlock,
910
SerializedLoop,
@@ -79,6 +80,9 @@ export class DAGBuilder {
7980
}
8081
}
8182

83+
// Validate loop and parallel structure
84+
this.validateSubflowStructure(dag)
85+
8286
logger.info('DAG built', {
8387
totalNodes: dag.nodes.size,
8488
loopCount: dag.loopConfigs.size,
@@ -105,4 +109,43 @@ export class DAGBuilder {
105109
}
106110
}
107111
}
112+
113+
/**
114+
* Validates that loops and parallels have proper internal structure.
115+
* Throws an error if a loop/parallel has no blocks inside or no connections from start.
116+
*/
117+
private validateSubflowStructure(dag: DAG): void {
118+
for (const [id, config] of dag.loopConfigs) {
119+
this.validateSubflow(dag, id, config.nodes, 'Loop')
120+
}
121+
for (const [id, config] of dag.parallelConfigs) {
122+
this.validateSubflow(dag, id, config.nodes, 'Parallel')
123+
}
124+
}
125+
126+
private validateSubflow(
127+
dag: DAG,
128+
id: string,
129+
nodes: string[] | undefined,
130+
type: 'Loop' | 'Parallel'
131+
): void {
132+
if (!nodes || nodes.length === 0) {
133+
throw new Error(
134+
`${type} has no blocks inside. Add at least one block to the ${type.toLowerCase()}.`
135+
)
136+
}
137+
138+
const sentinelStartNode = dag.nodes.get(buildSentinelStartId(id))
139+
if (!sentinelStartNode) return
140+
141+
const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) =>
142+
nodes.includes(extractBaseBlockId(edge.target))
143+
)
144+
145+
if (!hasConnections) {
146+
throw new Error(
147+
`${type} start is not connected to any blocks. Connect a block to the ${type.toLowerCase()} start.`
148+
)
149+
}
150+
}
108151
}

apps/sim/executor/execution/executor.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,10 @@ export class DAGExecutor {
6363

6464
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
6565
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
66+
loopOrchestrator.setContextExtensions(this.contextExtensions)
6667
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
6768
parallelOrchestrator.setResolver(resolver)
69+
parallelOrchestrator.setContextExtensions(this.contextExtensions)
6870
const allHandlers = createBlockHandlers()
6971
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
7072
const edgeManager = new EdgeManager(dag)

apps/sim/executor/execution/state.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ export interface LoopScope {
1414
condition?: string
1515
loopType?: 'for' | 'forEach' | 'while' | 'doWhile'
1616
skipFirstConditionCheck?: boolean
17+
/** Error message if loop validation failed (e.g., exceeded max iterations) */
18+
validationError?: string
1719
}
1820

1921
export interface ParallelScope {
@@ -23,6 +25,8 @@ export interface ParallelScope {
2325
completedCount: number
2426
totalExpectedNodes: number
2527
items?: any[]
28+
/** Error message if parallel validation failed (e.g., exceeded max branches) */
29+
validationError?: string
2630
}
2731

2832
export class ExecutionState implements BlockStateController {

apps/sim/executor/orchestrators/loop.ts

Lines changed: 107 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@ import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
55
import type { DAG } from '@/executor/dag/builder'
66
import type { EdgeManager } from '@/executor/execution/edge-manager'
77
import type { LoopScope } from '@/executor/execution/state'
8-
import type { BlockStateController } from '@/executor/execution/types'
8+
import type { BlockStateController, ContextExtensions } from '@/executor/execution/types'
99
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
1010
import type { LoopConfigWithNodes } from '@/executor/types/loop'
1111
import { replaceValidReferences } from '@/executor/utils/reference-validation'
1212
import {
13+
addSubflowErrorLog,
1314
buildSentinelEndId,
1415
buildSentinelStartId,
1516
extractBaseBlockId,
17+
resolveArrayInput,
18+
validateMaxCount,
1619
} from '@/executor/utils/subflow-utils'
1720
import type { VariableResolver } from '@/executor/variables/resolver'
1821
import type { SerializedLoop } from '@/serializer/types'
@@ -32,13 +35,18 @@ export interface LoopContinuationResult {
3235

3336
export class LoopOrchestrator {
3437
private edgeManager: EdgeManager | null = null
38+
private contextExtensions: ContextExtensions | null = null
3539

3640
constructor(
3741
private dag: DAG,
3842
private state: BlockStateController,
3943
private resolver: VariableResolver
4044
) {}
4145

46+
setContextExtensions(contextExtensions: ContextExtensions): void {
47+
this.contextExtensions = contextExtensions
48+
}
49+
4250
setEdgeManager(edgeManager: EdgeManager): void {
4351
this.edgeManager = edgeManager
4452
}
@@ -48,7 +56,6 @@ export class LoopOrchestrator {
4856
if (!loopConfig) {
4957
throw new Error(`Loop config not found: ${loopId}`)
5058
}
51-
5259
const scope: LoopScope = {
5360
iteration: 0,
5461
currentIterationOutputs: new Map(),
@@ -58,15 +65,70 @@ export class LoopOrchestrator {
5865
const loopType = loopConfig.loopType
5966

6067
switch (loopType) {
61-
case 'for':
68+
case 'for': {
6269
scope.loopType = 'for'
63-
scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
70+
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
71+
72+
const iterationError = validateMaxCount(
73+
requestedIterations,
74+
DEFAULTS.MAX_LOOP_ITERATIONS,
75+
'For loop iterations'
76+
)
77+
if (iterationError) {
78+
logger.error(iterationError, { loopId, requestedIterations })
79+
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
80+
iterations: requestedIterations,
81+
})
82+
scope.maxIterations = 0
83+
scope.validationError = iterationError
84+
scope.condition = buildLoopIndexCondition(0)
85+
ctx.loopExecutions?.set(loopId, scope)
86+
throw new Error(iterationError)
87+
}
88+
89+
scope.maxIterations = requestedIterations
6490
scope.condition = buildLoopIndexCondition(scope.maxIterations)
6591
break
92+
}
6693

6794
case 'forEach': {
6895
scope.loopType = 'forEach'
69-
const items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
96+
let items: any[]
97+
try {
98+
items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
99+
} catch (error) {
100+
const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
101+
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
102+
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
103+
forEachItems: loopConfig.forEachItems,
104+
})
105+
scope.items = []
106+
scope.maxIterations = 0
107+
scope.validationError = errorMessage
108+
scope.condition = buildLoopIndexCondition(0)
109+
ctx.loopExecutions?.set(loopId, scope)
110+
throw new Error(errorMessage)
111+
}
112+
113+
const sizeError = validateMaxCount(
114+
items.length,
115+
DEFAULTS.MAX_FOREACH_ITEMS,
116+
'ForEach loop collection size'
117+
)
118+
if (sizeError) {
119+
logger.error(sizeError, { loopId, collectionSize: items.length })
120+
this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
121+
forEachItems: loopConfig.forEachItems,
122+
collectionSize: items.length,
123+
})
124+
scope.items = []
125+
scope.maxIterations = 0
126+
scope.validationError = sizeError
127+
scope.condition = buildLoopIndexCondition(0)
128+
ctx.loopExecutions?.set(loopId, scope)
129+
throw new Error(sizeError)
130+
}
131+
70132
scope.items = items
71133
scope.maxIterations = items.length
72134
scope.item = items[0]
@@ -79,15 +141,35 @@ export class LoopOrchestrator {
79141
scope.condition = loopConfig.whileCondition
80142
break
81143

82-
case 'doWhile':
144+
case 'doWhile': {
83145
scope.loopType = 'doWhile'
84146
if (loopConfig.doWhileCondition) {
85147
scope.condition = loopConfig.doWhileCondition
86148
} else {
87-
scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
149+
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
150+
151+
const iterationError = validateMaxCount(
152+
requestedIterations,
153+
DEFAULTS.MAX_LOOP_ITERATIONS,
154+
'Do-While loop iterations'
155+
)
156+
if (iterationError) {
157+
logger.error(iterationError, { loopId, requestedIterations })
158+
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
159+
iterations: requestedIterations,
160+
})
161+
scope.maxIterations = 0
162+
scope.validationError = iterationError
163+
scope.condition = buildLoopIndexCondition(0)
164+
ctx.loopExecutions?.set(loopId, scope)
165+
throw new Error(iterationError)
166+
}
167+
168+
scope.maxIterations = requestedIterations
88169
scope.condition = buildLoopIndexCondition(scope.maxIterations)
89170
}
90171
break
172+
}
91173

92174
default:
93175
throw new Error(`Unknown loop type: ${loopType}`)
@@ -100,6 +182,23 @@ export class LoopOrchestrator {
100182
return scope
101183
}
102184

185+
private addLoopErrorLog(
186+
ctx: ExecutionContext,
187+
loopId: string,
188+
loopType: string,
189+
errorMessage: string,
190+
inputData?: any
191+
): void {
192+
addSubflowErrorLog(
193+
ctx,
194+
loopId,
195+
'loop',
196+
errorMessage,
197+
{ loopType, ...inputData },
198+
this.contextExtensions
199+
)
200+
}
201+
103202
storeLoopNodeOutput(
104203
ctx: ExecutionContext,
105204
loopId: string,
@@ -412,54 +511,6 @@ export class LoopOrchestrator {
412511
}
413512

414513
private resolveForEachItems(ctx: ExecutionContext, items: any): any[] {
415-
if (Array.isArray(items)) {
416-
return items
417-
}
418-
419-
if (typeof items === 'object' && items !== null) {
420-
return Object.entries(items)
421-
}
422-
423-
if (typeof items === 'string') {
424-
if (items.startsWith('<') && items.endsWith('>')) {
425-
const resolved = this.resolver.resolveSingleReference(ctx, '', items)
426-
if (Array.isArray(resolved)) {
427-
return resolved
428-
}
429-
return []
430-
}
431-
432-
try {
433-
const normalized = items.replace(/'/g, '"')
434-
const parsed = JSON.parse(normalized)
435-
if (Array.isArray(parsed)) {
436-
return parsed
437-
}
438-
return []
439-
} catch (error) {
440-
logger.error('Failed to parse forEach items', { items, error })
441-
return []
442-
}
443-
}
444-
445-
try {
446-
const resolved = this.resolver.resolveInputs(ctx, 'loop_foreach_items', { items }).items
447-
448-
if (Array.isArray(resolved)) {
449-
return resolved
450-
}
451-
452-
logger.warn('ForEach items did not resolve to array', {
453-
items,
454-
resolved,
455-
})
456-
457-
return []
458-
} catch (error: any) {
459-
logger.error('Error resolving forEach items, returning empty array:', {
460-
error: error.message,
461-
})
462-
return []
463-
}
514+
return resolveArrayInput(ctx, items, this.resolver)
464515
}
465516
}

0 commit comments

Comments
 (0)