Skip to content

Commit 77521a3

Browse files
fix(cancel-workflow-exec): move cancellation tracking for multi-task envs to redis (#2573)
* fix(cancel-workflow-exec): move cancellation tracking for multi-task envs to redis * cleanup cancellation keys after execution
1 parent cb8b9c5 commit 77521a3

File tree

8 files changed

+234
-23
lines changed

8 files changed

+234
-23
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
77
import { generateRequestId } from '@/lib/core/utils/request'
88
import { SSE_HEADERS } from '@/lib/core/utils/sse'
99
import { getBaseUrl } from '@/lib/core/utils/urls'
10+
import { markExecutionCancelled } from '@/lib/execution/cancellation'
1011
import { processInputFileFields } from '@/lib/execution/files'
1112
import { preprocessExecution } from '@/lib/execution/preprocessing'
1213
import { createLogger } from '@/lib/logs/console/logger'
@@ -767,10 +768,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
767768
},
768769
cancel() {
769770
isStreamClosed = true
770-
logger.info(
771-
`[${requestId}] Client aborted SSE stream, signalling cancellation via AbortController`
772-
)
771+
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
773772
abortController.abort()
773+
markExecutionCancelled(executionId).catch(() => {})
774774
},
775775
})
776776

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { type NextRequest, NextResponse } from 'next/server'
2+
import { checkHybridAuth } from '@/lib/auth/hybrid'
3+
import { markExecutionCancelled } from '@/lib/execution/cancellation'
4+
import { createLogger } from '@/lib/logs/console/logger'
5+
6+
const logger = createLogger('CancelExecutionAPI')
7+
8+
export const runtime = 'nodejs'
9+
export const dynamic = 'force-dynamic'
10+
11+
export async function POST(
12+
req: NextRequest,
13+
{ params }: { params: Promise<{ id: string; executionId: string }> }
14+
) {
15+
const { id: workflowId, executionId } = await params
16+
17+
try {
18+
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
19+
if (!auth.success || !auth.userId) {
20+
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
21+
}
22+
23+
logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId })
24+
25+
const marked = await markExecutionCancelled(executionId)
26+
27+
if (marked) {
28+
logger.info('Execution marked as cancelled in Redis', { executionId })
29+
} else {
30+
logger.info('Redis not available, cancellation will rely on connection close', {
31+
executionId,
32+
})
33+
}
34+
35+
return NextResponse.json({
36+
success: true,
37+
executionId,
38+
redisAvailable: marked,
39+
})
40+
} catch (error: any) {
41+
logger.error('Failed to cancel execution', { workflowId, executionId, error: error.message })
42+
return NextResponse.json(
43+
{ error: error.message || 'Failed to cancel execution' },
44+
{ status: 500 }
45+
)
46+
}
47+
}

