Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/sim/executor/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ export const DEFAULTS = {
BLOCK_TITLE: 'Untitled Block',
WORKFLOW_NAME: 'Workflow',
MAX_LOOP_ITERATIONS: 1000,
MAX_FOREACH_ITEMS: 1000,
MAX_PARALLEL_BRANCHES: 20,
MAX_WORKFLOW_DEPTH: 10,
EXECUTION_TIME: 0,
TOKENS: {
Expand Down
43 changes: 43 additions & 0 deletions apps/sim/executor/dag/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { LoopConstructor } from '@/executor/dag/construction/loops'
import { NodeConstructor } from '@/executor/dag/construction/nodes'
import { PathConstructor } from '@/executor/dag/construction/paths'
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
import { buildSentinelStartId, extractBaseBlockId } from '@/executor/utils/subflow-utils'
import type {
SerializedBlock,
SerializedLoop,
Expand Down Expand Up @@ -79,6 +80,9 @@ export class DAGBuilder {
}
}

// Validate loop and parallel structure
this.validateSubflowStructure(dag)

logger.info('DAG built', {
totalNodes: dag.nodes.size,
loopCount: dag.loopConfigs.size,
Expand All @@ -105,4 +109,43 @@ export class DAGBuilder {
}
}
}

/**
* Validates that loops and parallels have proper internal structure.
* Throws an error if a loop/parallel has no blocks inside or no connections from start.
*/
private validateSubflowStructure(dag: DAG): void {
for (const [id, config] of dag.loopConfigs) {
this.validateSubflow(dag, id, config.nodes, 'Loop')
}
for (const [id, config] of dag.parallelConfigs) {
this.validateSubflow(dag, id, config.nodes, 'Parallel')
}
}

private validateSubflow(
dag: DAG,
id: string,
nodes: string[] | undefined,
type: 'Loop' | 'Parallel'
): void {
if (!nodes || nodes.length === 0) {
throw new Error(
`${type} has no blocks inside. Add at least one block to the ${type.toLowerCase()}.`
)
}

const sentinelStartNode = dag.nodes.get(buildSentinelStartId(id))
if (!sentinelStartNode) return

const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) =>
nodes.includes(extractBaseBlockId(edge.target))
)

if (!hasConnections) {
throw new Error(
`${type} start is not connected to any blocks. Connect a block to the ${type.toLowerCase()} start.`
)
}
}
}
2 changes: 2 additions & 0 deletions apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ export class DAGExecutor {

const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
loopOrchestrator.setContextExtensions(this.contextExtensions)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
parallelOrchestrator.setResolver(resolver)
parallelOrchestrator.setContextExtensions(this.contextExtensions)
const allHandlers = createBlockHandlers()
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
const edgeManager = new EdgeManager(dag)
Expand Down
4 changes: 4 additions & 0 deletions apps/sim/executor/execution/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export interface LoopScope {
condition?: string
loopType?: 'for' | 'forEach' | 'while' | 'doWhile'
skipFirstConditionCheck?: boolean
/** Error message if loop validation failed (e.g., exceeded max iterations) */
validationError?: string
}

export interface ParallelScope {
Expand All @@ -23,6 +25,8 @@ export interface ParallelScope {
completedCount: number
totalExpectedNodes: number
items?: any[]
/** Error message if parallel validation failed (e.g., exceeded max branches) */
validationError?: string
}

export class ExecutionState implements BlockStateController {
Expand Down
163 changes: 107 additions & 56 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
import type { LoopScope } from '@/executor/execution/state'
import type { BlockStateController } from '@/executor/execution/types'
import type { BlockStateController, ContextExtensions } from '@/executor/execution/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { LoopConfigWithNodes } from '@/executor/types/loop'
import { replaceValidReferences } from '@/executor/utils/reference-validation'
import {
addSubflowErrorLog,
buildSentinelEndId,
buildSentinelStartId,
extractBaseBlockId,
resolveArrayInput,
validateMaxCount,
} from '@/executor/utils/subflow-utils'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedLoop } from '@/serializer/types'
Expand All @@ -32,13 +35,18 @@ export interface LoopContinuationResult {

export class LoopOrchestrator {
private edgeManager: EdgeManager | null = null
private contextExtensions: ContextExtensions | null = null

constructor(
private dag: DAG,
private state: BlockStateController,
private resolver: VariableResolver
) {}

setContextExtensions(contextExtensions: ContextExtensions): void {
this.contextExtensions = contextExtensions
}

setEdgeManager(edgeManager: EdgeManager): void {
this.edgeManager = edgeManager
}
Expand All @@ -48,7 +56,6 @@ export class LoopOrchestrator {
if (!loopConfig) {
throw new Error(`Loop config not found: ${loopId}`)
}

const scope: LoopScope = {
iteration: 0,
currentIterationOutputs: new Map(),
Expand All @@ -58,15 +65,70 @@ export class LoopOrchestrator {
const loopType = loopConfig.loopType

switch (loopType) {
case 'for':
case 'for': {
scope.loopType = 'for'
scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS

const iterationError = validateMaxCount(
requestedIterations,
DEFAULTS.MAX_LOOP_ITERATIONS,
'For loop iterations'
)
if (iterationError) {
logger.error(iterationError, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
iterations: requestedIterations,
})
scope.maxIterations = 0
scope.validationError = iterationError
scope.condition = buildLoopIndexCondition(0)
ctx.loopExecutions?.set(loopId, scope)
throw new Error(iterationError)
}

scope.maxIterations = requestedIterations
scope.condition = buildLoopIndexCondition(scope.maxIterations)
break
}

case 'forEach': {
scope.loopType = 'forEach'
const items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
let items: any[]
try {
items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
} catch (error) {
const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
forEachItems: loopConfig.forEachItems,
})
scope.items = []
scope.maxIterations = 0
scope.validationError = errorMessage
scope.condition = buildLoopIndexCondition(0)
ctx.loopExecutions?.set(loopId, scope)
throw new Error(errorMessage)
}

const sizeError = validateMaxCount(
items.length,
DEFAULTS.MAX_FOREACH_ITEMS,
'ForEach loop collection size'
)
if (sizeError) {
logger.error(sizeError, { loopId, collectionSize: items.length })
this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
forEachItems: loopConfig.forEachItems,
collectionSize: items.length,
})
scope.items = []
scope.maxIterations = 0
scope.validationError = sizeError
scope.condition = buildLoopIndexCondition(0)
ctx.loopExecutions?.set(loopId, scope)
throw new Error(sizeError)
}

scope.items = items
scope.maxIterations = items.length
scope.item = items[0]
Expand All @@ -79,15 +141,35 @@ export class LoopOrchestrator {
scope.condition = loopConfig.whileCondition
break

case 'doWhile':
case 'doWhile': {
scope.loopType = 'doWhile'
if (loopConfig.doWhileCondition) {
scope.condition = loopConfig.doWhileCondition
} else {
scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS

const iterationError = validateMaxCount(
requestedIterations,
DEFAULTS.MAX_LOOP_ITERATIONS,
'Do-While loop iterations'
)
if (iterationError) {
logger.error(iterationError, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
iterations: requestedIterations,
})
scope.maxIterations = 0
scope.validationError = iterationError
scope.condition = buildLoopIndexCondition(0)
ctx.loopExecutions?.set(loopId, scope)
throw new Error(iterationError)
}

scope.maxIterations = requestedIterations
scope.condition = buildLoopIndexCondition(scope.maxIterations)
}
break
}

default:
throw new Error(`Unknown loop type: ${loopType}`)
Expand All @@ -100,6 +182,23 @@ export class LoopOrchestrator {
return scope
}

private addLoopErrorLog(
ctx: ExecutionContext,
loopId: string,
loopType: string,
errorMessage: string,
inputData?: any
): void {
addSubflowErrorLog(
ctx,
loopId,
'loop',
errorMessage,
{ loopType, ...inputData },
this.contextExtensions
)
}

storeLoopNodeOutput(
ctx: ExecutionContext,
loopId: string,
Expand Down Expand Up @@ -412,54 +511,6 @@ export class LoopOrchestrator {
}

private resolveForEachItems(ctx: ExecutionContext, items: any): any[] {
if (Array.isArray(items)) {
return items
}

if (typeof items === 'object' && items !== null) {
return Object.entries(items)
}

if (typeof items === 'string') {
if (items.startsWith('<') && items.endsWith('>')) {
const resolved = this.resolver.resolveSingleReference(ctx, '', items)
if (Array.isArray(resolved)) {
return resolved
}
return []
}

try {
const normalized = items.replace(/'/g, '"')
const parsed = JSON.parse(normalized)
if (Array.isArray(parsed)) {
return parsed
}
return []
} catch (error) {
logger.error('Failed to parse forEach items', { items, error })
return []
}
}

try {
const resolved = this.resolver.resolveInputs(ctx, 'loop_foreach_items', { items }).items

if (Array.isArray(resolved)) {
return resolved
}

logger.warn('ForEach items did not resolve to array', {
items,
resolved,
})

return []
} catch (error: any) {
logger.error('Error resolving forEach items, returning empty array:', {
error: error.message,
})
return []
}
return resolveArrayInput(ctx, items, this.resolver)
}
}
Loading
Loading