Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/cancel/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { requestCancellation } from '@/lib/execution/active-executors'

const CancelExecutionSchema = z.object({
executionId: z.string().uuid(),
})

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
await params

const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}

let body: any = {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Avoid using any type. Use unknown instead for better type safety

Suggested change
let body: any = {}
let body: unknown = {}

Context Used: Context from dashboard - TypeScript conventions and type safety (source)

Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/app/api/workflows/[id]/execute/cancel/route.ts
Line: 21:21

Comment:
**style:** Avoid using `any` type. Use `unknown` instead for better type safety

```suggestion
  let body: unknown = {}
```

**Context Used:** Context from `dashboard` - TypeScript conventions and type safety ([source](https://app.greptile.com/review/custom-context?memory=b4f0be8d-a787-4d5a-9098-a66b1449df25))

How can I resolve this? If you propose a fix, please make it concise.

try {
const text = await req.text()
if (text) {
body = JSON.parse(text)
}
} catch {
return NextResponse.json({ error: 'Invalid request body' }, { status: 400 })
}

const validation = CancelExecutionSchema.safeParse(body)
if (!validation.success) {
return NextResponse.json({ error: 'Invalid request body' }, { status: 400 })
}

const { executionId } = validation.data
const success = await requestCancellation(executionId)
return NextResponse.json({ success })
}
17 changes: 4 additions & 13 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { isTriggerDevEnabled } from '@/lib/core/config/feature-flags'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { clearCancellation } from '@/lib/execution/active-executors'
import { processInputFileFields } from '@/lib/execution/files'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { createLogger } from '@/lib/logs/console/logger'
Expand Down Expand Up @@ -496,7 +497,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}

const encoder = new TextEncoder()
let executorInstance: any = null
let isStreamClosed = false

const stream = new ReadableStream<Uint8Array>({
Expand Down Expand Up @@ -688,9 +688,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
onBlockStart,
onBlockComplete,
onStream,
onExecutorCreated: (executor) => {
executorInstance = executor
},
},
loggingSession,
})
Expand Down Expand Up @@ -757,24 +754,18 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
},
})
} finally {
await clearCancellation(executionId)

if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.close()
} catch {
// Stream already closed - nothing to do
// Stream already closed
}
}
}
},
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client aborted SSE stream, cancelling executor`)

if (executorInstance && typeof executorInstance.cancel === 'function') {
executorInstance.cancel()
}
},
})

return new NextResponse(stream, {
Expand Down
16 changes: 14 additions & 2 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { isCancellationRequested } from '@/lib/execution/active-executors'
import { createLogger } from '@/lib/logs/console/logger'
import { BlockType } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
Expand Down Expand Up @@ -33,13 +34,24 @@ export class ExecutionEngine {
this.allowResumeTriggers = this.context.metadata.resumeFromSnapshot === true
}

private async checkCancellation(): Promise<boolean> {
if (this.context.isCancelled) return true
const executionId = this.context.executionId
if (!executionId) return false
const cancelled = await isCancellationRequested(executionId)
if (cancelled) {
this.context.isCancelled = true
}
return cancelled
}

async run(triggerBlockId?: string): Promise<ExecutionResult> {
const startTime = Date.now()
try {
this.initializeQueue(triggerBlockId)

while (this.hasWork()) {
if (this.context.isCancelled && this.executing.size === 0) {
if ((await this.checkCancellation()) && this.executing.size === 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the best place to put this?

break
}
await this.processQueue()
Expand Down Expand Up @@ -234,7 +246,7 @@ export class ExecutionEngine {

private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (this.context.isCancelled) {
if (await this.checkCancellation()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check needs to move into redis

break
}
const nodeId = this.dequeue()
Expand Down
4 changes: 3 additions & 1 deletion apps/sim/executor/execution/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ export class DAGExecutor {
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)

// Link cancellation flag to context
Object.defineProperty(context, 'isCancelled', {
get: () => this.isCancelled,
set: (value: boolean) => {
this.isCancelled = value
},
enumerable: true,
configurable: true,
})
Expand Down
1 change: 0 additions & 1 deletion apps/sim/executor/execution/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export interface ExecutionCallbacks {
blockType: string,
output: any
) => Promise<void>
onExecutorCreated?: (executor: any) => void
}

export interface SerializableExecutionState {
Expand Down
23 changes: 23 additions & 0 deletions apps/sim/hooks/use-execution-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ export interface ExecuteStreamOptions {
*/
export function useExecutionStream() {
const abortControllerRef = useRef<AbortController | null>(null)
const currentExecutionRef = useRef<{ workflowId: string; executionId: string | null }>({
workflowId: '',
executionId: null,
})

const execute = useCallback(async (options: ExecuteStreamOptions) => {
const { workflowId, callbacks = {}, ...payload } = options
Expand All @@ -89,6 +93,8 @@ export function useExecutionStream() {
const abortController = new AbortController()
abortControllerRef.current = abortController

currentExecutionRef.current = { workflowId, executionId: null }

try {
const response = await fetch(`/api/workflows/${workflowId}/execute`, {
method: 'POST',
Expand All @@ -108,6 +114,11 @@ export function useExecutionStream() {
throw new Error('No response body')
}

const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current.executionId = executionId
}

// Read SSE stream
const reader = response.body.getReader()
const decoder = new TextDecoder()
Expand Down Expand Up @@ -215,6 +226,7 @@ export function useExecutionStream() {
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = { workflowId: '', executionId: null }
}
}, [])

Expand All @@ -223,6 +235,17 @@ export function useExecutionStream() {
abortControllerRef.current.abort()
abortControllerRef.current = null
}

const { workflowId, executionId } = currentExecutionRef.current
if (workflowId && executionId) {
fetch(`/api/workflows/${workflowId}/execute/cancel`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ executionId }),
}).catch(() => {})
Comment on lines +241 to +245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: The catch block silently ignores cancellation API errors. Consider logging the error to help with debugging cancellation issues in production.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/sim/hooks/use-execution-stream.ts
Line: 241:245

Comment:
**style:** The catch block silently ignores cancellation API errors. Consider logging the error to help with debugging cancellation issues in production.

<sub>Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!</sub>

How can I resolve this? If you propose a fix, please make it concise.

}

currentExecutionRef.current = { workflowId: '', executionId: null }
}, [])

return {
Expand Down
13 changes: 13 additions & 0 deletions apps/sim/lib/execution/active-executors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { getCancellationAdapter } from './storage'

export async function requestCancellation(executionId: string): Promise<boolean> {
return getCancellationAdapter().requestCancellation(executionId)
}

export async function isCancellationRequested(executionId: string): Promise<boolean> {
return getCancellationAdapter().isCancellationRequested(executionId)
}

export async function clearCancellation(executionId: string): Promise<void> {
return getCancellationAdapter().clearCancellation(executionId)
}
6 changes: 6 additions & 0 deletions apps/sim/lib/execution/storage/adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface CancellationStorageAdapter {
requestCancellation(executionId: string): Promise<boolean>
isCancellationRequested(executionId: string): Promise<boolean>
clearCancellation(executionId: string): Promise<void>
dispose(): void
}
27 changes: 27 additions & 0 deletions apps/sim/lib/execution/storage/factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { getRedisClient } from '@/lib/core/config/redis'
import { createLogger } from '@/lib/logs/console/logger'
import type { CancellationStorageAdapter } from './adapter'
import { MemoryCancellationStore } from './memory-store'
import { RedisCancellationStore } from './redis-store'

const logger = createLogger('CancellationStorage')

let cachedAdapter: CancellationStorageAdapter | null = null

export function getCancellationAdapter(): CancellationStorageAdapter {
if (cachedAdapter) {
return cachedAdapter
}

const redis = getRedisClient()

if (redis) {
logger.info('Cancellation storage: Using Redis')
cachedAdapter = new RedisCancellationStore(redis)
} else {
logger.info('Cancellation storage: Using in-memory')
cachedAdapter = new MemoryCancellationStore()
}

return cachedAdapter
}
4 changes: 4 additions & 0 deletions apps/sim/lib/execution/storage/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export type { CancellationStorageAdapter } from './adapter'
export { getCancellationAdapter } from './factory'
export { MemoryCancellationStore } from './memory-store'
export { RedisCancellationStore } from './redis-store'
30 changes: 30 additions & 0 deletions apps/sim/lib/execution/storage/memory-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import type { CancellationStorageAdapter } from './adapter'

const TTL_MS = 5 * 60 * 1000

export class MemoryCancellationStore implements CancellationStorageAdapter {
private store = new Map<string, number>()

async requestCancellation(executionId: string): Promise<boolean> {
this.store.set(executionId, Date.now() + TTL_MS)
return true
}

async isCancellationRequested(executionId: string): Promise<boolean> {
const expiry = this.store.get(executionId)
if (!expiry) return false
if (Date.now() > expiry) {
this.store.delete(executionId)
return false
}
return true
}

async clearCancellation(executionId: string): Promise<void> {
this.store.delete(executionId)
}

dispose(): void {
this.store.clear()
}
}
39 changes: 39 additions & 0 deletions apps/sim/lib/execution/storage/redis-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type Redis from 'ioredis'
import type { CancellationStorageAdapter } from './adapter'

const KEY_PREFIX = 'execution:cancel:'
const TTL_SECONDS = 300

export class RedisCancellationStore implements CancellationStorageAdapter {
constructor(private redis: Redis) {}

async requestCancellation(executionId: string): Promise<boolean> {
try {
await this.redis.set(`${KEY_PREFIX}${executionId}`, '1', 'EX', TTL_SECONDS)
return true
} catch {
return false
}
}

async isCancellationRequested(executionId: string): Promise<boolean> {
try {
const result = await this.redis.exists(`${KEY_PREFIX}${executionId}`)
return result === 1
} catch {
return false
}
}

async clearCancellation(executionId: string): Promise<void> {
try {
await this.redis.del(`${KEY_PREFIX}${executionId}`)
} catch {
// Ignore cleanup errors
}
}

dispose(): void {
// Redis client managed externally
}
}
6 changes: 1 addition & 5 deletions apps/sim/lib/workflows/executor/execution-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export async function executeWorkflowCore(
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
metadata
const { onBlockStart, onBlockComplete, onStream, onExecutorCreated } = callbacks
const { onBlockStart, onBlockComplete, onStream } = callbacks

const providedWorkspaceId = metadata.workspaceId
if (!providedWorkspaceId) {
Expand Down Expand Up @@ -349,10 +349,6 @@ export async function executeWorkflowCore(
}
}

if (onExecutorCreated) {
onExecutorCreated(executorInstance)
}

const result = (await executorInstance.execute(
workflowId,
resolvedTriggerBlockId
Expand Down
Loading