Skip to content

Commit fecec18

Browse files
committed
fix(parallel): add parallel sentinel to make parallel-parallel and parallel-loop work correctly
1 parent 7793a6d commit fecec18

File tree

8 files changed

+265
-108
lines changed

8 files changed

+265
-108
lines changed

apps/sim/executor/constants.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export const EDGE = {
5353
LOOP_CONTINUE: 'loop_continue',
5454
LOOP_CONTINUE_ALT: 'loop-continue-source',
5555
LOOP_EXIT: 'loop_exit',
56+
PARALLEL_EXIT: 'parallel_exit',
5657
ERROR: 'error',
5758
SOURCE: 'source',
5859
DEFAULT: 'default',
@@ -88,6 +89,16 @@ export const PARALLEL = {
8889
SUFFIX: '₎',
8990
},
9091

92+
SENTINEL: {
93+
PREFIX: 'parallel-',
94+
START_SUFFIX: '-sentinel-start',
95+
END_SUFFIX: '-sentinel-end',
96+
START_TYPE: 'start' as SentinelType,
97+
END_TYPE: 'end' as SentinelType,
98+
START_NAME_PREFIX: 'Parallel Start',
99+
END_NAME_PREFIX: 'Parallel End',
100+
},
101+
91102
DEFAULT_COUNT: 1,
92103
} as const
93104

