Skip to content

Commit 79f871c

Browse files
committed
feat(workflow): add checkpoint status and resume support for workflow steps
- Introduce new "checkpoint" agent status for paused workflows waiting for input - Add resume functionality with session tracking for interrupted workflows - Improve handling of chained prompts and queue management - Add skip functionality during workflow execution - Enhance debug logging for workflow state transitions
1 parent 4d6548e commit 79f871c

File tree

8 files changed

+165
-19
lines changed

8 files changed

+165
-19
lines changed

src/cli/tui/routes/workflow/components/timeline/status-utils.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ export function getStatusIcon(status: AgentStatus): string {
2525
return "●" // Filled circle
2626
case "retrying":
2727
return "⟳" // Retry symbol
28+
case "checkpoint":
29+
return "◉" // Waiting for input
30+
case "paused":
31+
return "॥" // Paused
2832
default:
2933
return "?"
3034
}
@@ -44,6 +48,10 @@ export function getStatusColor(status: AgentStatus, theme: Theme): RGBA {
4448
return theme.error // red for failed
4549
case "skipped":
4650
return theme.textMuted // gray/muted for skipped
51+
case "checkpoint":
52+
return theme.warning // yellow for waiting
53+
case "paused":
54+
return theme.warning // yellow for paused
4755
default:
4856
return theme.text // white
4957
}

