Skip to content

Commit 121e93e

Browse files
committed
fix(cancellation): workflow cancellation handled for non-local envs
1 parent 1145f5c commit 121e93e

File tree

13 files changed

+203
-22
lines changed

13 files changed

+203
-22
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { type NextRequest, NextResponse } from 'next/server'
2+
import { z } from 'zod'
3+
import { checkHybridAuth } from '@/lib/auth/hybrid'
4+
import { requestCancellation } from '@/lib/execution/active-executors'
5+
6+
const CancelExecutionSchema = z.object({
7+
executionId: z.string().uuid(),
8+
})
9+
10+
export const runtime = 'nodejs'
11+
export const dynamic = 'force-dynamic'
12+
13+
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
14+
await params
15+
16+
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
17+
if (!auth.success || !auth.userId) {
18+
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
19+
}
20+
21+
let body: any = {}
22+
try {
23+
const text = await req.text()
24+
if (text) {
25+
body = JSON.parse(text)
26+
}
27+
} catch {
28+
return NextResponse.json({ error: 'Invalid request body' }, { status: 400 })
29+
}
30+
31+
const validation = CancelExecutionSchema.safeParse(body)
32+
if (!validation.success) {
33+
return NextResponse.json({ error: 'Invalid request body' }, { status: 400 })
34+
}
35+
36+
const { executionId } = validation.data
37+
const success = await requestCancellation(executionId)
38+
return NextResponse.json({ success })
39+
}

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
77
import { generateRequestId } from '@/lib/core/utils/request'
88
import { SSE_HEADERS } from '@/lib/core/utils/sse'
99
import { getBaseUrl } from '@/lib/core/utils/urls'
10+
import { clearCancellation } from '@/lib/execution/active-executors'
1011
import { processInputFileFields } from '@/lib/execution/files'
1112
import { preprocessExecution } from '@/lib/execution/preprocessing'
1213
import { createLogger } from '@/lib/logs/console/logger'
@@ -496,7 +497,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
496497
}
497498

498499
const encoder = new TextEncoder()
499-
let executorInstance: any = null
500500
let isStreamClosed = false
501501

502502
const stream = new ReadableStream<Uint8Array>({
@@ -688,9 +688,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
688688
onBlockStart,
689689
onBlockComplete,
690690
onStream,
691-
onExecutorCreated: (executor) => {
692-
executorInstance = executor
693-
},
694691
},
695692
loggingSession,
696693
})
@@ -757,24 +754,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
757754
},
758755
})
759756
} finally {
757+
await clearCancellation(executionId)
758+
760759
if (!isStreamClosed) {
761760
try {
762761
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
763762
controller.close()
764763
} catch {
765-
// Stream already closed - nothing to do
764+
// Stream already closed
766765
}
767766
}
768767
}
769768
},
770-
cancel() {
771-
isStreamClosed = true
772-
logger.info(`[${requestId}] Client aborted SSE stream, cancelling executor`)
773-
774-
if (executorInstance && typeof executorInstance.cancel === 'function') {
775-
executorInstance.cancel()
776-
}
777-
},
778769
})
779770

