Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/sim/executor/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ export const DEFAULTS = {
BLOCK_TITLE: 'Untitled Block',
WORKFLOW_NAME: 'Workflow',
MAX_LOOP_ITERATIONS: 1000,
MAX_FOREACH_ITEMS: 1000,
MAX_PARALLEL_BRANCHES: 20,
MAX_WORKFLOW_DEPTH: 10,
EXECUTION_TIME: 0,
TOKENS: {
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ export class DAGExecutor {

const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
loopOrchestrator.setContextExtensions(this.contextExtensions)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
parallelOrchestrator.setResolver(resolver)
parallelOrchestrator.setContextExtensions(this.contextExtensions)
const allHandlers = createBlockHandlers()
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
const edgeManager = new EdgeManager(dag)
Expand Down
4 changes: 4 additions & 0 deletions apps/sim/executor/execution/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export interface LoopScope {
condition?: string
loopType?: 'for' | 'forEach' | 'while' | 'doWhile'
skipFirstConditionCheck?: boolean
/** Error message if loop validation failed (e.g., exceeded max iterations) */
validationError?: string
}

export interface ParallelScope {
Expand All @@ -23,6 +25,8 @@ export interface ParallelScope {
completedCount: number
totalExpectedNodes: number
items?: any[]
/** Error message if parallel validation failed (e.g., exceeded max branches) */
validationError?: string
}

export class ExecutionState implements BlockStateController {
Expand Down
113 changes: 104 additions & 9 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
import type { EdgeManager } from '@/executor/execution/edge-manager'
import type { LoopScope } from '@/executor/execution/state'
import type { BlockStateController } from '@/executor/execution/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { BlockStateController, ContextExtensions } from '@/executor/execution/types'
import type { BlockLog, ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { LoopConfigWithNodes } from '@/executor/types/loop'
import { replaceValidReferences } from '@/executor/utils/reference-validation'
import {
Expand All @@ -32,13 +32,18 @@ export interface LoopContinuationResult {

export class LoopOrchestrator {
private edgeManager: EdgeManager | null = null
private contextExtensions: ContextExtensions | null = null

constructor(
private dag: DAG,
private state: BlockStateController,
private resolver: VariableResolver
) {}

setContextExtensions(contextExtensions: ContextExtensions): void {
this.contextExtensions = contextExtensions
}

setEdgeManager(edgeManager: EdgeManager): void {
this.edgeManager = edgeManager
}
Expand All @@ -58,18 +63,54 @@ export class LoopOrchestrator {
const loopType = loopConfig.loopType

switch (loopType) {
case 'for':
case 'for': {
scope.loopType = 'for'
scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS

if (requestedIterations > DEFAULTS.MAX_LOOP_ITERATIONS) {
const errorMessage = `For loop iterations (${requestedIterations}) exceeds maximum allowed (${DEFAULTS.MAX_LOOP_ITERATIONS}). Loop execution blocked.`
logger.error(errorMessage, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage)
// Set to 0 iterations to prevent loop from running
scope.maxIterations = 0
scope.validationError = errorMessage
} else {
scope.maxIterations = requestedIterations
}

scope.condition = buildLoopIndexCondition(scope.maxIterations)
break
}

case 'forEach': {
scope.loopType = 'forEach'
if (!Array.isArray(loopConfig.forEachItems)) {
const errorMessage =
'ForEach loop collection is not a valid array. Loop execution blocked.'
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage)
scope.items = []
scope.maxIterations = 0
scope.validationError = errorMessage
scope.condition = buildLoopIndexCondition(0)
break
}
const items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
scope.items = items
scope.maxIterations = items.length
scope.item = items[0]
const originalLength = items.length

if (originalLength > DEFAULTS.MAX_FOREACH_ITEMS) {
const errorMessage = `ForEach loop collection size (${originalLength}) exceeds maximum allowed (${DEFAULTS.MAX_FOREACH_ITEMS}). Loop execution blocked.`
logger.error(errorMessage, { loopId, originalLength })
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage)
scope.items = []
scope.maxIterations = 0
scope.validationError = errorMessage
} else {
scope.items = items
scope.maxIterations = items.length
scope.item = items[0]
}

scope.condition = buildLoopIndexCondition(scope.maxIterations)
break
}
Expand All @@ -79,15 +120,28 @@ export class LoopOrchestrator {
scope.condition = loopConfig.whileCondition
break

case 'doWhile':
case 'doWhile': {
scope.loopType = 'doWhile'
if (loopConfig.doWhileCondition) {
scope.condition = loopConfig.doWhileCondition
} else {
scope.maxIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS
const requestedIterations = loopConfig.iterations || DEFAULTS.MAX_LOOP_ITERATIONS

if (requestedIterations > DEFAULTS.MAX_LOOP_ITERATIONS) {
const errorMessage = `Do-While loop iterations (${requestedIterations}) exceeds maximum allowed (${DEFAULTS.MAX_LOOP_ITERATIONS}). Loop execution blocked.`
logger.error(errorMessage, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage)
// Set to 0 iterations to prevent loop from running
scope.maxIterations = 0
scope.validationError = errorMessage
} else {
scope.maxIterations = requestedIterations
}

scope.condition = buildLoopIndexCondition(scope.maxIterations)
}
break
}

default:
throw new Error(`Unknown loop type: ${loopType}`)
Expand All @@ -100,6 +154,47 @@ export class LoopOrchestrator {
return scope
}

/**
* Adds an error log entry for loop validation errors.
* These errors appear in the block console on the logs dashboard.
*/
private addLoopErrorLog(
ctx: ExecutionContext,
loopId: string,
loopType: string,
errorMessage: string
): void {
const now = new Date().toISOString()

// Get the actual loop block name from the workflow
const loopBlock = ctx.workflow?.blocks?.find((b) => b.id === loopId)
const blockName = loopBlock?.metadata?.name || `Loop`

const blockLog: BlockLog = {
blockId: loopId,
blockName,
blockType: 'loop',
startedAt: now,
endedAt: now,
durationMs: 0,
success: false,
error: errorMessage,
input: {},
output: { error: errorMessage },
loopId,
}
ctx.blockLogs.push(blockLog)

// Emit the error through onBlockComplete callback so it appears in the UI console
if (this.contextExtensions?.onBlockComplete) {
this.contextExtensions.onBlockComplete(loopId, blockName, 'loop', {
input: {},
output: { error: errorMessage },
executionTime: 0,
})
}
}

storeLoopNodeOutput(
ctx: ExecutionContext,
loopId: string,
Expand Down
98 changes: 96 additions & 2 deletions apps/sim/executor/orchestrators/parallel.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { createLogger } from '@/lib/logs/console/logger'
import { DEFAULTS } from '@/executor/constants'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { ParallelScope } from '@/executor/execution/state'
import type { BlockStateWriter } from '@/executor/execution/types'
import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
import type { BlockLog, ExecutionContext, NormalizedBlockOutput } from '@/executor/types'
import type { ParallelConfigWithNodes } from '@/executor/types/parallel'
import {
buildBranchNodeId,
Expand Down Expand Up @@ -32,6 +33,7 @@ export interface ParallelAggregationResult {

export class ParallelOrchestrator {
private resolver: VariableResolver | null = null
private contextExtensions: ContextExtensions | null = null

constructor(
private dag: DAG,
Expand All @@ -42,18 +44,70 @@ export class ParallelOrchestrator {
this.resolver = resolver
}

setContextExtensions(contextExtensions: ContextExtensions): void {
this.contextExtensions = contextExtensions
}

initializeParallelScope(
ctx: ExecutionContext,
parallelId: string,
totalBranches: number,
terminalNodesCount = 1
): ParallelScope {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)

// Validate distribution items if parallel uses distribution
if (parallelConfig?.distribution !== undefined && parallelConfig?.distribution !== null) {
if (!Array.isArray(parallelConfig.distribution)) {
const errorMessage =
'Parallel distribution is not a valid array. Parallel execution blocked.'
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
this.addParallelErrorLog(ctx, parallelId, errorMessage)

const scope: ParallelScope = {
parallelId,
totalBranches: 0,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 0,
items: [],
validationError: errorMessage,
}
if (!ctx.parallelExecutions) {
ctx.parallelExecutions = new Map()
}
ctx.parallelExecutions.set(parallelId, scope)
return scope
}
}

const items = parallelConfig ? this.resolveDistributionItems(ctx, parallelConfig) : undefined

// If we have more items than pre-built branches, expand the DAG
const actualBranchCount = items && items.length > totalBranches ? items.length : totalBranches

// Validate branch count doesn't exceed maximum
if (actualBranchCount > DEFAULTS.MAX_PARALLEL_BRANCHES) {
const errorMessage = `Parallel branch count (${actualBranchCount}) exceeds maximum allowed (${DEFAULTS.MAX_PARALLEL_BRANCHES}). Parallel execution blocked.`
logger.error(errorMessage, { parallelId, actualBranchCount })
this.addParallelErrorLog(ctx, parallelId, errorMessage)

const scope: ParallelScope = {
parallelId,
totalBranches: 0,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 0,
items: [],
validationError: errorMessage,
}
if (!ctx.parallelExecutions) {
ctx.parallelExecutions = new Map()
}
ctx.parallelExecutions.set(parallelId, scope)
return scope
}

const scope: ParallelScope = {
parallelId,
totalBranches: actualBranchCount,
Expand Down Expand Up @@ -108,6 +162,46 @@ export class ParallelOrchestrator {
return scope
}

/**
* Adds an error log entry for parallel validation errors.
* These errors appear in the block console on the logs dashboard.
*/
private addParallelErrorLog(
ctx: ExecutionContext,
parallelId: string,
errorMessage: string
): void {
const now = new Date().toISOString()

// Get the actual parallel block name from the workflow
const parallelBlock = ctx.workflow?.blocks?.find((b) => b.id === parallelId)
const blockName = parallelBlock?.metadata?.name || 'Parallel'

const blockLog: BlockLog = {
blockId: parallelId,
blockName,
blockType: 'parallel',
startedAt: now,
endedAt: now,
durationMs: 0,
success: false,
error: errorMessage,
input: {},
output: { error: errorMessage },
parallelId,
}
ctx.blockLogs.push(blockLog)

// Emit the error through onBlockComplete callback so it appears in the UI console
if (this.contextExtensions?.onBlockComplete) {
this.contextExtensions.onBlockComplete(parallelId, blockName, 'parallel', {
input: {},
output: { error: errorMessage },
executionTime: 0,
})
}
}

/**
* Dynamically expand the DAG to include additional branch nodes when
* the resolved item count exceeds the pre-built branch count.
Expand Down
4 changes: 3 additions & 1 deletion apps/sim/lib/logs/execution/trace-spans/trace-spans.ts
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,10 @@ function groupIterationBlocks(spans: TraceSpan[]): TraceSpan[] {
}
})

// Include loop/parallel spans that have errors (e.g., validation errors that blocked execution)
// These won't have iteration children, so they should appear directly in results
const nonIterationContainerSpans = normalSpans.filter(
(span) => span.type !== 'parallel' && span.type !== 'loop'
(span) => (span.type !== 'parallel' && span.type !== 'loop') || span.status === 'error'
)

if (iterationSpans.length > 0) {
Expand Down
Loading