Skip to content

Commit b1cd8d1

Browse files
fix(executor): workflow abort has to send abort signal to route for correct state update (#2571)
1 parent 1145f5c commit b1cd8d1

File tree

9 files changed

+54
-62
lines changed

9 files changed

+54
-62
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
496496
}
497497

498498
const encoder = new TextEncoder()
499-
let executorInstance: any = null
499+
const abortController = new AbortController()
500500
let isStreamClosed = false
501501

502502
const stream = new ReadableStream<Uint8Array>({
@@ -688,11 +688,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
688688
onBlockStart,
689689
onBlockComplete,
690690
onStream,
691-
onExecutorCreated: (executor) => {
692-
executorInstance = executor
693-
},
694691
},
695692
loggingSession,
693+
abortSignal: abortController.signal,
696694
})
697695

698696
if (result.status === 'paused') {
@@ -769,11 +767,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
769767
},
770768
cancel() {
771769
isStreamClosed = true
772-
logger.info(`[${requestId}] Client aborted SSE stream, cancelling executor`)
773-
774-
if (executorInstance && typeof executorInstance.cancel === 'function') {
775-
executorInstance.cancel()
776-
}
770+
logger.info(
771+
`[${requestId}] Client aborted SSE stream, signalling cancellation via AbortController`
772+
)
773+
abortController.abort()
777774
},
778775
})
779776

