Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/cancel/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { requestCancellation } from '@/lib/execution/cancellation'

const CancelExecutionSchema = z.object({
executionId: z.string().uuid(),
})

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
await params

const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}

let body: any = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Avoid using any type. Use unknown instead for better type safety

Suggested change
let body: any = {}
let body: unknown = {}

Context Used: Context from dashboard - TypeScript conventions and type safety (source)

Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/app/api/workflows/[id]/execute/cancel/route.ts
Line: 21:21

Comment:
**style:** Avoid using `any` type. Use `unknown` instead for better type safety

```suggestion
  let body: unknown = {}
```

**Context Used:** Context from `dashboard` - TypeScript conventions and type safety ([source](https://app.greptile.com/review/custom-context?memory=b4f0be8d-a787-4d5a-9098-a66b1449df25))

How can I resolve this? If you propose a fix, please make it concise.

try {
const text = await req.text()
if (text) {
body = JSON.parse(text)
}
} catch {
return NextResponse.json({ error: 'Invalid request body' }, { status: 400 })
}

const validation = CancelExecutionSchema.safeParse(body)
if (!validation.success) {
return NextResponse.json({ error: 'Invalid request body' }, { status: 400 })
}

const { executionId } = validation.data
const success = await requestCancellation(executionId)
return NextResponse.json({ success })
}
17 changes: 4 additions & 13 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { clearCancellation } from '@/lib/execution/cancellation'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
Expand Down Expand Up @@ -496,7 +497,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}

const encoder = new TextEncoder()
let executorInstance: any = null
let isStreamClosed = false