src/cli/tui/routes/workflow/context/ui-state/actions/workflow-actions.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ export function createWorkflowActions(ctx: WorkflowActionsContext) {
3939
const state = ctx.getState()
4040
ctx.setState({ ...state, inputState })
4141
if (inputState && inputState.active) {
42-
setWorkflowStatus("paused")
42+
// Only show "Paused" for manual pause (no queue), not for chained prompts
43+
const hasQueue = inputState.queuedPrompts && inputState.queuedPrompts.length > 0
44+
if (!hasQueue) {
45+
setWorkflowStatus("paused")
46+
}
47+
// With queue (chained prompts), keep current status
4348
} else {
4449
setWorkflowStatus("running")
4550
}

src/cli/tui/routes/workflow/state/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export type AgentStatus =
88
| "skipped"
99
| "retrying"
1010
| "paused"
11+
| "checkpoint"
1112

1213
export interface AgentTelemetry {
1314
tokensIn: number

src/cli/tui/routes/workflow/workflow-shell.tsx

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -248,15 +248,15 @@ export function WorkflowShell(props: WorkflowShellProps) {
248248
// Prompt box focus state (for inline prompt box)
249249
const [isPromptBoxFocused, setIsPromptBoxFocused] = createSignal(true)
250250

251-
// Check if output window is showing the running agent (not a manually selected one)
251+
// Check if output window is showing the active agent (running or at checkpoint)
252252
const isShowingRunningAgent = createMemo(() => {
253253
const s = state()
254-
const running = s.agents.find((a) => a.status === "running")
255-
if (!running) return false
256-
// If no explicit selection, we're showing the running agent
254+
const active = s.agents.find((a) => a.status === "running" || a.status === "checkpoint")
255+
if (!active) return false
256+
// If no explicit selection, we're showing the active agent
257257
if (!s.selectedAgentId) return true
258-
// If selected agent is the running agent
259-
return s.selectedAgentId === running.id && s.selectedItemType !== "sub"
258+
// If selected agent is the active agent
259+
return s.selectedAgentId === active.id && s.selectedItemType !== "sub"
260260
})
261261

262262
// Auto-focus prompt box when input waiting becomes active

src/workflows/execution/runner.ts

Lines changed: 117 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ import * as path from 'node:path';
1111
import * as fs from 'node:fs';
1212

1313
import { debug } from '../../shared/logging/logger.js';
14-
import { formatAgentLog } from '../../shared/logging/index.js';
14+
import { formatUserInput } from '../../shared/formatters/outputMarkers.js';
15+
import { AgentLoggerService, AgentMonitorService } from '../../agents/monitoring/index.js';
1516
import type { ModuleStep, WorkflowTemplate } from '../templates/types.js';
1617
import type { WorkflowEventEmitter } from '../events/index.js';
1718
import {
@@ -32,6 +33,14 @@ import { executeStep } from './step.js';
3233
import { selectEngine } from './engine.js';
3334
import { loadControllerConfig } from '../../shared/workflows/controller.js';
3435
import { registry } from '../../infra/engines/index.js';
36+
import {
37+
markStepStarted,
38+
initStepSession,
39+
markChainCompleted,
40+
markStepCompleted,
41+
getStepData,
42+
getChainResumeInfo,
43+
} from '../../shared/workflows/steps.js';
3544

3645
/**
3746
* Runner options
@@ -123,6 +132,13 @@ export class WorkflowRunner {
123132
};
124133
process.on('workflow:pause', pauseHandler);
125134

135+
// Skip listener (Ctrl+S while agent running)
136+
const skipHandler = () => {
137+
debug('[Runner] Skip requested');
138+
this.abortController?.abort();
139+
};
140+
process.on('workflow:skip', skipHandler);
141+
126142
// Stop listener
127143
const stopHandler = () => {
128144
debug('[Runner] Stop requested');
@@ -142,6 +158,7 @@ export class WorkflowRunner {
142158
this.machine.subscribe((state) => {
143159
if (this.machine.isFinal) {
144160
process.removeListener('workflow:pause', pauseHandler);
161+
process.removeListener('workflow:skip', skipHandler);
145162
process.removeListener('workflow:stop', stopHandler);
146163
process.removeListener('workflow:mode-change', modeChangeHandler);
147164
}
@@ -205,6 +222,41 @@ export class WorkflowRunner {
205222

206223
debug('[Runner] Executing step %d: %s', ctx.currentStepIndex, step.agentName);
207224

225+
// Check for resume data (existing session from previous run)
226+
const stepData = await getStepData(this.cmRoot, ctx.currentStepIndex);
227+
const isResuming = stepData?.sessionId && !stepData.completedAt;
228+
229+
// If resuming, skip execution and go directly to waiting state
230+
if (isResuming) {
231+
debug('[Runner] Resuming step %d - going to waiting state', ctx.currentStepIndex);
232+
233+
// Register monitoring ID so TUI loads existing logs
234+
if (stepData.monitoringId !== undefined) {
235+
this.emitter.registerMonitoringId(uniqueAgentId, stepData.monitoringId);
236+
}
237+
238+
this.emitter.updateAgentStatus(uniqueAgentId, 'checkpoint');
239+
this.emitter.logMessage(uniqueAgentId, '═'.repeat(80));
240+
this.emitter.logMessage(uniqueAgentId, `${step.agentName} resumed - waiting for input.`);
241+
242+
// Set context with saved data
243+
ctx.currentMonitoringId = stepData.monitoringId;
244+
ctx.currentOutput = {
245+
output: '',
246+
monitoringId: stepData.monitoringId,
247+
};
248+
249+
// Go to waiting state
250+
this.machine.send({
251+
type: 'STEP_COMPLETE',
252+
output: { output: '', monitoringId: stepData.monitoringId },
253+
});
254+
return;
255+
}
256+
257+
// Track step start for resume
258+
await markStepStarted(this.cmRoot, ctx.currentStepIndex);
259+
208260
// Reset pause flag
209261
this.pauseRequested = false;
210262

@@ -214,7 +266,7 @@ export class WorkflowRunner {
214266
// Update UI
215267
this.emitter.updateAgentStatus(uniqueAgentId, 'running');
216268
this.emitter.logMessage(uniqueAgentId, '═'.repeat(80));
217-
this.emitter.logMessage(uniqueAgentId, `${step.agentName} started to work.`);
269+
this.emitter.logMessage(uniqueAgentId, `${step.agentName} ${isResuming ? 'resumed work.' : 'started to work.'}`);
218270

219271
// Reset behavior file
220272
const behaviorFile = path.join(this.cwd, '.codemachine/memory/behavior.json');
@@ -237,13 +289,16 @@ export class WorkflowRunner {
237289
}
238290

239291
try {
240-
// Execute the step
292+
// Execute the step (with resume data if available)
241293
const output = await executeStep(step, this.cwd, {
242294
logger: () => {},
243295
stderrLogger: () => {},
244296
emitter: this.emitter,
245297
abortSignal: this.abortController.signal,
246298
uniqueAgentId,
299+
resumeMonitoringId: isResuming ? stepData.monitoringId : undefined,
300+
resumeSessionId: isResuming ? stepData.sessionId : undefined,
301+
resumePrompt: isResuming ? 'Continue from where you left off.' : undefined,
247302
});
248303

249304
// Check if paused
@@ -256,18 +311,33 @@ export class WorkflowRunner {
256311

257312
// Step completed
258313
debug('[Runner] Step completed');
259-
this.emitter.updateAgentStatus(uniqueAgentId, 'completed');
314+
315+
// Track session info for resume
316+
if (output.monitoringId !== undefined) {
317+
const monitor = AgentMonitorService.getInstance();
318+
const agentInfo = monitor.getAgent(output.monitoringId);
319+
const sessionId = agentInfo?.sessionId ?? '';
320+
await initStepSession(this.cmRoot, ctx.currentStepIndex, sessionId, output.monitoringId);
321+
}
260322

261323
const stepOutput: StepOutput = {
262324
output: output.output,
263325
monitoringId: output.monitoringId,
264326
};
265327

266328
// Update context with chained prompts if any
329+
debug('[Runner] chainedPrompts from output: %d items', output.chainedPrompts?.length ?? 0);
267330
if (output.chainedPrompts && output.chainedPrompts.length > 0) {
331+
debug('[Runner] Setting promptQueue with %d chained prompts:', output.chainedPrompts.length);
332+
output.chainedPrompts.forEach((p, i) => debug('[Runner] [%d] %s: %s', i, p.name, p.label));
268333
this.machine.context.promptQueue = output.chainedPrompts;
269334
this.machine.context.promptQueueIndex = 0;
335+
// Show checkpoint status while waiting for chained prompt input
336+
this.emitter.updateAgentStatus(uniqueAgentId, 'checkpoint');
337+
debug('[Runner] Agent at checkpoint, waiting for chained prompt input');
270338
} else {
339+
debug('[Runner] No chained prompts, marking agent completed');
340+
this.emitter.updateAgentStatus(uniqueAgentId, 'completed');
271341
this.machine.context.promptQueue = [];
272342
this.machine.context.promptQueueIndex = 0;
273343
}
@@ -284,11 +354,9 @@ export class WorkflowRunner {
284354
debug('[Runner] Step aborted (skip)');
285355
this.emitter.updateAgentStatus(uniqueAgentId, 'skipped');
286356
this.emitter.logMessage(uniqueAgentId, `${step.agentName} was skipped.`);
287-
// Treat skip as completing the step with empty output
288-
this.machine.send({
289-
type: 'STEP_COMPLETE',
290-
output: { output: '', monitoringId: undefined },
291-
});
357+
// Track step completion for resume
358+
await markStepCompleted(this.cmRoot, ctx.currentStepIndex);
359+
this.machine.send({ type: 'SKIP' });
292360
}
293361
return;
294362
}
@@ -308,7 +376,8 @@ export class WorkflowRunner {
308376
private async handleWaiting(): Promise<void> {
309377
const ctx = this.machine.context;
310378

311-
debug('[Runner] Handling waiting state, autoMode=%s', ctx.autoMode);
379+
debug('[Runner] Handling waiting state, autoMode=%s, promptQueue=%d items, queueIndex=%d',
380+
ctx.autoMode, ctx.promptQueue.length, ctx.promptQueueIndex);
312381

313382
// Build input context
314383
const inputContext: InputContext = {
@@ -334,10 +403,19 @@ export class WorkflowRunner {
334403
}
335404

336405
// Handle result
406+
const step = this.moduleSteps[ctx.currentStepIndex];
407+
const uniqueAgentId = `${step.agentId}-step-${ctx.currentStepIndex}`;
408+
337409
switch (result.type) {
338410
case 'input':
339411
if (result.value === '') {
340412
// Empty input = advance to next step
413+
debug('[Runner] Empty input, marking agent completed and advancing');
414+
this.emitter.updateAgentStatus(uniqueAgentId, 'completed');
415+
this.emitter.logMessage(uniqueAgentId, `${step.agentName} has completed their work.`);
416+
this.emitter.logMessage(uniqueAgentId, '\n' + '═'.repeat(80) + '\n');
417+
// Track step completion for resume
418+
await markStepCompleted(this.cmRoot, ctx.currentStepIndex);
341419
this.machine.send({ type: 'INPUT_RECEIVED', input: '' });
342420
} else {
343421
// Has input = resume current step with input, then wait again
@@ -346,6 +424,12 @@ export class WorkflowRunner {
346424
break;
347425

348426
case 'skip':
427+
debug('[Runner] Skip requested, marking agent skipped');
428+
this.emitter.updateAgentStatus(uniqueAgentId, 'skipped');
429+
this.emitter.logMessage(uniqueAgentId, `${step.agentName} was skipped.`);
430+
this.emitter.logMessage(uniqueAgentId, '\n' + '═'.repeat(80) + '\n');
431+
// Track step completion for resume
432+
await markStepCompleted(this.cmRoot, ctx.currentStepIndex);
349433
this.machine.send({ type: 'SKIP' });
350434
break;
351435

@@ -365,12 +449,30 @@ export class WorkflowRunner {
365449

366450
debug('[Runner] Resuming step with input: %s...', input.slice(0, 50));
367451

368-
// Update queue index if using queued prompt
452+
// Get sessionId from step data for resume
453+
const stepData = await getStepData(this.cmRoot, ctx.currentStepIndex);
454+
const sessionId = stepData?.sessionId;
455+
456+
// Detect queued vs custom input
457+
let isQueuedPrompt = false;
369458
if (ctx.promptQueue.length > 0 && ctx.promptQueueIndex < ctx.promptQueue.length) {
370459
const queuedPrompt = ctx.promptQueue[ctx.promptQueueIndex];
371460
if (input === queuedPrompt.content) {
461+
isQueuedPrompt = true;
462+
const chainIndex = ctx.promptQueueIndex;
372463
ctx.promptQueueIndex += 1;
373464
debug('[Runner] Advanced queue to index %d', ctx.promptQueueIndex);
465+
// Track chain completion for resume
466+
await markChainCompleted(this.cmRoot, ctx.currentStepIndex, chainIndex);
467+
}
468+
}
469+
470+
// Log custom user input (magenta)
471+
if (!isQueuedPrompt) {
472+
const formatted = formatUserInput(input);
473+
this.emitter.logMessage(uniqueAgentId, formatted);
474+
if (monitoringId !== undefined) {
475+
AgentLoggerService.getInstance().write(monitoringId, `\n${formatted}\n`);
374476
}
375477
}
376478

@@ -386,6 +488,7 @@ export class WorkflowRunner {
386488
abortSignal: this.abortController.signal,
387489
uniqueAgentId,
388490
resumeMonitoringId: monitoringId,
491+
resumeSessionId: sessionId,
389492
resumePrompt: input,
390493
});
391494

@@ -396,6 +499,9 @@ export class WorkflowRunner {
396499
};
397500
ctx.currentMonitoringId = output.monitoringId;
398501

502+
// Back to checkpoint while waiting for next input
503+
this.emitter.updateAgentStatus(uniqueAgentId, 'checkpoint');
504+
399505
// Stay in waiting state - will get more input
400506
// (The waiting handler will be called again)
401507
} catch (error) {

src/workflows/execution/step.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ export interface StepExecutorOptions {
3535
resumeMonitoringId?: number;
3636
/** Custom prompt for resume (instead of "Continue from where you left off") */
3737
resumePrompt?: string;
38+
/** Session ID for resuming (direct, for when monitoringId lookup fails) */
39+
resumeSessionId?: string;
3840
/** Selected conditions for filtering conditional chained prompt paths */
3941
selectedConditions?: string[];
4042
}
@@ -135,6 +137,7 @@ export async function executeStep(
135137
uniqueAgentId: options.uniqueAgentId,
136138
resumeMonitoringId: options.resumeMonitoringId,
137139
resumePrompt: options.resumePrompt,
140+
resumeSessionId: options.resumeSessionId,
138141
selectedConditions: options.selectedConditions,
139142
// Pass emitter as UI so runner can register monitoring ID immediately
140143
ui: options.emitter,

src/workflows/input/user.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ export class UserInputProvider implements InputProvider {
7777
}
7878

7979
debug('[UserInput] Received input: prompt=%s, skip=%s', data?.prompt, data?.skip);
80+
debug('[UserInput] Context: promptQueue=%d items, queueIndex=%d',
81+
this.currentContext.promptQueue.length, this.currentContext.promptQueueIndex);
8082

8183
// Handle skip
8284
if (data?.skip) {
@@ -93,10 +95,13 @@ export class UserInputProvider implements InputProvider {
9395
// If no input provided, check queue
9496
if (!input && this.currentContext.promptQueue.length > 0) {
9597
const queueIndex = this.currentContext.promptQueueIndex;
98+
debug('[UserInput] Checking queue: queueIndex=%d, queueLength=%d', queueIndex, this.currentContext.promptQueue.length);
9699
if (queueIndex < this.currentContext.promptQueue.length) {
97100
const queuedPrompt = this.currentContext.promptQueue[queueIndex];
98101
input = queuedPrompt.content;
99-
debug('[UserInput] Using queued prompt: %s', queuedPrompt.label);
102+
debug('[UserInput] Using queued prompt [%d]: %s - "%s"', queueIndex, queuedPrompt.label, input?.slice(0, 50));
103+
} else {
104+
debug('[UserInput] Queue exhausted (index %d >= length %d)', queueIndex, this.currentContext.promptQueue.length);
100105
}
101106
}
102107

src/workflows/state/machine.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,24 @@ export function createWorkflowMachine(initialContext: Partial<WorkflowContext> =
167167
}
168168
},
169169
},
170+
SKIP: [
171+
// Skip while running - advance to next step
172+
{
173+
target: 'running',
174+
guard: (ctx) => ctx.currentStepIndex < ctx.totalSteps - 1,
175+
action: (ctx) => {
176+
ctx.currentStepIndex += 1;
177+
ctx.promptQueue = [];
178+
ctx.promptQueueIndex = 0;
179+
debug('[FSM] Skipped during run, advancing to step %d', ctx.currentStepIndex + 1);
180+
},
181+
},
182+
// If last step, complete
183+
{
184+
target: 'completed',
185+
guard: (ctx) => ctx.currentStepIndex >= ctx.totalSteps - 1,
186+
},
187+
],
170188
PAUSE: {
171189
target: 'waiting',
172190
action: (ctx) => {

0 commit comments

Comments
 (0)