Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
Expand Down Expand Up @@ -767,10 +768,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
cancel() {
isStreamClosed = true
logger.info(
`[${requestId}] Client aborted SSE stream, signalling cancellation via AbortController`
)
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { createLogger } from '@/lib/logs/console/logger'

const logger = createLogger('CancelExecutionAPI')

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

export async function POST(
req: NextRequest,
{ params }: { params: Promise<{ id: string; executionId: string }> }
) {
const { id: workflowId, executionId } = await params

try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}

logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId })

const marked = await markExecutionCancelled(executionId)

if (marked) {
logger.info('Execution marked as cancelled in Redis', { executionId })
} else {
logger.info('Redis not available, cancellation will rely on connection close', {
executionId,
})
}

return NextResponse.json({
success: true,
executionId,
redisAvailable: marked,
})
} catch (error: any) {
logger.error('Failed to cancel execution', { workflowId, executionId, error: error.message })
return NextResponse.json(
{ error: error.message || 'Failed to cancel execution' },
{ status: 500 }
)
}
}
42 changes: 38 additions & 4 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
Expand All @@ -23,6 +24,10 @@ export class ExecutionEngine {
private finalOutput: NormalizedBlockOutput = {}
private pausedBlocks: Map<string, PauseMetadata> = new Map()
private allowResumeTriggers: boolean
private cancelledFlag = false
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
private readonly CANCELLATION_CHECK_INTERVAL_MS = 500

constructor(
private context: ExecutionContext,
Expand All @@ -31,6 +36,35 @@ export class ExecutionEngine {
private nodeOrchestrator: NodeExecutionOrchestrator
) {
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
this.useRedisCancellation = isRedisCancellationEnabled() && !!this.context.executionId
}

private async checkCancellation(): Promise<boolean> {
if (this.cancelledFlag) {
return true
}

if (this.useRedisCancellation) {
const now = Date.now()
if (now - this.lastCancellationCheck < this.CANCELLATION_CHECK_INTERVAL_MS) {
return false
}
this.lastCancellationCheck = now

const cancelled = await isExecutionCancelled(this.context.executionId!)
if (cancelled) {
this.cancelledFlag = true
logger.info('Execution cancelled via Redis', { executionId: this.context.executionId })
}
return cancelled
}

if (this.context.abortSignal?.aborted) {
this.cancelledFlag = true
return true
}

return false
}

async run(triggerBlockId?: string): Promise<ExecutionResult> {
Expand All @@ -39,7 +73,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)

while (this.hasWork()) {
if (this.context.abortSignal?.aborted && this.executing.size === 0) {
if ((await this.checkCancellation()) && this.executing.size === 0) {
break
}
await this.processQueue()
Expand All @@ -54,7 +88,7 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime

if (this.context.abortSignal?.aborted) {
if (this.cancelledFlag) {
return {
success: false,
output: this.finalOutput,
Expand All @@ -75,7 +109,7 @@ export class ExecutionEngine {
this.context.metadata.endTime = new Date(endTime).toISOString()
this.context.metadata.duration = endTime - startTime

if (this.context.abortSignal?.aborted) {
if (this.cancelledFlag) {
return {
success: false,
output: this.finalOutput,
Expand Down Expand Up @@ -234,7 +268,7 @@ export class ExecutionEngine {

private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (this.context.abortSignal?.aborted) {
if (await this.checkCancellation()) {
break
}
const nodeId = this.dequeue()
Expand Down
60 changes: 46 additions & 14 deletions apps/sim/executor/handlers/wait/wait-handler.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,61 @@
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
import { BlockType } from '@/executor/constants'
import type { BlockHandler, ExecutionContext } from '@/executor/types'
import type { SerializedBlock } from '@/serializer/types'

/**
* Helper function to sleep for a specified number of milliseconds with AbortSignal support.
* The sleep will be cancelled immediately when the AbortSignal is aborted.
*/
const sleep = async (ms: number, signal?: AbortSignal): Promise<boolean> => {
if (signal?.aborted) {
const CANCELLATION_CHECK_INTERVAL_MS = 500

interface SleepOptions {
signal?: AbortSignal
executionId?: string
}

const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> => {
const { signal, executionId } = options
const useRedis = isRedisCancellationEnabled() && !!executionId

if (!useRedis && signal?.aborted) {
return false
}

return new Promise((resolve) => {
let timeoutId: NodeJS.Timeout | undefined
let mainTimeoutId: NodeJS.Timeout | undefined
let checkIntervalId: NodeJS.Timeout | undefined
let resolved = false

const cleanup = () => {
if (mainTimeoutId) clearTimeout(mainTimeoutId)
if (checkIntervalId) clearInterval(checkIntervalId)
if (!useRedis && signal) signal.removeEventListener('abort', onAbort)
}

const onAbort = () => {
if (timeoutId) clearTimeout(timeoutId)
if (resolved) return
resolved = true
cleanup()
resolve(false)
}

if (signal) {
if (useRedis) {
checkIntervalId = setInterval(async () => {
if (resolved) return
try {
const cancelled = await isExecutionCancelled(executionId!)
if (cancelled) {
resolved = true
cleanup()
resolve(false)
}
} catch {}
}, CANCELLATION_CHECK_INTERVAL_MS)
} else if (signal) {
signal.addEventListener('abort', onAbort, { once: true })
}

timeoutId = setTimeout(() => {
if (signal) {
signal.removeEventListener('abort', onAbort)
}
mainTimeoutId = setTimeout(() => {
if (resolved) return
resolved = true
cleanup()
resolve(true)
}, ms)
})
Expand Down Expand Up @@ -63,7 +92,10 @@ export class WaitBlockHandler implements BlockHandler {
throw new Error(`Wait time exceeds maximum of ${maxDisplay}`)
}

const completed = await sleep(waitMs, ctx.abortSignal)
const completed = await sleep(waitMs, {
signal: ctx.abortSignal,
executionId: ctx.executionId,
})

if (!completed) {
return {
Expand Down
10 changes: 9 additions & 1 deletion apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { generateRequestId } from '@/lib/core/utils/request'
import { isExecutionCancelled, isRedisCancellationEnabled } from '@/lib/execution/cancellation'
import { executeInIsolatedVM } from '@/lib/execution/isolated-vm'
import { createLogger } from '@/lib/logs/console/logger'
import { buildLoopIndexCondition, DEFAULTS, EDGE } from '@/executor/constants'
Expand Down Expand Up @@ -229,7 +230,14 @@ export class LoopOrchestrator {
}
}

if (ctx.abortSignal?.aborted) {
const useRedis = isRedisCancellationEnabled() && !!ctx.executionId
let isCancelled = false
if (useRedis) {
isCancelled = await isExecutionCancelled(ctx.executionId!)
} else {
isCancelled = ctx.abortSignal?.aborted ?? false
}
if (isCancelled) {
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
return this.createExitResult(ctx, loopId, scope)
}
Expand Down
16 changes: 16 additions & 0 deletions apps/sim/hooks/use-execution-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export interface ExecuteStreamOptions {
*/
export function useExecutionStream() {
const abortControllerRef = useRef<AbortController | null>(null)
const currentExecutionRef = useRef<{ workflowId: string; executionId: string } | null>(null)

const execute = useCallback(async (options: ExecuteStreamOptions) => {
const { workflowId, callbacks = {}, ...payload } = options
Expand All @@ -88,6 +89,7 @@ export function useExecutionStream() {
// Create new abort controller
const abortController = new AbortController()
abortControllerRef.current = abortController
currentExecutionRef.current = null

try {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
Expand All @@ -108,6 +110,11 @@ export function useExecutionStream() {
throw new Error('No response body')
}

const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current = { workflowId, executionId }
}

// Read SSE stream
const reader = response.body.getReader()
const decoder = new TextDecoder()
Expand Down Expand Up @@ -215,14 +222,23 @@ export function useExecutionStream() {
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = null
}
}, [])

const cancel = useCallback(() => {
const execution = currentExecutionRef.current
if (execution) {
fetch(`/api/workflows/${execution.workflowId}/executions/${execution.executionId}/cancel`, {
method: 'POST',
}).catch(() => {})
}

if (abortControllerRef.current) {
abortControllerRef.current.abort()
abortControllerRef.current = null
}
currentExecutionRef.current = null
}, [])

return {
Expand Down
66 changes: 66 additions & 0 deletions apps/sim/lib/execution/cancellation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { getRedisClient } from '@/lib/core/config/redis'
import { createLogger } from '@/lib/logs/console/logger'

const logger = createLogger('ExecutionCancellation')

const EXECUTION_CANCEL_PREFIX = 'execution:cancel:'
const EXECUTION_CANCEL_EXPIRY = 60 * 60

export function isRedisCancellationEnabled(): boolean {
return getRedisClient() !== null
}

/**
* Mark an execution as cancelled in Redis.
* Returns true if Redis is available and the flag was set, false otherwise.
*/
export async function markExecutionCancelled(executionId: string): Promise<boolean> {
const redis = getRedisClient()
if (!redis) {
return false
}

try {
await redis.set(`${EXECUTION_CANCEL_PREFIX}${executionId}`, '1', 'EX', EXECUTION_CANCEL_EXPIRY)
logger.info('Marked execution as cancelled', { executionId })
return true
} catch (error) {
logger.error('Failed to mark execution as cancelled', { executionId, error })
return false
}
}

/**
* Check if an execution has been cancelled via Redis.
* Returns false if Redis is not available (fallback to local abort signal).
*/
export async function isExecutionCancelled(executionId: string): Promise<boolean> {
const redis = getRedisClient()
if (!redis) {
return false
}

try {
const result = await redis.exists(`${EXECUTION_CANCEL_PREFIX}${executionId}`)
return result === 1
} catch (error) {
logger.error('Failed to check execution cancellation', { executionId, error })
return false
}
}

/**
* Clear the cancellation flag for an execution.
*/
export async function clearExecutionCancellation(executionId: string): Promise<void> {
const redis = getRedisClient()
if (!redis) {
return
}

try {
await redis.del(`${EXECUTION_CANCEL_PREFIX}${executionId}`)
} catch (error) {
logger.error('Failed to clear execution cancellation', { executionId, error })
}
}
Loading