Skip to content

Commit 742d59f

Browse files
Sg312waleedlatif1icecrasher321
authored
feat(hitl): add human in the loop block (#1832)
* fix(billing): should allow restoring subscription (#1728) * fix(already-cancelled-sub): UI should allow restoring subscription * restore functionality fixed * fix * Add pause resume block * Add db schema * Initial test passes * Tests pass * Execution pauses * Snapshot serializer * Ui checkpoint * Works 1 * Pause resume simple v1 * Hitl block works in parallel branches without timing overlap * Pending status to logs * Pause resume ui link * Big context consolidation * HITL works in loops * Fix parallels * Reference blocks properly * Fix tag dropdown and start block resolution * Filter console logs for hitl block * Fix notifs * Fix logs page * Fix logs page again * Fix * Checkpoint * Cleanup v1 * Refactor v2 * Refactor v3 * Refactor v4 * Refactor v5 * Resume page * Fix variables in loops * Fix var res bugs * Ui changes * Approval block * Hitl works e2e v1 * Fix tets * Row level lock --------- Co-authored-by: Waleed <[email protected]> Co-authored-by: Vikhyath Mondreti <[email protected]>
1 parent f9ce65e commit 742d59f

File tree

90 files changed

+13491
-1121
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+13491
-1121
lines changed

apps/sim/app/api/logs/route.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { db } from '@sim/db'
2-
import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
2+
import { pausedExecutions, permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
33
import { and, desc, eq, gte, inArray, lte, type SQL, sql } from 'drizzle-orm'
44
import { type NextRequest, NextResponse } from 'next/server'
55
import { z } from 'zod'
@@ -68,6 +68,9 @@ export async function GET(request: NextRequest) {
6868
workflowWorkspaceId: workflow.workspaceId,
6969
workflowCreatedAt: workflow.createdAt,
7070
workflowUpdatedAt: workflow.updatedAt,
71+
pausedStatus: pausedExecutions.status,
72+
pausedTotalPauseCount: pausedExecutions.totalPauseCount,
73+
pausedResumedCount: pausedExecutions.resumedCount,
7174
}
7275
: {
7376
// Basic mode - exclude large fields for better performance
@@ -92,11 +95,18 @@ export async function GET(request: NextRequest) {
9295
workflowWorkspaceId: workflow.workspaceId,
9396
workflowCreatedAt: workflow.createdAt,
9497
workflowUpdatedAt: workflow.updatedAt,
98+
pausedStatus: pausedExecutions.status,
99+
pausedTotalPauseCount: pausedExecutions.totalPauseCount,
100+
pausedResumedCount: pausedExecutions.resumedCount,
95101
}
96102

97103
const baseQuery = db
98104
.select(selectColumns)
99105
.from(workflowExecutionLogs)
106+
.leftJoin(
107+
pausedExecutions,
108+
eq(pausedExecutions.executionId, workflowExecutionLogs.executionId)
109+
)
100110
.innerJoin(
101111
workflow,
102112
and(
@@ -186,6 +196,10 @@ export async function GET(request: NextRequest) {
186196
const countQuery = db
187197
.select({ count: sql<number>`count(*)` })
188198
.from(workflowExecutionLogs)
199+
.leftJoin(
200+
pausedExecutions,
201+
eq(pausedExecutions.executionId, workflowExecutionLogs.executionId)
202+
)
189203
.innerJoin(
190204
workflow,
191205
and(
@@ -340,13 +354,18 @@ export async function GET(request: NextRequest) {
340354
return {
341355
id: log.id,
342356
workflowId: log.workflowId,
343-
executionId: params.details === 'full' ? log.executionId : undefined,
357+
executionId: log.executionId,
344358
level: log.level,
345359
duration: log.totalDurationMs ? `${log.totalDurationMs}ms` : null,
346360
trigger: log.trigger,
347361
createdAt: log.startedAt.toISOString(),
348362
files: params.details === 'full' ? log.files || undefined : undefined,
349363
workflow: workflowSummary,
364+
pauseSummary: {
365+
status: log.pausedStatus ?? null,
366+
total: log.pausedTotalPauseCount ?? 0,
367+
resumed: log.pausedResumedCount ?? 0,
368+
},
350369
executionData:
351370
params.details === 'full'
352371
? {
@@ -361,6 +380,10 @@ export async function GET(request: NextRequest) {
361380
params.details === 'full'
362381
? (costSummary as any)
363382
: { total: (costSummary as any)?.total || 0 },
383+
hasPendingPause:
384+
(Number(log.pausedTotalPauseCount ?? 0) > 0 &&
385+
Number(log.pausedResumedCount ?? 0) < Number(log.pausedTotalPauseCount ?? 0)) ||
386+
(log.pausedStatus && log.pausedStatus !== 'fully_resumed'),
364387
}
365388
})
366389
return NextResponse.json(
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { type NextRequest, NextResponse } from 'next/server'
2+
import { createLogger } from '@/lib/logs/console/logger'
3+
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
4+
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
5+
6+
const logger = createLogger('WorkflowResumeAPI')
7+
8+
export const runtime = 'nodejs'
9+
export const dynamic = 'force-dynamic'
10+
11+
export async function POST(
12+
request: NextRequest,
13+
{
14+
params,
15+
}: {
16+
params: Promise<{ workflowId: string; executionId: string; contextId: string }>
17+
}
18+
) {
19+
const { workflowId, executionId, contextId } = await params
20+
21+
const access = await validateWorkflowAccess(request, workflowId, false)
22+
if (access.error) {
23+
return NextResponse.json({ error: access.error.message }, { status: access.error.status })
24+
}
25+
26+
const workflow = access.workflow!
27+
28+
let payload: any = {}
29+
try {
30+
payload = await request.json()
31+
} catch {
32+
payload = {}
33+
}
34+
35+
const resumeInput = payload?.input ?? payload ?? {}
36+
const userId = workflow.userId ?? ''
37+
38+
try {
39+
const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
40+
executionId,
41+
contextId,
42+
resumeInput,
43+
userId,
44+
})
45+
46+
if (enqueueResult.status === 'queued') {
47+
return NextResponse.json({
48+
status: 'queued',
49+
executionId: enqueueResult.resumeExecutionId,
50+
queuePosition: enqueueResult.queuePosition,
51+
message: 'Resume queued. It will run after current resumes finish.',
52+
})
53+
}
54+
55+
PauseResumeManager.startResumeExecution({
56+
resumeEntryId: enqueueResult.resumeEntryId,
57+
resumeExecutionId: enqueueResult.resumeExecutionId,
58+
pausedExecution: enqueueResult.pausedExecution,
59+
contextId: enqueueResult.contextId,
60+
resumeInput: enqueueResult.resumeInput,
61+
userId: enqueueResult.userId,
62+
}).catch((error) => {
63+
logger.error('Failed to start resume execution', {
64+
workflowId,
65+
parentExecutionId: executionId,
66+
resumeExecutionId: enqueueResult.resumeExecutionId,
67+
error,
68+
})
69+
})
70+
71+
return NextResponse.json({
72+
status: 'started',
73+
executionId: enqueueResult.resumeExecutionId,
74+
message: 'Resume execution started.',
75+
})
76+
} catch (error: any) {
77+
logger.error('Resume request failed', {
78+
workflowId,
79+
executionId,
80+
contextId,
81+
error,
82+
})
83+
return NextResponse.json(
84+
{ error: error.message || 'Failed to queue resume request' },
85+
{ status: 400 }
86+
)
87+
}
88+
}
89+
90+
export async function GET(
91+
request: NextRequest,
92+
{
93+
params,
94+
}: {
95+
params: Promise<{ workflowId: string; executionId: string; contextId: string }>
96+
}
97+
) {
98+
const { workflowId, executionId, contextId } = await params
99+
100+
const access = await validateWorkflowAccess(request, workflowId, false)
101+
if (access.error) {
102+
return NextResponse.json({ error: access.error.message }, { status: access.error.status })
103+
}
104+
105+
const detail = await PauseResumeManager.getPauseContextDetail({
106+
workflowId,
107+
executionId,
108+
contextId,
109+
})
110+
111+
if (!detail) {
112+
return NextResponse.json({ error: 'Pause context not found' }, { status: 404 })
113+
}
114+
115+
return NextResponse.json(detail)
116+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { type NextRequest, NextResponse } from 'next/server'
2+
import { createLogger } from '@/lib/logs/console/logger'
3+
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
4+
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
5+
6+
const logger = createLogger('WorkflowResumeExecutionAPI')
7+
8+
export const runtime = 'nodejs'
9+
export const dynamic = 'force-dynamic'
10+
11+
export async function GET(
12+
request: NextRequest,
13+
{
14+
params,
15+
}: {
16+
params: Promise<{ workflowId: string; executionId: string }>
17+
}
18+
) {
19+
const { workflowId, executionId } = await params
20+
21+
const access = await validateWorkflowAccess(request, workflowId, false)
22+
if (access.error) {
23+
return NextResponse.json({ error: access.error.message }, { status: access.error.status })
24+
}
25+
26+
try {
27+
const detail = await PauseResumeManager.getPausedExecutionDetail({
28+
workflowId,
29+
executionId,
30+
})
31+
32+
if (!detail) {
33+
return NextResponse.json({ error: 'Paused execution not found' }, { status: 404 })
34+
}
35+
36+
return NextResponse.json(detail)
37+
} catch (error: any) {
38+
logger.error('Failed to load paused execution detail', {
39+
workflowId,
40+
executionId,
41+
error,
42+
})
43+
return NextResponse.json(
44+
{ error: error?.message || 'Failed to load paused execution detail' },
45+
{ status: 500 }
46+
)
47+
}
48+
}

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
} from '@/lib/workflows/db-helpers'
1313
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
1414
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
15+
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
1516
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
1617
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
1718
import type { StreamingExecution } from '@/executor/types'
@@ -135,6 +136,24 @@ export async function executeWorkflow(
135136
loggingSession,
136137
})
137138

139+
if (result.status === 'paused') {
140+
if (!result.snapshotSeed) {
141+
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
142+
executionId,
143+
})
144+
} else {
145+
await PauseResumeManager.persistPauseResult({
146+
workflowId,
147+
executionId,
148+
pausePoints: result.pausePoints || [],
149+
snapshotSeed: result.snapshotSeed,
150+
executorUserId: result.metadata?.userId,
151+
})
152+
}
153+
} else {
154+
await PauseResumeManager.processQueuedResumes(executionId)
155+
}
156+
138157
if (streamConfig?.skipLoggingComplete) {
139158
return {
140159
...result,
@@ -605,6 +624,24 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
605624
loggingSession,
606625
})
607626

627+
if (result.status === 'paused') {
628+
if (!result.snapshotSeed) {
629+
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
630+
executionId,
631+
})
632+
} else {
633+
await PauseResumeManager.persistPauseResult({
634+
workflowId,
635+
executionId,
636+
pausePoints: result.pausePoints || [],
637+
snapshotSeed: result.snapshotSeed,
638+
executorUserId: result.metadata?.userId,
639+
})
640+
}
641+
} else {
642+
await PauseResumeManager.processQueuedResumes(executionId)
643+
}
644+
608645
if (result.error === 'Workflow execution was cancelled') {
609646
logger.info(`[${requestId}] Workflow execution was cancelled`)
610647
sendEvent({
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { type NextRequest, NextResponse } from 'next/server'
2+
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
3+
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
4+
5+
export const runtime = 'nodejs'
6+
export const dynamic = 'force-dynamic'
7+
8+
export async function GET(
9+
request: NextRequest,
10+
{
11+
params,
12+
}: {
13+
params: { id: string; executionId: string }
14+
}
15+
) {
16+
const workflowId = params.id
17+
const executionId = params.executionId
18+
19+
const access = await validateWorkflowAccess(request, workflowId, false)
20+
if (access.error) {
21+
return NextResponse.json({ error: access.error.message }, { status: access.error.status })
22+
}
23+
24+
const detail = await PauseResumeManager.getPausedExecutionDetail({
25+
workflowId,
26+
executionId,
27+
})
28+
29+
if (!detail) {
30+
return NextResponse.json({ error: 'Paused execution not found' }, { status: 404 })
31+
}
32+
33+
return NextResponse.json(detail)
34+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { type NextRequest, NextResponse } from 'next/server'
2+
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
3+
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
4+
5+
export const runtime = 'nodejs'
6+
export const dynamic = 'force-dynamic'
7+
8+
export async function GET(
9+
request: NextRequest,
10+
{
11+
params,
12+
}: {
13+
params: { id: string }
14+
}
15+
) {
16+
const workflowId = params.id
17+
18+
const access = await validateWorkflowAccess(request, workflowId, false)
19+
if (access.error) {
20+
return NextResponse.json({ error: access.error.message }, { status: access.error.status })
21+
}
22+
23+
const statusFilter = request.nextUrl.searchParams.get('status') || undefined
24+
25+
const pausedExecutions = await PauseResumeManager.listPausedExecutions({
26+
workflowId,
27+
status: statusFilter,
28+
})
29+
30+
return NextResponse.json({ pausedExecutions })
31+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { redirect } from 'next/navigation'
2+
3+
export const runtime = 'nodejs'
4+
export const dynamic = 'force-dynamic'
5+
6+
interface PageParams {
7+
workflowId: string
8+
executionId: string
9+
contextId: string
10+
}
11+
12+
export default async function ResumePage({ params }: { params: Promise<PageParams> }) {
13+
const { workflowId, executionId, contextId } = await params
14+
redirect(`/resume/${workflowId}/${executionId}?contextId=${contextId}`)
15+
}

0 commit comments

Comments
 (0)