apps/sim/executor/execution/engine.ts

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
12
import { createLogger } from '@/lib/logs/console/logger'
23
import { BlockType } from '@/executor/constants'
34
import type { DAG } from '@/executor/dag/builder'
@@ -23,6 +24,10 @@ export class ExecutionEngine {
2324
private finalOutput: NormalizedBlockOutput = {}
2425
private pausedBlocks: Map<string, PauseMetadata> = new Map()
2526
private allowResumeTriggers: boolean
27+
private cancelledFlag = false
28+
private lastCancellationCheck = 0
29+
private readonly useRedisCancellation: boolean
30+
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500
2631

2732
constructor(
2833
private context: ExecutionContext,
@@ -31,6 +36,35 @@ export class ExecutionEngine {
3136
private nodeOrchestrator: NodeExecutionOrchestrator
3237
) {
3338
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
39+
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
40+
}
41+
42+
private async checkCancellation(): Promise<boolean> {
43+
if (this.cancelledFlag) {
44+
return true
45+
}
46+
47+
if (this.useRedisCancellation) {
48+
const now = Date.now()
49+
if (now - this.lastCancellationCheck < this.CANCELLATION_CHECK_INTERVAL_MS) {
50+
return false
51+
}
52+
this.lastCancellationCheck = now
53+
54+
const cancelled = await isExecutionCancelled(this.context.executionId!)
55+
if (cancelled) {
56+
this.cancelledFlag = true
57+
logger.info('Execution cancelled via Redis', { executionId: this.context.executionId })
58+
}
59+
return cancelled
60+
}
61+
62+
if (this.context.abortSignal?.aborted) {
63+
this.cancelledFlag = true
64+
return true
65+
}
66+
67+
return false
3468
}
3569

3670
async run(triggerBlockId?: string): Promise<ExecutionResult> {
@@ -39,7 +73,7 @@ export class ExecutionEngine {
3973
this.initializeQueue(triggerBlockId)
4074

4175
while (this.hasWork()) {
42-
if (this.context.abortSignal?.aborted && this.executing.size === 0) {
76+
if ((await this.checkCancellation()) && this.executing.size === 0) {
4377
break
4478
}
4579
await this.processQueue()
@@ -54,7 +88,7 @@ export class ExecutionEngine {
5488
this.context.metadata.endTime = new Date(endTime).toISOString()
5589
this.context.metadata.duration = endTime - startTime
5690

57-
if (this.context.abortSignal?.aborted) {
91+
if (this.cancelledFlag) {
5892
return {
5993
success: false,
6094
output: this.finalOutput,
@@ -75,7 +109,7 @@ export class ExecutionEngine {
75109
this.context.metadata.endTime = new Date(endTime).toISOString()
76110
this.context.metadata.duration = endTime - startTime
77111

78-
if (this.context.abortSignal?.aborted) {
112+
if (this.cancelledFlag) {
79113
return {
80114
success: false,
81115
output: this.finalOutput,
@@ -234,7 +268,7 @@ export class ExecutionEngine {
234268

235269
private async processQueue(): Promise<void> {
236270
while (this.readyQueue.length > 0) {
237-
if (this.context.abortSignal?.aborted) {
271+
if (await this.checkCancellation()) {
238272
break
239273
}
240274
const nodeId = this.dequeue()

apps/sim/executor/handlers/wait/wait-handler.ts

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,61 @@
1+
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
12
import { BlockType } from '@/executor/constants'
23
import type { BlockHandler, ExecutionContext } from '@/executor/types'
34
import type { SerializedBlock } from '@/serializer/types'
45

5-
/**
6-
* Helper function to sleep for a specified number of milliseconds with AbortSignal support.
7-
* The sleep will be cancelled immediately when the AbortSignal is aborted.
8-
*/
9-
const sleep = async (ms: number, signal?: AbortSignal): Promise<boolean> => {
10-
if (signal?.aborted) {
6+
const CANCELLATION_CHECK_INTERVAL_MS = 500
7+
8+
interface SleepOptions {
9+
signal?: AbortSignal
10+
executionId?: string
11+
}
12+
13+
const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> => {
14+
const { signal, executionId } = options
15+
const useRedis = isRedisCancellationEnabled() && !!executionId
16+
17+
if (!useRedis && signal?.aborted) {
1118
return false
1219
}
1320

1421
return new Promise((resolve) => {
15-
let timeoutId: NodeJS.Timeout | undefined
22+
let mainTimeoutId: NodeJS.Timeout | undefined
23+
let checkIntervalId: NodeJS.Timeout | undefined
24+
let resolved = false
25+
26+
const cleanup = () => {
27+
if (mainTimeoutId) clearTimeout(mainTimeoutId)
28+
if (checkIntervalId) clearInterval(checkIntervalId)
29+
if (!useRedis && signal) signal.removeEventListener('abort', onAbort)
30+
}
1631

1732
const onAbort = () => {
18-
if (timeoutId) clearTimeout(timeoutId)
33+
if (resolved) return
34+
resolved = true
35+
cleanup()
1936
resolve(false)
2037
}
2138

22-
if (signal) {
39+
if (useRedis) {
40+
checkIntervalId = setInterval(async () => {
41+
if (resolved) return
42+
try {
43+
const cancelled = await isExecutionCancelled(executionId!)
44+
if (cancelled) {
45+
resolved = true
46+
cleanup()
47+
resolve(false)
48+
}
49+
} catch {}
50+
}, CANCELLATION_CHECK_INTERVAL_MS)
51+
} else if (signal) {
2352
signal.addEventListener('abort', onAbort, { once: true })
2453
}
2554

26-
timeoutId = setTimeout(() => {
27-
if (signal) {
28-
signal.removeEventListener('abort', onAbort)
29-
}
55+
mainTimeoutId = setTimeout(() => {
56+
if (resolved) return
57+
resolved = true
58+
cleanup()
3059
resolve(true)
3160
}, ms)
3261
})
@@ -63,7 +92,10 @@ export class WaitBlockHandler implements BlockHandler {
6392
throw new Error(`Wait time exceeds maximum of ${maxDisplay}`)
6493
}
6594

66-
const completed = await sleep(waitMs, ctx.abortSignal)
95+
const completed = await sleep(waitMs, {
96+
signal: ctx.abortSignal,
97+
executionId: ctx.executionId,
98+
})
6799

68100
if (!completed) {
69101
return {

apps/sim/executor/orchestrators/loop.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { generateRequestId } from '@/lib/core/utils/request'
2+
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
23
import { executeInIsolatedVM } from '@/lib/execution/isolated-vm'
34
import { createLogger } from '@/lib/logs/console/logger'
45
import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
@@ -229,7 +230,14 @@ export class LoopOrchestrator {
229230
}
230231
}
231232

232-
if (ctx.abortSignal?.aborted) {
233+
const useRedis = isRedisCancellationEnabled() && !!ctx.executionId
234+
let isCancelled = false
235+
if (useRedis) {
236+
isCancelled = await isExecutionCancelled(ctx.executionId!)
237+
} else {
238+
isCancelled = ctx.abortSignal?.aborted ?? false
239+
}
240+
if (isCancelled) {
233241
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
234242
return this.createExitResult(ctx, loopId, scope)
235243
}

apps/sim/hooks/use-execution-stream.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ export interface ExecuteStreamOptions {
7676
*/
7777
export function useExecutionStream() {
7878
const abortControllerRef = useRef<AbortController | null>(null)
79+
const currentExecutionRef = useRef<{ workflowId: string; executionId: string } | null>(null)
7980

8081
const execute = useCallback(async (options: ExecuteStreamOptions) => {
8182
const { workflowId, callbacks = {}, ...payload } = options
@@ -88,6 +89,7 @@ export function useExecutionStream() {
8889
// Create new abort controller
8990
const abortController = new AbortController()
9091
abortControllerRef.current = abortController
92+
currentExecutionRef.current = null
9193

9294
try {
9395
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
@@ -108,6 +110,11 @@ export function useExecutionStream() {
108110
throw new Error('No response body')
109111
}
110112

113+
const executionId = response.headers.get('X-Execution-Id')
114+
if (executionId) {
115+
currentExecutionRef.current = { workflowId, executionId }
116+
}
117+
111118
// Read SSE stream
112119
const reader = response.body.getReader()
113120
const decoder = new TextDecoder()
@@ -215,14 +222,23 @@ export function useExecutionStream() {
215222
throw error
216223
} finally {
217224
abortControllerRef.current = null
225+
currentExecutionRef.current = null
218226
}
219227
}, [])
220228

221229
const cancel = useCallback(() => {
230+
const execution = currentExecutionRef.current
231+
if (execution) {
232+
fetch(`/api/workflows/${execution.workflowId}/executions/${execution.executionId}/cancel`, {
233+
method: 'POST',
234+
}).catch(() => {})
235+
}
236+
222237
if (abortControllerRef.current) {
223238
abortControllerRef.current.abort()
224239
abortControllerRef.current = null
225240
}
241+
currentExecutionRef.current = null
226242
}, [])
227243

228244
return {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import { getRedisClient } from '@/lib/core/config/redis'
2+
import { createLogger } from '@/lib/logs/console/logger'
3+
4+
const logger = createLogger('ExecutionCancellation')
5+
6+
const EXECUTION_CANCEL_PREFIX = 'execution:cancel:'
7+
const EXECUTION_CANCEL_EXPIRY = 60 * 60
8+
9+
export function isRedisCancellationEnabled(): boolean {
10+
return getRedisClient() !== null
11+
}
12+
13+
/**
14+
* Mark an execution as cancelled in Redis.
15+
* Returns true if Redis is available and the flag was set, false otherwise.
16+
*/
17+
export async function markExecutionCancelled(executionId: string): Promise<boolean> {
18+
const redis = getRedisClient()
19+
if (!redis) {
20+
return false
21+
}
22+
23+
try {
24+
await redis.set(`${EXECUTION_CANCEL_PREFIX}${executionId}`, '1', 'EX', EXECUTION_CANCEL_EXPIRY)
25+
logger.info('Marked execution as cancelled', { executionId })
26+
return true
27+
} catch (error) {
28+
logger.error('Failed to mark execution as cancelled', { executionId, error })
29+
return false
30+
}
31+
}
32+
33+
/**
34+
* Check if an execution has been cancelled via Redis.
35+
* Returns false if Redis is not available (fallback to local abort signal).
36+
*/
37+
export async function isExecutionCancelled(executionId: string): Promise<boolean> {
38+
const redis = getRedisClient()
39+
if (!redis) {
40+
return false
41+
}
42+
43+
try {
44+
const result = await redis.exists(`${EXECUTION_CANCEL_PREFIX}${executionId}`)
45+
return result === 1
46+
} catch (error) {
47+
logger.error('Failed to check execution cancellation', { executionId, error })
48+
return false
49+
}
50+
}
51+
52+
/**
53+
* Clear the cancellation flag for an execution.
54+
*/
55+
export async function clearExecutionCancellation(executionId: string): Promise<void> {
56+
const redis = getRedisClient()
57+
if (!redis) {
58+
return
59+
}
60+
61+
try {
62+
await redis.del(`${EXECUTION_CANCEL_PREFIX}${executionId}`)
63+
} catch (error) {
64+
logger.error('Failed to clear execution cancellation', { executionId, error })
65+
}
66+
}

0 commit comments

Comments
 (0)