Skip to content

Commit dc3de95

Browse files
fix(logging): hitl + trigger dev crash protection (#2664)
* hitl gaps * deal with trigger worker crashes * cleanup import structure
1 parent 79be435 commit dc3de95

File tree

15 files changed

+602
-311
lines changed

15 files changed

+602
-311
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import { db } from '@sim/db'
2+
import { workflowExecutionLogs } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { and, eq, lt, sql } from 'drizzle-orm'
5+
import { type NextRequest, NextResponse } from 'next/server'
6+
import { verifyCronAuth } from '@/lib/auth/internal'
7+
8+
const logger = createLogger('CleanupStaleExecutions')
9+
10+
// Age, in minutes, past which an execution still marked 'running' is treated as stale
// by the cleanup job below (compared against the log row's startedAt).
const STALE_THRESHOLD_MINUTES = 30
11+
12+
/**
 * Cron endpoint that fails workflow executions stuck in the 'running' state.
 *
 * Selects up to 100 execution log rows whose status is 'running' and whose
 * startedAt is older than STALE_THRESHOLD_MINUTES, then marks each one as
 * 'failed', stamping endedAt, totalDurationMs and an `error` entry inside
 * execution_data.
 *
 * @param request - Incoming cron request; passed to verifyCronAuth for authorization.
 * @returns The response produced by verifyCronAuth when it reports an auth error,
 *   a JSON summary ({ success, found, cleaned, failed, skipped, thresholdMinutes })
 *   on completion, or a 500 JSON error if the job itself throws.
 */
export async function GET(request: NextRequest) {
  try {
    const authError = verifyCronAuth(request, 'Stale execution cleanup')
    if (authError) {
      return authError
    }

    logger.info('Starting stale execution cleanup job')

    const staleThreshold = new Date(Date.now() - STALE_THRESHOLD_MINUTES * 60 * 1000)

    const staleExecutions = await db
      .select({
        id: workflowExecutionLogs.id,
        executionId: workflowExecutionLogs.executionId,
        workflowId: workflowExecutionLogs.workflowId,
        startedAt: workflowExecutionLogs.startedAt,
      })
      .from(workflowExecutionLogs)
      .where(
        and(
          eq(workflowExecutionLogs.status, 'running'),
          lt(workflowExecutionLogs.startedAt, staleThreshold)
        )
      )
      .limit(100)

    logger.info(`Found ${staleExecutions.length} stale executions to clean up`)

    let cleaned = 0
    let failed = 0
    let skipped = 0

    for (const execution of staleExecutions) {
      try {
        // Use one timestamp so endedAt and totalDurationMs agree with each other.
        const endedAt = new Date()
        const staleDurationMs = endedAt.getTime() - new Date(execution.startedAt).getTime()
        const staleDurationMinutes = Math.round(staleDurationMs / 60000)

        const updatedRows = await db
          .update(workflowExecutionLogs)
          .set({
            status: 'failed',
            endedAt,
            totalDurationMs: staleDurationMs,
            executionData: sql`jsonb_set(
              COALESCE(execution_data, '{}'::jsonb),
              ARRAY['error'],
              to_jsonb(${`Execution terminated: worker timeout or crash after ${staleDurationMinutes} minutes`}::text)
            )`,
          })
          .where(
            and(
              eq(workflowExecutionLogs.id, execution.id),
              // Re-check the status at write time: the execution may have finished
              // between the SELECT above and this UPDATE, and a terminal status
              // must not be overwritten with 'failed'.
              eq(workflowExecutionLogs.status, 'running')
            )
          )
          .returning({ id: workflowExecutionLogs.id })

        if (updatedRows.length === 0) {
          logger.info(`Skipped execution ${execution.executionId}: no longer running`, {
            workflowId: execution.workflowId,
          })
          skipped++
          continue
        }

        logger.info(`Cleaned up stale execution ${execution.executionId}`, {
          workflowId: execution.workflowId,
          staleDurationMinutes,
        })

        cleaned++
      } catch (error) {
        // A failure on one row must not abort the rest of the batch.
        logger.error(`Failed to clean up execution ${execution.executionId}:`, {
          error: error instanceof Error ? error.message : String(error),
        })
        failed++
      }
    }

    logger.info(
      `Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}, Skipped: ${skipped}`
    )

    return NextResponse.json({
      success: true,
      found: staleExecutions.length,
      cleaned,
      failed,
      skipped,
      thresholdMinutes: STALE_THRESHOLD_MINUTES,
    })
  } catch (error) {
    logger.error('Error in stale execution cleanup job:', error)
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ import { createStreamingResponse } from '@/lib/workflows/streaming/streaming'
2323
import { createHttpResponseFromBlock, workflowHasResponseBlock } from '@/lib/workflows/utils'
2424
import type { WorkflowExecutionPayload } from '@/background/workflow-execution'
2525
import { normalizeName } from '@/executor/constants'
26-
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
26+
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
27+
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
2728
import type { StreamingExecution } from '@/executor/types'
2829
import { Serializer } from '@/serializer'
2930
import { CORE_TRIGGER_TYPES } from '@/stores/logs/filters/types'
30-
import type { SubflowType } from '@/stores/workflows/workflow/types'
3131

3232
const logger = createLogger('WorkflowExecuteAPI')
3333

@@ -541,11 +541,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
541541
blockId: string,
542542
blockName: string,
543543
blockType: string,
544-
iterationContext?: {
545-
iterationCurrent: number
546-
iterationTotal: number
547-
iterationType: SubflowType
548-
}
544+
iterationContext?: IterationContext
549545
) => {
550546
logger.info(`[${requestId}] 🔷 onBlockStart called:`, { blockId, blockName, blockType })
551547
sendEvent({
@@ -571,11 +567,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
571567
blockName: string,
572568
blockType: string,
573569
callbackData: any,
574-
iterationContext?: {
575-
iterationCurrent: number
576-
iterationTotal: number
577-
iterationType: SubflowType
578-
}
570+
iterationContext?: IterationContext
579571
) => {
580572
const hasError = callbackData.output?.error
581573

@@ -713,14 +705,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
713705
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
714706
executionId,
715707
})
708+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
716709
} else {
717-
await PauseResumeManager.persistPauseResult({
718-
workflowId,
719-
executionId,
720-
pausePoints: result.pausePoints || [],
721-
snapshotSeed: result.snapshotSeed,
722-
executorUserId: result.metadata?.userId,
723-
})
710+
try {
711+
await PauseResumeManager.persistPauseResult({
712+
workflowId,
713+
executionId,
714+
pausePoints: result.pausePoints || [],
715+
snapshotSeed: result.snapshotSeed,
716+
executorUserId: result.metadata?.userId,
717+
})
718+
} catch (pauseError) {
719+
logger.error(`[${requestId}] Failed to persist pause result`, {
720+
executionId,
721+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
722+
})
723+
await loggingSession.markAsFailed(
724+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
725+
)
726+
}
724727
}
725728
} else {
726729
await PauseResumeManager.processQueuedResumes(executionId)

apps/sim/background/schedule-execution.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ import {
2323
getSubBlockValue,
2424
} from '@/lib/workflows/schedules/utils'
2525
import { REFERENCE } from '@/executor/constants'
26-
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
26+
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
27+
import type { ExecutionMetadata } from '@/executor/execution/types'
2728
import type { ExecutionResult } from '@/executor/types'
2829
import { createEnvVarPattern } from '@/executor/utils/reference-validation'
2930
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -285,14 +286,25 @@ async function runWorkflowExecution({
285286
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
286287
executionId,
287288
})
289+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
288290
} else {
289-
await PauseResumeManager.persistPauseResult({
290-
workflowId: payload.workflowId,
291-
executionId,
292-
pausePoints: executionResult.pausePoints || [],
293-
snapshotSeed: executionResult.snapshotSeed,
294-
executorUserId: executionResult.metadata?.userId,
295-
})
291+
try {
292+
await PauseResumeManager.persistPauseResult({
293+
workflowId: payload.workflowId,
294+
executionId,
295+
pausePoints: executionResult.pausePoints || [],
296+
snapshotSeed: executionResult.snapshotSeed,
297+
executorUserId: executionResult.metadata?.userId,
298+
})
299+
} catch (pauseError) {
300+
logger.error(`[${requestId}] Failed to persist pause result`, {
301+
executionId,
302+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
303+
})
304+
await loggingSession.markAsFailed(
305+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
306+
)
307+
}
296308
}
297309
} else {
298310
await PauseResumeManager.processQueuedResumes(executionId)

apps/sim/background/webhook-execution.ts

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import {
1717
loadWorkflowFromNormalizedTables,
1818
} from '@/lib/workflows/persistence/utils'
1919
import { getWorkflowById } from '@/lib/workflows/utils'
20-
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
20+
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
21+
import type { ExecutionMetadata } from '@/executor/execution/types'
2122
import type { ExecutionResult } from '@/executor/types'
2223
import { Serializer } from '@/serializer'
2324
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -268,14 +269,25 @@ async function executeWebhookJobInternal(
268269
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
269270
executionId,
270271
})
272+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
271273
} else {
272-
await PauseResumeManager.persistPauseResult({
273-
workflowId: payload.workflowId,
274-
executionId,
275-
pausePoints: executionResult.pausePoints || [],
276-
snapshotSeed: executionResult.snapshotSeed,
277-
executorUserId: executionResult.metadata?.userId,
278-
})
274+
try {
275+
await PauseResumeManager.persistPauseResult({
276+
workflowId: payload.workflowId,
277+
executionId,
278+
pausePoints: executionResult.pausePoints || [],
279+
snapshotSeed: executionResult.snapshotSeed,
280+
executorUserId: executionResult.metadata?.userId,
281+
})
282+
} catch (pauseError) {
283+
logger.error(`[${requestId}] Failed to persist pause result`, {
284+
executionId,
285+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
286+
})
287+
await loggingSession.markAsFailed(
288+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
289+
)
290+
}
279291
}
280292
} else {
281293
await PauseResumeManager.processQueuedResumes(executionId)
@@ -509,14 +521,25 @@ async function executeWebhookJobInternal(
509521
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
510522
executionId,
511523
})
524+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
512525
} else {
513-
await PauseResumeManager.persistPauseResult({
514-
workflowId: payload.workflowId,
515-
executionId,
516-
pausePoints: executionResult.pausePoints || [],
517-
snapshotSeed: executionResult.snapshotSeed,
518-
executorUserId: executionResult.metadata?.userId,
519-
})
526+
try {
527+
await PauseResumeManager.persistPauseResult({
528+
workflowId: payload.workflowId,
529+
executionId,
530+
pausePoints: executionResult.pausePoints || [],
531+
snapshotSeed: executionResult.snapshotSeed,
532+
executorUserId: executionResult.metadata?.userId,
533+
})
534+
} catch (pauseError) {
535+
logger.error(`[${requestId}] Failed to persist pause result`, {
536+
executionId,
537+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
538+
})
539+
await loggingSession.markAsFailed(
540+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
541+
)
542+
}
520543
}
521544
} else {
522545
await PauseResumeManager.processQueuedResumes(executionId)

apps/sim/background/workflow-execution.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
77
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
88
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
99
import { getWorkflowById } from '@/lib/workflows/utils'
10-
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
10+
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
11+
import type { ExecutionMetadata } from '@/executor/execution/types'
1112
import type { ExecutionResult } from '@/executor/types'
1213

1314
const logger = createLogger('TriggerWorkflowExecution')
@@ -112,14 +113,25 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
112113
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
113114
executionId,
114115
})
116+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
115117
} else {
116-
await PauseResumeManager.persistPauseResult({
117-
workflowId,
118-
executionId,
119-
pausePoints: result.pausePoints || [],
120-
snapshotSeed: result.snapshotSeed,
121-
executorUserId: result.metadata?.userId,
122-
})
118+
try {
119+
await PauseResumeManager.persistPauseResult({
120+
workflowId,
121+
executionId,
122+
pausePoints: result.pausePoints || [],
123+
snapshotSeed: result.snapshotSeed,
124+
executorUserId: result.metadata?.userId,
125+
})
126+
} catch (pauseError) {
127+
logger.error(`[${requestId}] Failed to persist pause result`, {
128+
executionId,
129+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
130+
})
131+
await loggingSession.markAsFailed(
132+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
133+
)
134+
}
123135
}
124136
} else {
125137
await PauseResumeManager.processQueuedResumes(executionId)

apps/sim/executor/execution/snapshot-serializer.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
import type { DAG } from '@/executor/dag/builder'
2-
import {
3-
type ExecutionMetadata,
4-
ExecutionSnapshot,
5-
type SerializableExecutionState,
6-
} from '@/executor/execution/snapshot'
2+
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
3+
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
74
import type { ExecutionContext, SerializedSnapshot } from '@/executor/types'
85

96
function mapFromEntries<T>(map?: Map<string, T>): Record<string, T> | undefined {

apps/sim/executor/execution/snapshot.ts

Lines changed: 1 addition & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,4 @@
1-
import type { Edge } from 'reactflow'
2-
import type { BlockLog, BlockState } from '@/executor/types'
3-
4-
export interface ExecutionMetadata {
5-
requestId: string
6-
executionId: string
7-
workflowId: string
8-
workspaceId: string
9-
userId: string
10-
sessionUserId?: string
11-
workflowUserId?: string
12-
triggerType: string
13-
triggerBlockId?: string
14-
useDraftState: boolean
15-
startTime: string
16-
isClientSession?: boolean
17-
pendingBlocks?: string[]
18-
resumeFromSnapshot?: boolean
19-
workflowStateOverride?: {
20-
blocks: Record<string, any>
21-
edges: Edge[]
22-
loops?: Record<string, any>
23-
parallels?: Record<string, any>
24-
deploymentVersionId?: string // ID of deployment version if this is deployed state
25-
}
26-
}
27-
28-
export interface ExecutionCallbacks {
29-
onStream?: (streamingExec: any) => Promise<void>
30-
onBlockStart?: (blockId: string, blockName: string, blockType: string) => Promise<void>
31-
onBlockComplete?: (
32-
blockId: string,
33-
blockName: string,
34-
blockType: string,
35-
output: any
36-
) => Promise<void>
37-
}
38-
39-
export interface SerializableExecutionState {
40-
blockStates: Record<string, BlockState>
41-
executedBlocks: string[]
42-
blockLogs: BlockLog[]
43-
decisions: {
44-
router: Record<string, string>
45-
condition: Record<string, string>
46-
}
47-
completedLoops: string[]
48-
loopExecutions?: Record<string, any>
49-
parallelExecutions?: Record<string, any>
50-
parallelBlockMapping?: Record<string, any>
51-
activeExecutionPath: string[]
52-
pendingQueue?: string[]
53-
remainingEdges?: Edge[]
54-
dagIncomingEdges?: Record<string, string[]>
55-
completedPauseContexts?: string[]
56-
}
1+
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
572

583
export class ExecutionSnapshot {
594
constructor(

0 commit comments

Comments
 (0)