apps/sim/executor/dag/builder.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@ import { createLogger } from '@sim/logger'
22
import { EdgeConstructor } from '@/executor/dag/construction/edges'
33
import { LoopConstructor } from '@/executor/dag/construction/loops'
44
import { NodeConstructor } from '@/executor/dag/construction/nodes'
5+
import { ParallelConstructor } from '@/executor/dag/construction/parallels'
56
import { PathConstructor } from '@/executor/dag/construction/paths'
67
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
7-
import { buildSentinelStartId, extractBaseBlockId } from '@/executor/utils/subflow-utils'
8+
import {
9+
buildParallelSentinelStartId,
10+
buildSentinelStartId,
11+
extractBaseBlockId,
12+
} from '@/executor/utils/subflow-utils'
813
import type {
914
SerializedBlock,
1015
SerializedLoop,
@@ -31,6 +36,7 @@ export interface DAG {
3136
export class DAGBuilder {
3237
private pathConstructor = new PathConstructor()
3338
private loopConstructor = new LoopConstructor()
39+
private parallelConstructor = new ParallelConstructor()
3440
private nodeConstructor = new NodeConstructor()
3541
private edgeConstructor = new EdgeConstructor()
3642

@@ -50,6 +56,7 @@ export class DAGBuilder {
5056
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId)
5157

5258
this.loopConstructor.execute(dag, reachableBlocks)
59+
this.parallelConstructor.execute(dag, reachableBlocks)
5360

5461
const { blocksInLoops, blocksInParallels, pauseTriggerMapping } = this.nodeConstructor.execute(
5562
workflow,
@@ -135,7 +142,9 @@ export class DAGBuilder {
135142
)
136143
}
137144

138-
const sentinelStartNode = dag.nodes.get(buildSentinelStartId(id))
145+
const sentinelStartId =
146+
type === 'Loop' ? buildSentinelStartId(id) : buildParallelSentinelStartId(id)
147+
const sentinelStartNode = dag.nodes.get(sentinelStartId)
139148
if (!sentinelStartNode) return
140149

141150
const hasConnections = Array.from(sentinelStartNode.outgoingEdges.values()).some((edge) =>

apps/sim/executor/dag/construction/edges.ts

Lines changed: 49 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { EDGE, isConditionBlockType, isRouterBlockType } from '@/executor/consta
33
import type { DAG } from '@/executor/dag/builder'
44
import {
55
buildBranchNodeId,
6+
buildParallelSentinelEndId,
7+
buildParallelSentinelStartId,
68
buildSentinelEndId,
79
buildSentinelStartId,
810
calculateBranchCount,
@@ -51,7 +53,7 @@ export class EdgeConstructor {
5153
)
5254

5355
this.wireLoopSentinels(dag, reachableBlocks)
54-
this.wireParallelBlocks(workflow, dag, loopBlockIds, parallelBlockIds, pauseTriggerMapping)
56+
this.wireParallelSentinels(dag, pauseTriggerMapping)
5557
}
5658

5759
private buildMetadataMaps(workflow: SerializedWorkflow): EdgeMetadata {
@@ -157,43 +159,40 @@ export class EdgeConstructor {
157159
const sourceIsParallelBlock = parallelBlockIds.has(source)
158160
const targetIsParallelBlock = parallelBlockIds.has(target)
159161

160-
if (
161-
sourceIsLoopBlock ||
162-
targetIsLoopBlock ||
163-
sourceIsParallelBlock ||
164-
targetIsParallelBlock
165-
) {
166-
let loopSentinelStartId: string | undefined
167-
168-
if (sourceIsLoopBlock) {
169-
const sentinelEndId = buildSentinelEndId(originalSource)
170-
loopSentinelStartId = buildSentinelStartId(originalSource)
171-
172-
if (!dag.nodes.has(sentinelEndId) || !dag.nodes.has(loopSentinelStartId)) {
173-
continue
174-
}
175-
176-
source = sentinelEndId
177-
sourceHandle = EDGE.LOOP_EXIT
162+
if (sourceIsLoopBlock) {
163+
const sentinelEndId = buildSentinelEndId(originalSource)
164+
const loopSentinelStartId = buildSentinelStartId(originalSource)
165+
if (!dag.nodes.has(sentinelEndId) || !dag.nodes.has(loopSentinelStartId)) {
166+
continue
178167
}
168+
source = sentinelEndId
169+
sourceHandle = EDGE.LOOP_EXIT
170+
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
171+
}
179172

180-
if (targetIsLoopBlock) {
181-
const sentinelStartId = buildSentinelStartId(target)
182-
183-
if (!dag.nodes.has(sentinelStartId)) {
184-
continue
185-
}
186-
187-
target = sentinelStartId
173+
if (targetIsLoopBlock) {
174+
const sentinelStartId = buildSentinelStartId(target)
175+
if (!dag.nodes.has(sentinelStartId)) {
176+
continue
188177
}
178+
target = sentinelStartId
179+
}
189180

190-
if (loopSentinelStartId) {
191-
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
181+
if (sourceIsParallelBlock) {
182+
const sentinelEndId = buildParallelSentinelEndId(originalSource)
183+
if (!dag.nodes.has(sentinelEndId)) {
184+
continue
192185
}
186+
source = sentinelEndId
187+
sourceHandle = EDGE.PARALLEL_EXIT
188+
}
193189

194-
if (sourceIsParallelBlock || targetIsParallelBlock) {
190+
if (targetIsParallelBlock) {
191+
const sentinelStartId = buildParallelSentinelStartId(target)
192+
if (!dag.nodes.has(sentinelStartId)) {
195193
continue
196194
}
195+
target = sentinelStartId
197196
}
198197

199198
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
@@ -256,80 +255,40 @@ export class EdgeConstructor {
256255
}
257256
}
258257

259-
private wireParallelBlocks(
260-
workflow: SerializedWorkflow,
261-
dag: DAG,
262-
loopBlockIds: Set<string>,
263-
parallelBlockIds: Set<string>,
264-
pauseTriggerMapping: Map<string, string>
265-
): void {
258+
private wireParallelSentinels(dag: DAG, pauseTriggerMapping: Map<string, string>): void {
266259
for (const [parallelId, parallelConfig] of dag.parallelConfigs) {
267260
const nodes = parallelConfig.nodes
268261

269262
if (nodes.length === 0) continue
270263

264+
const sentinelStartId = buildParallelSentinelStartId(parallelId)
265+
const sentinelEndId = buildParallelSentinelEndId(parallelId)
266+
267+
if (!dag.nodes.has(sentinelStartId) || !dag.nodes.has(sentinelEndId)) {
268+
continue
269+
}
270+
271271
const { entryNodes, terminalNodes, branchCount } = this.findParallelBoundaryNodes(
272272
nodes,
273273
parallelId,
274274
dag
275275
)
276276

277-
logger.info('Wiring parallel block edges', {
278-
parallelId,
279-
entryNodes,
280-
terminalNodes,
281-
branchCount,
282-
})
283-
284-
for (const connection of workflow.connections) {
285-
const { source, target, sourceHandle, targetHandle } = connection
286-
287-
if (target === parallelId) {
288-
if (loopBlockIds.has(source) || parallelBlockIds.has(source)) continue
289-
290-
if (nodes.includes(source)) {
291-
logger.warn('Invalid: parallel block connected from its own internal node', {
292-
parallelId,
293-
source,
294-
})
295-
continue
296-
}
297-
298-
logger.info('Wiring edge to parallel block', { source, parallelId, entryNodes })
299-
300-
for (const entryNodeId of entryNodes) {
301-
for (let i = 0; i < branchCount; i++) {
302-
const branchNodeId = buildBranchNodeId(entryNodeId, i)
303-
304-
if (dag.nodes.has(branchNodeId)) {
305-
this.addEdge(dag, source, branchNodeId, sourceHandle, targetHandle)
306-
}
307-
}
277+
for (const entryNodeId of entryNodes) {
278+
for (let i = 0; i < branchCount; i++) {
279+
const branchNodeId = buildBranchNodeId(entryNodeId, i)
280+
if (dag.nodes.has(branchNodeId)) {
281+
this.addEdge(dag, sentinelStartId, branchNodeId)
308282
}
309283
}
284+
}
310285

311-
if (source === parallelId) {
312-
if (loopBlockIds.has(target) || parallelBlockIds.has(target)) continue
313-
314-
if (nodes.includes(target)) {
315-
logger.warn('Invalid: parallel block connected to its own internal node', {
316-
parallelId,
317-
target,
318-
})
319-
continue
320-
}
321-
322-
logger.info('Wiring edge from parallel block', { parallelId, target, terminalNodes })
323-
324-
for (const terminalNodeId of terminalNodes) {
325-
for (let i = 0; i < branchCount; i++) {
326-
const branchNodeId = buildBranchNodeId(terminalNodeId, i)
327-
328-
if (dag.nodes.has(branchNodeId)) {
329-
const resolvedSourceId = pauseTriggerMapping.get(branchNodeId) ?? branchNodeId
330-
this.addEdge(dag, resolvedSourceId, target, sourceHandle, targetHandle)
331-
}
332-
}
286+
for (const terminalNodeId of terminalNodes) {
287+
for (let i = 0; i < branchCount; i++) {
288+
const branchNodeId = buildBranchNodeId(terminalNodeId, i)
289+
if (dag.nodes.has(branchNodeId)) {
290+
const resolvedSourceId = pauseTriggerMapping.get(branchNodeId) ?? branchNodeId
291+
this.addEdge(dag, resolvedSourceId, sentinelEndId)
333292
}
334293
}
335294
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { createLogger } from '@sim/logger'
2+
import { BlockType, PARALLEL, type SentinelType } from '@/executor/constants'
3+
import type { DAG, DAGNode } from '@/executor/dag/builder'
4+
import {
5+
buildParallelSentinelEndId,
6+
buildParallelSentinelStartId,
7+
} from '@/executor/utils/subflow-utils'
8+
9+
const logger = createLogger('ParallelConstructor')
10+
11+
export class ParallelConstructor {
12+
execute(dag: DAG, reachableBlocks: Set<string>): void {
13+
for (const [parallelId, parallelConfig] of dag.parallelConfigs) {
14+
const parallelNodes = parallelConfig.nodes
15+
16+
if (parallelNodes.length === 0) {
17+
continue
18+
}
19+
20+
if (!this.hasReachableNodes(parallelNodes, reachableBlocks)) {
21+
continue
22+
}
23+
24+
this.createSentinelPair(dag, parallelId)
25+
}
26+
}
27+
28+
private hasReachableNodes(parallelNodes: string[], reachableBlocks: Set<string>): boolean {
29+
return parallelNodes.some((nodeId) => reachableBlocks.has(nodeId))
30+
}
31+
32+
private createSentinelPair(dag: DAG, parallelId: string): void {
33+
const startId = buildParallelSentinelStartId(parallelId)
34+
const endId = buildParallelSentinelEndId(parallelId)
35+
36+
dag.nodes.set(
37+
startId,
38+
this.createSentinelNode({
39+
id: startId,
40+
parallelId,
41+
sentinelType: PARALLEL.SENTINEL.START_TYPE,
42+
blockType: BlockType.SENTINEL_START,
43+
name: `${PARALLEL.SENTINEL.START_NAME_PREFIX} (${parallelId})`,
44+
})
45+
)
46+
47+
dag.nodes.set(
48+
endId,
49+
this.createSentinelNode({
50+
id: endId,
51+
parallelId,
52+
sentinelType: PARALLEL.SENTINEL.END_TYPE,
53+
blockType: BlockType.SENTINEL_END,
54+
name: `${PARALLEL.SENTINEL.END_NAME_PREFIX} (${parallelId})`,
55+
})
56+
)
57+
}
58+
59+
private createSentinelNode(config: {
60+
id: string
61+
parallelId: string
62+
sentinelType: SentinelType
63+
blockType: BlockType
64+
name: string
65+
}): DAGNode {
66+
return {
67+
id: config.id,
68+
block: {
69+
id: config.id,
70+
enabled: true,
71+
metadata: {
72+
id: config.blockType,
73+
name: config.name,
74+
parallelId: config.parallelId,
75+
},
76+
config: { params: {} },
77+
} as any,
78+
incomingEdges: new Set(),
79+
outgoingEdges: new Map(),
80+
metadata: {
81+
isSentinel: true,
82+
isParallelSentinel: true,
83+
sentinelType: config.sentinelType,
84+
parallelId: config.parallelId,
85+
},
86+
}
87+
}
88+
}

apps/sim/executor/dag/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export interface DAGEdge {
77

88
export interface NodeMetadata {
99
isParallelBranch?: boolean
10+
isParallelSentinel?: boolean
1011
parallelId?: string
1112
branchIndex?: number
1213
branchTotal?: number

apps/sim/executor/execution/edge-manager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ export class EdgeManager {
129129
return handle === EDGE.LOOP_CONTINUE || handle === EDGE.LOOP_CONTINUE_ALT
130130
}
131131

132+
if (output.selectedRoute === EDGE.PARALLEL_EXIT) {
133+
return handle === EDGE.PARALLEL_EXIT
134+
}
135+
132136
if (!handle) {
133137
return true
134138
}

0 commit comments

Comments
 (0)