780771
return new NextResponse(stream, {

apps/sim/executor/execution/engine.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { isCancellationRequested } from '@/lib/execution/active-executors'
12
import { createLogger } from '@/lib/logs/console/logger'
23
import { BlockType } from '@/executor/constants'
34
import type { DAG } from '@/executor/dag/builder'
@@ -33,13 +34,24 @@ export class ExecutionEngine {
3334
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
3435
}
3536

37+
private async checkCancellation(): Promise<boolean> {
38+
if (this.context.isCancelled) return true
39+
const executionId = this.context.executionId
40+
if (!executionId) return false
41+
const cancelled = await isCancellationRequested(executionId)
42+
if (cancelled) {
43+
this.context.isCancelled = true
44+
}
45+
return cancelled
46+
}
47+
3648
async run(triggerBlockId?: string): Promise<ExecutionResult> {
3749
const startTime = Date.now()
3850
try {
3951
this.initializeQueue(triggerBlockId)
4052

4153
while (this.hasWork()) {
42-
if (this.context.isCancelled && this.executing.size === 0) {
54+
if ((await this.checkCancellation()) && this.executing.size === 0) {
4355
break
4456
}
4557
await this.processQueue()
@@ -234,7 +246,7 @@ export class ExecutionEngine {
234246

235247
private async processQueue(): Promise<void> {
236248
while (this.readyQueue.length > 0) {
237-
if (this.context.isCancelled) {
249+
if (await this.checkCancellation()) {
238250
break
239251
}
240252
const nodeId = this.dequeue()

apps/sim/executor/execution/executor.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,11 @@ export class DAGExecutor {
5454
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
5555
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
5656

57-
// Link cancellation flag to context
5857
Object.defineProperty(context, 'isCancelled', {
5958
get: () => this.isCancelled,
59+
set: (value: boolean) => {
60+
this.isCancelled = value
61+
},
6062
enumerable: true,
6163
configurable: true,
6264
})

apps/sim/executor/execution/snapshot.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ export interface ExecutionCallbacks {
3434
blockType: string,
3535
output: any
3636
) => Promise<void>
37-
onExecutorCreated?: (executor: any) => void
3837
}
3938

4039
export interface SerializableExecutionState {

apps/sim/hooks/use-execution-stream.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ export interface ExecuteStreamOptions {
7676
*/
7777
export function useExecutionStream() {
7878
const abortControllerRef = useRef<AbortController | null>(null)
79+
const currentExecutionRef = useRef<{ workflowId: string; executionId: string | null }>({
80+
workflowId: '',
81+
executionId: null,
82+
})
7983

8084
const execute = useCallback(async (options: ExecuteStreamOptions) => {
8185
const { workflowId, callbacks = {}, ...payload } = options
@@ -89,6 +93,8 @@ export function useExecutionStream() {
8993
const abortController = new AbortController()
9094
abortControllerRef.current = abortController
9195

96+
currentExecutionRef.current = { workflowId, executionId: null }
97+
9298
try {
9399
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
94100
method: 'POST',
@@ -108,6 +114,11 @@ export function useExecutionStream() {
108114
throw new Error('No response body')
109115
}
110116

117+
const executionId = response.headers.get('X-Execution-Id')
118+
if (executionId) {
119+
currentExecutionRef.current.executionId = executionId
120+
}
121+
111122
// Read SSE stream
112123
const reader = response.body.getReader()
113124
const decoder = new TextDecoder()
@@ -215,6 +226,7 @@ export function useExecutionStream() {
215226
throw error
216227
} finally {
217228
abortControllerRef.current = null
229+
currentExecutionRef.current = { workflowId: '', executionId: null }
218230
}
219231
}, [])
220232

@@ -223,6 +235,17 @@ export function useExecutionStream() {
223235
abortControllerRef.current.abort()
224236
abortControllerRef.current = null
225237
}
238+
239+
const { workflowId, executionId } = currentExecutionRef.current
240+
if (workflowId && executionId) {
241+
fetch(`/api/workflows/${workflowId}/execute/cancel`, {
242+
method: 'POST',
243+
headers: { 'Content-Type': 'application/json' },
244+
body: JSON.stringify({ executionId }),
245+
}).catch(() => {})
246+
}
247+
248+
currentExecutionRef.current = { workflowId: '', executionId: null }
226249
}, [])
227250

228251
return {
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { getCancellationAdapter } from './storage'
2+
3+
export async function requestCancellation(executionId: string): Promise<boolean> {
4+
return getCancellationAdapter().requestCancellation(executionId)
5+
}
6+
7+
export async function isCancellationRequested(executionId: string): Promise<boolean> {
8+
return getCancellationAdapter().isCancellationRequested(executionId)
9+
}
10+
11+
export async function clearCancellation(executionId: string): Promise<void> {
12+
return getCancellationAdapter().clearCancellation(executionId)
13+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export interface CancellationStorageAdapter {
2+
requestCancellation(executionId: string): Promise<boolean>
3+
isCancellationRequested(executionId: string): Promise<boolean>
4+
clearCancellation(executionId: string): Promise<void>
5+
dispose(): void
6+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { getRedisClient } from '@/lib/core/config/redis'
2+
import { createLogger } from '@/lib/logs/console/logger'
3+
import type { CancellationStorageAdapter } from './adapter'
4+
import { MemoryCancellationStore } from './memory-store'
5+
import { RedisCancellationStore } from './redis-store'
6+
7+
const logger = createLogger('CancellationStorage')
8+
9+
let cachedAdapter: CancellationStorageAdapter | null = null
10+
11+
export function getCancellationAdapter(): CancellationStorageAdapter {
12+
if (cachedAdapter) {
13+
return cachedAdapter
14+
}
15+
16+
const redis = getRedisClient()
17+
18+
if (redis) {
19+
logger.info('Cancellation storage: Using Redis')
20+
cachedAdapter = new RedisCancellationStore(redis)
21+
} else {
22+
logger.info('Cancellation storage: Using in-memory')
23+
cachedAdapter = new MemoryCancellationStore()
24+
}
25+
26+
return cachedAdapter
27+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export type { CancellationStorageAdapter } from './adapter'
2+
export { getCancellationAdapter } from './factory'
3+
export { MemoryCancellationStore } from './memory-store'
4+
export { RedisCancellationStore } from './redis-store'

0 commit comments

Comments
 (0)