Skip to content

Commit f79e87e

Browse files
improvement(parallel): update parallel subflow to support conditional routing (#1444)
* improvement(parallel): update parallel subblock to support conditional routing * ignore disconnected blocks in parallel/loop * added utils * fix z-index issues with edges in subflow * fixed aggregation of results from loop & parallel blocks * feat(manual-trigger): add manual trigger (#1452) * feat(manual-trigger): add manual trigger * consolidate input format extraction * exclude triggers from console logs + deployed chat error surfacing * works * centralize error messages + logging for deployed chat * fix(css-config): use correct version (#1453) * fix(css-config): use correct version * fix lint * improvement(parallel): update parallel subblock to support conditional routing * ignore disconnected blocks in parallel/loop * added utils * fix z-index issues with edges in subflow * fixed aggregation of results from loop & parallel blocks * change z index within component and remvoe global css * fix cascade deletion subflows sockets case * improve results array for subflows * fix onedgeclick inside subflows * fix test --------- Co-authored-by: waleed <waleed> Co-authored-by: Vikhyath Mondreti <[email protected]> Co-authored-by: Vikhyath Mondreti <[email protected]>
1 parent 18599ac commit f79e87e

File tree

17 files changed

+698
-179
lines changed

17 files changed

+698
-179
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-edge/workflow-edge.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ export const WorkflowEdge = ({
178178
style={{
179179
transform: `translate(-50%, -50%) translate(${labelX}px,${labelY}px)`,
180180
pointerEvents: 'all',
181-
zIndex: 22,
181+
zIndex: 100,
182182
}}
183183
onClick={(e) => {
184184
e.preventDefault()

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1751,9 +1751,7 @@ const WorkflowContent = React.memo(() => {
17511751

17521752
// An edge is inside a loop if either source or target has a parent
17531753
// If source and target have different parents, prioritize source's parent
1754-
const parentLoopId =
1755-
(sourceNode?.id && blocks[sourceNode.id]?.data?.parentId) ||
1756-
(targetNode?.id && blocks[targetNode.id]?.data?.parentId)
1754+
const parentLoopId = sourceNode?.parentId || targetNode?.parentId
17571755

17581756
// Create a unique identifier that combines edge ID and parent context
17591757
const contextId = `${edge.id}${parentLoopId ? `-${parentLoopId}` : ''}`
@@ -1772,9 +1770,7 @@ const WorkflowContent = React.memo(() => {
17721770
// Check if this edge connects nodes inside a loop
17731771
const sourceNode = getNodes().find((n) => n.id === edge.source)
17741772
const targetNode = getNodes().find((n) => n.id === edge.target)
1775-
const parentLoopId =
1776-
(sourceNode?.id && blocks[sourceNode.id]?.data?.parentId) ||
1777-
(targetNode?.id && blocks[targetNode.id]?.data?.parentId)
1773+
const parentLoopId = sourceNode?.parentId || targetNode?.parentId
17781774
const isInsideLoop = Boolean(parentLoopId)
17791775

17801776
// Create a unique context ID for this edge
@@ -1867,6 +1863,12 @@ const WorkflowContent = React.memo(() => {
18671863
return (
18681864
<div className='flex h-screen w-full flex-col overflow-hidden'>
18691865
<div className='relative h-full w-full flex-1 transition-all duration-200'>
1866+
<style jsx global>{`
1867+
/* Ensure edge labels (e.g., delete X) render above group/subflow nodes */
1868+
.react-flow__edge-labels {
1869+
z-index: 60 !important;
1870+
}
1871+
`}</style>
18701872
<div className='fixed top-0 right-0 z-10'>
18711873
<Panel />
18721874
</div>

apps/sim/executor/__test-utils__/executor-mocks.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,8 @@ export const createParallelManagerMock = (options?: {
604604
getIterationItem: vi.fn(),
605605
areAllVirtualBlocksExecuted: vi
606606
.fn()
607-
.mockImplementation((parallelId, parallel, executedBlocks, state) => {
607+
.mockImplementation((parallelId, parallel, executedBlocks, state, context) => {
608+
// Simple mock implementation - check all blocks (ignoring conditional routing for tests)
608609
for (const nodeId of parallel.nodes) {
609610
for (let i = 0; i < state.parallelCount; i++) {
610611
const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${i}`

apps/sim/executor/consts.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ export enum BlockType {
1414
RESPONSE = 'response',
1515
WORKFLOW = 'workflow',
1616
STARTER = 'starter',
17-
SCHEDULE = 'schedule',
18-
WEBHOOK_TRIGGER = 'webhook_trigger',
1917
}
2018

2119
/**

apps/sim/executor/handlers/condition/condition-handler.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,9 @@ export class ConditionBlockHandler implements BlockHandler {
194194
`Condition block ${block.id} selected path: ${selectedCondition.title} (${selectedCondition.id}) -> ${targetBlock.metadata?.name || targetBlock.id}`
195195
)
196196

197-
// Update context decisions
198-
context.decisions.condition.set(block.id, selectedCondition.id)
197+
// Update context decisions - use virtual block ID if available (for parallel execution)
198+
const decisionKey = context.currentVirtualBlockId || block.id
199+
context.decisions.condition.set(decisionKey, selectedCondition.id)
199200

200201
// Return output, preserving source output structure if possible
201202
return {

apps/sim/executor/handlers/parallel/parallel-handler.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createLogger } from '@/lib/logs/console/logger'
22
import type { BlockOutput } from '@/blocks/types'
33
import { BlockType } from '@/executor/consts'
4+
import { ParallelRoutingUtils } from '@/executor/parallels/utils'
45
import type { PathTracker } from '@/executor/path/path'
56
import type { InputResolver } from '@/executor/resolver/resolver'
67
import { Routing } from '@/executor/routing/routing'
@@ -338,17 +339,13 @@ export class ParallelBlockHandler implements BlockHandler {
338339

339340
if (!parallel || !parallelState) return false
340341

341-
// Check each node in the parallel for all iterations
342-
for (const nodeId of parallel.nodes) {
343-
for (let i = 0; i < parallelState.parallelCount; i++) {
344-
const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${i}`
345-
if (!context.executedBlocks.has(virtualBlockId)) {
346-
return false
347-
}
348-
}
349-
}
350-
351-
return true
342+
// Use the shared utility that respects conditional routing
343+
return ParallelRoutingUtils.areAllRequiredVirtualBlocksExecuted(
344+
parallel,
345+
parallelState.parallelCount,
346+
context.executedBlocks,
347+
context
348+
)
352349
}
353350

354351
/**

apps/sim/executor/index.ts

Lines changed: 181 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
} from '@/executor/handlers'
2121
import { LoopManager } from '@/executor/loops/loops'
2222
import { ParallelManager } from '@/executor/parallels/parallels'
23+
import { ParallelRoutingUtils } from '@/executor/parallels/utils'
2324
import { PathTracker } from '@/executor/path/path'
2425
import { InputResolver } from '@/executor/resolver/resolver'
2526
import type {
@@ -31,6 +32,7 @@ import type {
3132
StreamingExecution,
3233
} from '@/executor/types'
3334
import { streamingResponseFormatProcessor } from '@/executor/utils'
35+
import { VirtualBlockUtils } from '@/executor/utils/virtual-blocks'
3436
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
3537
import { useExecutionStore } from '@/stores/execution/store'
3638
import { useConsoleStore } from '@/stores/panel/console/store'
@@ -1111,7 +1113,7 @@ export class Executor {
11111113
if (parallelState) {
11121114
let allVirtualInstancesExecuted = true
11131115
for (let i = 0; i < parallelState.parallelCount; i++) {
1114-
const virtualBlockId = `${block.id}_parallel_${insideParallel}_iteration_${i}`
1116+
const virtualBlockId = VirtualBlockUtils.generateParallelId(block.id, insideParallel, i)
11151117
if (!executedBlocks.has(virtualBlockId)) {
11161118
allVirtualInstancesExecuted = false
11171119
break
@@ -1205,7 +1207,9 @@ export class Executor {
12051207
}
12061208

12071209
/**
1208-
* Check if a specific parallel iteration is complete (all blocks executed).
1210+
* Check if a specific parallel iteration is complete (all blocks that should execute have executed).
1211+
* This method now considers conditional execution paths - only blocks in the active execution
1212+
* path are expected to execute.
12091213
*
12101214
* @param parallelId - ID of the parallel block
12111215
* @param iteration - Iteration index to check
@@ -1223,15 +1227,124 @@ export class Executor {
12231227
return true
12241228
}
12251229

1226-
for (const nodeId of parallel.nodes) {
1227-
const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${iteration}`
1230+
const expectedBlocks = this.getExpectedBlocksForIteration(
1231+
parallelId,
1232+
iteration,
1233+
parallel,
1234+
context
1235+
)
1236+
1237+
// Check if all expected blocks have been executed
1238+
for (const nodeId of expectedBlocks) {
1239+
const virtualBlockId = VirtualBlockUtils.generateParallelId(nodeId, parallelId, iteration)
12281240
if (!context.executedBlocks.has(virtualBlockId)) {
12291241
return false
12301242
}
12311243
}
12321244
return true
12331245
}
12341246

1247+
/**
1248+
* Get the blocks that are expected to execute in a parallel iteration based on
1249+
* the active execution path. This handles conditional logic where some blocks
1250+
* may not execute due to condition or router blocks.
1251+
*
1252+
* @param parallelId - ID of the parallel block
1253+
* @param iteration - Iteration index
1254+
* @param parallel - Parallel configuration
1255+
* @param context - Execution context
1256+
* @returns Array of node IDs that should execute in this iteration
1257+
*/
1258+
private getExpectedBlocksForIteration(
1259+
parallelId: string,
1260+
iteration: number,
1261+
parallel: any,
1262+
context: ExecutionContext
1263+
): string[] {
1264+
if (!parallel || !parallel.nodes) {
1265+
return []
1266+
}
1267+
1268+
const expectedBlocks: string[] = []
1269+
1270+
for (const nodeId of parallel.nodes) {
1271+
const block = this.actualWorkflow.blocks.find((b) => b.id === nodeId)
1272+
1273+
// If block doesn't exist in workflow, fall back to original behavior (assume it should execute)
1274+
// This maintains compatibility with tests and edge cases
1275+
if (!block) {
1276+
expectedBlocks.push(nodeId)
1277+
continue
1278+
}
1279+
1280+
if (!block.enabled) {
1281+
continue
1282+
}
1283+
1284+
const virtualBlockId = VirtualBlockUtils.generateParallelId(nodeId, parallelId, iteration)
1285+
1286+
// Skip blocks that have already been executed
1287+
if (context.executedBlocks.has(virtualBlockId)) {
1288+
expectedBlocks.push(nodeId)
1289+
continue
1290+
}
1291+
1292+
// Check if this block should execute based on the active execution path
1293+
// We need to check if the original block is reachable based on current routing decisions
1294+
try {
1295+
const shouldExecute = this.shouldBlockExecuteInParallelIteration(
1296+
nodeId,
1297+
parallelId,
1298+
iteration,
1299+
context
1300+
)
1301+
1302+
if (shouldExecute) {
1303+
expectedBlocks.push(nodeId)
1304+
}
1305+
} catch (error) {
1306+
// If path checking fails, default to including the block to maintain existing behavior
1307+
logger.warn(
1308+
`Path check failed for block ${nodeId} in parallel ${parallelId}, iteration ${iteration}:`,
1309+
error
1310+
)
1311+
expectedBlocks.push(nodeId)
1312+
}
1313+
}
1314+
1315+
return expectedBlocks
1316+
}
1317+
1318+
/**
1319+
* Determines if a block should execute in a specific parallel iteration
1320+
* based on conditional routing and active execution paths.
1321+
*
1322+
* Blocks are excluded from execution if they are completely unconnected (no incoming connections).
1323+
* Starting blocks (with external connections only) and conditionally routed blocks execute as expected.
1324+
*
1325+
* @param nodeId - ID of the block to check
1326+
* @param parallelId - ID of the parallel block
1327+
* @param iteration - Current iteration index
1328+
* @param context - Execution context
1329+
* @returns Whether the block should execute
1330+
*/
1331+
private shouldBlockExecuteInParallelIteration(
1332+
nodeId: string,
1333+
parallelId: string,
1334+
iteration: number,
1335+
context: ExecutionContext
1336+
): boolean {
1337+
const parallel = this.actualWorkflow.parallels?.[parallelId]
1338+
if (!parallel) return false
1339+
1340+
return ParallelRoutingUtils.shouldBlockExecuteInParallelIteration(
1341+
nodeId,
1342+
parallel,
1343+
iteration,
1344+
context
1345+
)
1346+
}
1347+
12351348
/**
12361349
* Check if there are more parallel iterations to process.
12371350
* This ensures the execution loop continues when iterations are being processed sequentially.
@@ -1269,6 +1382,8 @@ export class Executor {
12691382

12701383
/**
12711384
* Process a single parallel iteration with topological ordering of dependencies.
1385+
* Now includes conditional execution logic - only processes blocks that should execute
1386+
* based on the active execution path (handles conditions, routers, etc.).
12721387
*
12731388
* @param parallelId - ID of the parallel block
12741389
* @param iteration - Current iteration index
@@ -1293,9 +1408,9 @@ export class Executor {
12931408
}
12941409
>()
12951410

1296-
// Build dependency graph for this iteration
1411+
// Build dependency graph for this iteration - only include blocks that should execute
12971412
for (const nodeId of parallel.nodes) {
1298-
const virtualBlockId = `${nodeId}_parallel_${parallelId}_iteration_${iteration}`
1413+
const virtualBlockId = VirtualBlockUtils.generateParallelId(nodeId, parallelId, iteration)
12991414
const isExecuted = context.executedBlocks.has(virtualBlockId)
13001415

13011416
if (isExecuted) {
@@ -1305,6 +1420,26 @@ export class Executor {
13051420
const block = this.actualWorkflow.blocks.find((b) => b.id === nodeId)
13061421
if (!block || !block.enabled) continue
13071422

1423+
// Check if this block should execute in this iteration based on conditional paths
1424+
try {
1425+
const shouldExecute = this.shouldBlockExecuteInParallelIteration(
1426+
nodeId,
1427+
parallelId,
1428+
iteration,
1429+
context
1430+
)
1431+
1432+
if (!shouldExecute) {
1433+
continue
1434+
}
1435+
} catch (error) {
1436+
// If path checking fails, default to processing the block to maintain existing behavior
1437+
logger.warn(
1438+
`Path check failed for block ${nodeId} in parallel ${parallelId}, iteration ${iteration}:`,
1439+
error
1440+
)
1441+
}
1442+
13081443
// Find dependencies within this iteration
13091444
const incomingConnections = this.actualWorkflow.connections.filter(
13101445
(conn) => conn.target === nodeId
@@ -1314,7 +1449,11 @@ export class Executor {
13141449
for (const conn of incomingConnections) {
13151450
// Check if the source is within the same parallel
13161451
if (parallel.nodes.includes(conn.source)) {
1317-
const sourceDependencyId = `${conn.source}_parallel_${parallelId}_iteration_${iteration}`
1452+
const sourceDependencyId = VirtualBlockUtils.generateParallelId(
1453+
conn.source,
1454+
parallelId,
1455+
iteration
1456+
)
13181457
dependencies.push(sourceDependencyId)
13191458
} else {
13201459
// External dependency - check if it's met
@@ -1422,7 +1561,11 @@ export class Executor {
14221561
sourceBlock &&
14231562
this.actualWorkflow.parallels?.[insideParallel]?.nodes.includes(conn.source)
14241563
) {
1425-
sourceId = `${conn.source}_parallel_${insideParallel}_iteration_${iterationIndex}`
1564+
sourceId = VirtualBlockUtils.generateParallelId(
1565+
conn.source,
1566+
insideParallel,
1567+
iterationIndex
1568+
)
14261569
}
14271570
}
14281571

@@ -1769,6 +1912,21 @@ export class Executor {
17691912
)
17701913
}
17711914

1915+
// Store result for loops (IDENTICAL to parallel logic)
1916+
const containingLoopId = this.resolver.getContainingLoopId(block.id)
1917+
if (containingLoopId && !parallelInfo) {
1918+
// Only store for loops if not already in a parallel (avoid double storage)
1919+
const currentIteration = context.loopIterations.get(containingLoopId)
1920+
if (currentIteration !== undefined) {
1921+
this.loopManager.storeIterationResult(
1922+
context,
1923+
containingLoopId,
1924+
currentIteration - 1, // Convert to 0-based index
1925+
output
1926+
)
1927+
}
1928+
}
1929+
17721930
// Update the execution log
17731931
blockLog.success = true
17741932
blockLog.output = output
@@ -1886,6 +2044,21 @@ export class Executor {
18862044
)
18872045
}
18882046

2047+
// Store result for loops (IDENTICAL to parallel logic)
2048+
const containingLoopId = this.resolver.getContainingLoopId(block.id)
2049+
if (containingLoopId && !parallelInfo) {
2050+
// Only store for loops if not already in a parallel (avoid double storage)
2051+
const currentIteration = context.loopIterations.get(containingLoopId)
2052+
if (currentIteration !== undefined) {
2053+
this.loopManager.storeIterationResult(
2054+
context,
2055+
containingLoopId,
2056+
currentIteration - 1, // Convert to 0-based index
2057+
output
2058+
)
2059+
}
2060+
}
2061+
18892062
// Update the execution log
18902063
blockLog.success = true
18912064
blockLog.output = output

0 commit comments

Comments
 (0)