apps/sim/executor/execution/engine.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export class ExecutionEngine {
3939
this.initializeQueue(triggerBlockId)
4040

4141
while (this.hasWork()) {
42-
if (this.context.isCancelled && this.executing.size === 0) {
42+
if (this.context.abortSignal?.aborted && this.executing.size === 0) {
4343
break
4444
}
4545
await this.processQueue()
@@ -54,7 +54,7 @@ export class ExecutionEngine {
5454
this.context.metadata.endTime = new Date(endTime).toISOString()
5555
this.context.metadata.duration = endTime - startTime
5656

57-
if (this.context.isCancelled) {
57+
if (this.context.abortSignal?.aborted) {
5858
return {
5959
success: false,
6060
output: this.finalOutput,
@@ -75,7 +75,7 @@ export class ExecutionEngine {
7575
this.context.metadata.endTime = new Date(endTime).toISOString()
7676
this.context.metadata.duration = endTime - startTime
7777

78-
if (this.context.isCancelled) {
78+
if (this.context.abortSignal?.aborted) {
7979
return {
8080
success: false,
8181
output: this.finalOutput,
@@ -234,7 +234,7 @@ export class ExecutionEngine {
234234

235235
private async processQueue(): Promise<void> {
236236
while (this.readyQueue.length > 0) {
237-
if (this.context.isCancelled) {
237+
if (this.context.abortSignal?.aborted) {
238238
break
239239
}
240240
const nodeId = this.dequeue()

apps/sim/executor/execution/executor.ts

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ export class DAGExecutor {
3737
private workflowInput: WorkflowInput
3838
private workflowVariables: Record<string, unknown>
3939
private contextExtensions: ContextExtensions
40-
private isCancelled = false
4140
private dagBuilder: DAGBuilder
4241

4342
constructor(options: DAGExecutorOptions) {
@@ -54,13 +53,6 @@ export class DAGExecutor {
5453
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
5554
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
5655

57-
// Link cancellation flag to context
58-
Object.defineProperty(context, 'isCancelled', {
59-
get: () => this.isCancelled,
60-
enumerable: true,
61-
configurable: true,
62-
})
63-
6456
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
6557
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
6658
loopOrchestrator.setContextExtensions(this.contextExtensions)
@@ -82,10 +74,6 @@ export class DAGExecutor {
8274
return await engine.run(triggerBlockId)
8375
}
8476

85-
cancel(): void {
86-
this.isCancelled = true
87-
}
88-
8977
async continueExecution(
9078
_pendingBlocks: string[],
9179
context: ExecutionContext
@@ -180,6 +168,7 @@ export class DAGExecutor {
180168
onStream: this.contextExtensions.onStream,
181169
onBlockStart: this.contextExtensions.onBlockStart,
182170
onBlockComplete: this.contextExtensions.onBlockComplete,
171+
abortSignal: this.contextExtensions.abortSignal,
183172
}
184173

185174
if (this.contextExtensions.resumeFromSnapshot) {

apps/sim/executor/execution/snapshot.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ export interface ExecutionCallbacks {
3434
blockType: string,
3535
output: any
3636
) => Promise<void>
37-
onExecutorCreated?: (executor: any) => void
3837
}
3938

4039
export interface SerializableExecutionState {

apps/sim/executor/execution/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ export interface ContextExtensions {
2222
dagIncomingEdges?: Record<string, string[]>
2323
snapshotState?: SerializableExecutionState
2424
metadata?: ExecutionMetadata
25+
/**
26+
* AbortSignal for cancellation support.
27+
* When aborted, the execution should stop gracefully.
28+
*/
29+
abortSignal?: AbortSignal
2530
onStream?: (streamingExecution: unknown) => Promise<void>
2631
onBlockStart?: (
2732
blockId: string,

apps/sim/executor/handlers/wait/wait-handler.ts

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,37 @@
1-
import { createLogger } from '@/lib/logs/console/logger'
21
import { BlockType } from '@/executor/constants'
32
import type { BlockHandler, ExecutionContext } from '@/executor/types'
43
import type { SerializedBlock } from '@/serializer/types'
54

6-
const logger = createLogger('WaitBlockHandler')
7-
85
/**
9-
* Helper function to sleep for a specified number of milliseconds
10-
* On client-side: checks for cancellation every 100ms (non-blocking for UI)
11-
* On server-side: simple sleep without polling (server execution can't be cancelled mid-flight)
6+
* Helper function to sleep for a specified number of milliseconds with AbortSignal support.
7+
* The sleep will be cancelled immediately when the AbortSignal is aborted.
128
*/
13-
const sleep = async (ms: number, checkCancelled?: () => boolean): Promise<boolean> => {
14-
const isClientSide = typeof window !== 'undefined'
15-
16-
if (!isClientSide) {
17-
await new Promise((resolve) => setTimeout(resolve, ms))
18-
return true
9+
const sleep = async (ms: number, signal?: AbortSignal): Promise<boolean> => {
10+
if (signal?.aborted) {
11+
return false
1912
}
2013

21-
const chunkMs = 100
22-
let elapsed = 0
14+
return new Promise((resolve) => {
15+
let timeoutId: NodeJS.Timeout | undefined
2316

24-
while (elapsed < ms) {
25-
if (checkCancelled?.()) {
26-
return false
17+
const onAbort = () => {
18+
if (timeoutId) {
19+
clearTimeout(timeoutId)
20+
}
21+
resolve(false)
2722
}
2823

29-
const sleepTime = Math.min(chunkMs, ms - elapsed)
30-
await new Promise((resolve) => setTimeout(resolve, sleepTime))
31-
elapsed += sleepTime
32-
}
24+
if (signal) {
25+
signal.addEventListener('abort', onAbort, { once: true })
26+
}
3327

34-
return true
28+
timeoutId = setTimeout(() => {
29+
if (signal) {
30+
signal.removeEventListener('abort', onAbort)
31+
}
32+
resolve(true)
33+
}, ms)
34+
})
3535
}
3636

3737
/**
@@ -65,11 +65,7 @@ export class WaitBlockHandler implements BlockHandler {
6565
throw new Error(`Wait time exceeds maximum of ${maxDisplay}`)
6666
}
6767

68-
const checkCancelled = () => {
69-
return (ctx as any).isCancelled === true
70-
}
71-
72-
const completed = await sleep(waitMs, checkCancelled)
68+
const completed = await sleep(waitMs, ctx.abortSignal)
7369

7470
if (!completed) {
7571
return {

apps/sim/executor/orchestrators/loop.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ export class LoopOrchestrator {
229229
}
230230
}
231231

232-
if (ctx.isCancelled) {
232+
if (ctx.abortSignal?.aborted) {
233233
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
234234
return this.createExitResult(ctx, loopId, scope)
235235
}

apps/sim/executor/types.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,12 @@ export interface ExecutionContext {
222222
output: any
223223
) => Promise<void>
224224

225-
// Cancellation support
226-
isCancelled?: boolean
225+
/**
226+
* AbortSignal for cancellation support.
227+
* When the signal is aborted, execution should stop gracefully.
228+
* This is triggered when the SSE client disconnects.
229+
*/
230+
abortSignal?: AbortSignal
227231

228232
// Dynamically added nodes that need to be scheduled (e.g., from parallel expansion)
229233
pendingDynamicNodes?: string[]

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ export interface ExecuteWorkflowCoreOptions {
3232
callbacks: ExecutionCallbacks
3333
loggingSession: LoggingSession
3434
skipLogCreation?: boolean // For resume executions - reuse existing log entry
35+
/**
36+
* AbortSignal for cancellation support.
37+
* When aborted (e.g., client disconnects from SSE), execution stops gracefully.
38+
*/
39+
abortSignal?: AbortSignal
3540
}
3641

3742
function parseVariableValueByType(value: any, type: string): any {
@@ -98,11 +103,11 @@ function parseVariableValueByType(value: any, type: string): any {
98103
export async function executeWorkflowCore(
99104
options: ExecuteWorkflowCoreOptions
100105
): Promise<ExecutionResult> {
101-
const { snapshot, callbacks, loggingSession, skipLogCreation } = options
106+
const { snapshot, callbacks, loggingSession, skipLogCreation, abortSignal } = options
102107
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
103108
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
104109
metadata
105-
const { onBlockStart, onBlockComplete, onStream, onExecutorCreated } = callbacks
110+
const { onBlockStart, onBlockComplete, onStream } = callbacks
106111

107112
const providedWorkspaceId = metadata.workspaceId
108113
if (!providedWorkspaceId) {
@@ -326,6 +331,7 @@ export async function executeWorkflowCore(
326331
dagIncomingEdges: snapshot.state?.dagIncomingEdges,
327332
snapshotState: snapshot.state,
328333
metadata,
334+
abortSignal,
329335
}
330336

331337
const executorInstance = new Executor({
@@ -349,10 +355,6 @@ export async function executeWorkflowCore(
349355
}
350356
}
351357

352-
if (onExecutorCreated) {
353-
onExecutorCreated(executorInstance)
354-
}
355-
356358
const result = (await executorInstance.execute(
357359
workflowId,
358360
resolvedTriggerBlockId

0 commit comments

Comments
 (0)