const stream = new ReadableStream<Uint8Array>({
Expand Down Expand Up @@ -688,9 +688,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
onBlockStart,
onBlockComplete,
onStream,
onExecutorCreated: (executor) => {
executorInstance = executor
},
},
loggingSession,
})
Expand Down Expand Up @@ -757,24 +754,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
})
} finally {
await clearCancellation(executionId)

if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.close()
} catch {
// Stream already closed - nothing to do
// Stream already closed
}
}
}
},
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client aborted SSE stream, cancelling executor`)

if (executorInstance && typeof executorInstance.cancel === 'function') {
executorInstance.cancel()
}
},
})

return new NextResponse(stream, {
Expand Down
16 changes: 14 additions & 2 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isCancellationRequested } from '@/lib/execution/cancellation'
import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
Expand Down Expand Up @@ -33,13 +34,24 @@ export class ExecutionEngine {
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
}

private async checkCancellation(): Promise<boolean> {
if (this.context.isCancelled) return true
const executionId = this.context.executionId
if (!executionId) return false
const cancelled = await isCancellationRequested(executionId)
if (cancelled) {
this.context.isCancelled = true
}
return cancelled
}

async run(triggerBlockId?: string): Promise<ExecutionResult> {
const startTime = Date.now()
try {
this.initializeQueue(triggerBlockId)

while (this.hasWork()) {
if (this.context.isCancelled && this.executing.size === 0) {
if ((await this.checkCancellation()) && this.executing.size === 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the best place to put this?

break
}
await this.processQueue()
Expand Down Expand Up @@ -234,7 +246,7 @@ export class ExecutionEngine {

private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (this.context.isCancelled) {
if (await this.checkCancellation()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check needs to move into redis

break
}
const nodeId = this.dequeue()
Expand Down
4 changes: 3 additions & 1 deletion apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ export class DAGExecutor {
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)

// Link cancellation flag to context
Object.defineProperty(context, 'isCancelled', {
get: () => this.isCancelled,
set: (value: boolean) => {
this.isCancelled = value
},
enumerable: true,
configurable: true,
})
Expand Down
1 change: 0 additions & 1 deletion apps/sim/executor/execution/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export interface ExecutionCallbacks {
blockType: string,
output: any
) => Promise<void>
onExecutorCreated?: (executor: any) => void
}

export interface SerializableExecutionState {
Expand Down
23 changes: 23 additions & 0 deletions apps/sim/hooks/use-execution-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ export interface ExecuteStreamOptions {
*/
export function useExecutionStream() {
const abortControllerRef = useRef<AbortController | null>(null)
const currentExecutionRef = useRef<{ workflowId: string; executionId: string | null }>({
workflowId: '',
executionId: null,
})

const execute = useCallback(async (options: ExecuteStreamOptions) => {
const { workflowId, callbacks = {}, ...payload } = options
Expand All @@ -89,6 +93,8 @@ export function useExecutionStream() {
const abortController = new AbortController()
abortControllerRef.current = abortController

currentExecutionRef.current = { workflowId, executionId: null }

try {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
method: 'POST',
Expand All @@ -108,6 +114,11 @@ export function useExecutionStream() {
throw new Error('No response body')
}

const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current.executionId = executionId
}

// Read SSE stream
const reader = response.body.getReader()
const decoder = new TextDecoder()
Expand Down Expand Up @@ -215,6 +226,7 @@ export function useExecutionStream() {
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = { workflowId: '', executionId: null }
}
}, [])

Expand All @@ -223,6 +235,17 @@ export function useExecutionStream() {
abortControllerRef.current.abort()
abortControllerRef.current = null
}

const { workflowId, executionId } = currentExecutionRef.current
if (workflowId && executionId) {
fetch(`/api/workflows/${workflowId}/execute/cancel`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ executionId }),
}).catch(() => {})
Comment on lines +241 to +245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: The catch block silently ignores cancellation API errors. Consider logging the error to help with debugging cancellation issues in production.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/hooks/use-execution-stream.ts
Line: 241:245

Comment:
**style:** The catch block silently ignores cancellation API errors. Consider logging the error to help with debugging cancellation issues in production.

<sub>Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!</sub>

How can I resolve this? If you propose a fix, please make it concise.

}

currentExecutionRef.current = { workflowId: '', executionId: null }
}, [])

return {
Expand Down
50 changes: 50 additions & 0 deletions apps/sim/lib/execution/cancellation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { getRedisClient } from '@/lib/core/config/redis'

const KEY_PREFIX = 'execution:cancel:'
const TTL_SECONDS = 300
const TTL_MS = TTL_SECONDS * 1000

const memoryStore = new Map<string, number>()

export async function requestCancellation(executionId: string): Promise<boolean> {
const redis = getRedisClient()
if (redis) {
try {
await redis.set(`${KEY_PREFIX}${executionId}`, '1', 'EX', TTL_SECONDS)
return true
} catch {
return false
}
}
memoryStore.set(executionId, Date.now() + TTL_MS)
return true
}

export async function isCancellationRequested(executionId: string): Promise<boolean> {
const redis = getRedisClient()
if (redis) {
try {
return (await redis.exists(`${KEY_PREFIX}${executionId}`)) === 1
} catch {
return false
}
}
const expiry = memoryStore.get(executionId)
if (!expiry) return false
if (Date.now() > expiry) {
memoryStore.delete(executionId)
return false
}
return true
}

export async function clearCancellation(executionId: string): Promise<void> {
const redis = getRedisClient()
if (redis) {
try {
await redis.del(`${KEY_PREFIX}${executionId}`)
} catch {}
return
}
memoryStore.delete(executionId)
}
6 changes: 1 addition & 5 deletions apps/sim/lib/workflows/executor/execution-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export async function executeWorkflowCore(
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
metadata
const { onBlockStart, onBlockComplete, onStream, onExecutorCreated } = callbacks
const { onBlockStart, onBlockComplete, onStream } = callbacks

const providedWorkspaceId = metadata.workspaceId
if (!providedWorkspaceId) {
Expand Down Expand Up @@ -349,10 +349,6 @@ export async function executeWorkflowCore(
}
}

if (onExecutorCreated) {
onExecutorCreated(executorInstance)
}

const result = (await executorInstance.execute(
workflowId,
resolvedTriggerBlockId
Expand Down
Loading