Skip to content

Commit 7309ec6

Browse files
committed
feat(input): add mode switching between user and controller input
- Extend InputResult type to include source field - Add mode change listeners to both user and controller input providers - Implement immediate abort on mode switch in controller provider - Handle mode switch signals in workflow runner - Add proper cleanup of listeners and abort controllers
1 parent 79f871c commit 7309ec6

File tree

4 files changed

+94
-11
lines changed

4 files changed

+94
-11
lines changed

src/workflows/execution/runner.ts

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,14 @@ export class WorkflowRunner {
150150
// Mode change listener
151151
const modeChangeHandler = async (data: { autonomousMode: boolean }) => {
152152
debug('[Runner] Mode change: autoMode=%s', data.autonomousMode);
153+
// If in waiting state, let the provider's listener handle it
154+
// The provider will return __SWITCH_TO_AUTO__ or __SWITCH_TO_MANUAL__
155+
// and handleWaiting() will call setAutoMode()
156+
if (this.machine.state === 'waiting') {
157+
debug('[Runner] In waiting state, provider will handle mode switch');
158+
return;
159+
}
160+
// In other states (running, idle), set auto mode directly
153161
await this.setAutoMode(data.autonomousMode);
154162
};
155163
process.on('workflow:mode-change', modeChangeHandler);
@@ -402,6 +410,14 @@ export class WorkflowRunner {
402410
return;
403411
}
404412

413+
// Handle special switch-to-auto signal
414+
if (result.type === 'input' && result.value === '__SWITCH_TO_AUTO__') {
415+
debug('[Runner] Switching to autonomous mode');
416+
await this.setAutoMode(true);
417+
// Re-run waiting with controller input
418+
return;
419+
}
420+
405421
// Handle result
406422
const step = this.moduleSteps[ctx.currentStepIndex];
407423
const uniqueAgentId = `${step.agentId}-step-${ctx.currentStepIndex}`;
@@ -419,7 +435,7 @@ export class WorkflowRunner {
419435
this.machine.send({ type: 'INPUT_RECEIVED', input: '' });
420436
} else {
421437
// Has input = resume current step with input, then wait again
422-
await this.resumeWithInput(result.value, result.resumeMonitoringId);
438+
await this.resumeWithInput(result.value, result.resumeMonitoringId, result.source);
423439
}
424440
break;
425441

@@ -442,12 +458,12 @@ export class WorkflowRunner {
442458
/**
443459
* Resume current step with input (for chained prompts or steering)
444460
*/
445-
private async resumeWithInput(input: string, monitoringId?: number): Promise<void> {
461+
private async resumeWithInput(input: string, monitoringId?: number, source?: 'user' | 'controller'): Promise<void> {
446462
const ctx = this.machine.context;
447463
const step = this.moduleSteps[ctx.currentStepIndex];
448464
const uniqueAgentId = `${step.agentId}-step-${ctx.currentStepIndex}`;
449465

450-
debug('[Runner] Resuming step with input: %s...', input.slice(0, 50));
466+
debug('[Runner] Resuming step with input: %s... (source=%s)', input.slice(0, 50), source ?? 'user');
451467

452468
// Get sessionId from step data for resume
453469
const stepData = await getStepData(this.cmRoot, ctx.currentStepIndex);
@@ -467,8 +483,8 @@ export class WorkflowRunner {
467483
}
468484
}
469485

470-
// Log custom user input (magenta)
471-
if (!isQueuedPrompt) {
486+
// Log custom user input (magenta) - skip for controller input (already logged during streaming)
487+
if (!isQueuedPrompt && source !== 'controller') {
472488
const formatted = formatUserInput(input);
473489
this.emitter.logMessage(uniqueAgentId, formatted);
474490
if (monitoringId !== undefined) {
@@ -480,6 +496,16 @@ export class WorkflowRunner {
480496
this.emitter.updateAgentStatus(uniqueAgentId, 'running');
481497
this.emitter.setWorkflowStatus('running');
482498

499+
// Track if mode switch was requested during execution
500+
let modeSwitchRequested: 'manual' | 'auto' | null = null;
501+
const modeChangeHandler = (data: { autonomousMode: boolean }) => {
502+
debug('[Runner] Mode change during resumeWithInput: autoMode=%s', data.autonomousMode);
503+
modeSwitchRequested = data.autonomousMode ? 'auto' : 'manual';
504+
// Abort the current step execution
505+
this.abortController?.abort();
506+
};
507+
process.on('workflow:mode-change', modeChangeHandler);
508+
483509
try {
484510
const output = await executeStep(step, this.cwd, {
485511
logger: () => {},
@@ -508,11 +534,21 @@ export class WorkflowRunner {
508534
if (error instanceof Error && error.name === 'AbortError') {
509535
if (this.pauseRequested) {
510536
this.machine.send({ type: 'PAUSE' });
537+
return;
538+
}
539+
// Handle mode switch during execution
540+
if (modeSwitchRequested) {
541+
debug('[Runner] Step aborted due to mode switch to %s', modeSwitchRequested);
542+
this.emitter.updateAgentStatus(uniqueAgentId, 'checkpoint');
543+
await this.setAutoMode(modeSwitchRequested === 'auto');
544+
// Return to let handleWaiting loop with new provider
545+
return;
511546
}
512547
return;
513548
}
514549
this.machine.send({ type: 'STEP_ERROR', error: error as Error });
515550
} finally {
551+
process.removeListener('workflow:mode-change', modeChangeHandler);
516552
this.abortController = null;
517553
}
518554
}

src/workflows/input/controller.ts

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export class ControllerInputProvider implements InputProvider {
4646
private getControllerConfig: () => Promise<ControllerConfig | null>;
4747
private cwd: string;
4848
private aborted = false;
49+
private abortController: AbortController | null = null;
4950
private modeChangeListener: ((data: { autonomousMode: boolean }) => void) | null = null;
5051

5152
constructor(options: ControllerInputProviderOptions) {
@@ -67,13 +68,17 @@ export class ControllerInputProvider implements InputProvider {
6768

6869
const loggerService = AgentLoggerService.getInstance();
6970

71+
// Set up abort controller for this execution
72+
this.abortController = new AbortController();
73+
7074
// Listen for mode change (user switches to manual)
7175
let switchToManual = false;
7276
this.modeChangeListener = (data) => {
7377
if (!data.autonomousMode) {
74-
debug('[Controller] Mode change to manual requested');
78+
debug('[Controller] Mode change to manual requested, aborting execution');
7579
switchToManual = true;
76-
// Note: We don't abort here - let current execution complete
80+
// Abort the current execution immediately
81+
this.abortController?.abort();
7782
}
7883
};
7984
process.on('workflow:mode-change', this.modeChangeListener);
@@ -87,6 +92,7 @@ export class ControllerInputProvider implements InputProvider {
8792
workingDir: this.cwd,
8893
resumeSessionId: config.sessionId,
8994
resumePrompt: prompt,
95+
abortSignal: this.abortController.signal,
9096
logger: (chunk) => {
9197
// Log controller output to step's log
9298
if (context.stepOutput.monitoringId !== undefined) {
@@ -105,7 +111,7 @@ export class ControllerInputProvider implements InputProvider {
105111
return { type: 'stop' };
106112
}
107113

108-
// Check if user switched to manual mode
114+
// Check if user switched to manual mode (shouldn't reach here if aborted, but just in case)
109115
if (switchToManual) {
110116
debug('[Controller] Switching to manual mode');
111117
this.emitter.emitCanceled();
@@ -144,9 +150,24 @@ export class ControllerInputProvider implements InputProvider {
144150
type: 'input',
145151
value: cleanedResponse,
146152
resumeMonitoringId: context.stepOutput.monitoringId,
153+
source: 'controller',
147154
};
155+
} catch (error) {
156+
// Handle abort (mode switch to manual)
157+
if (error instanceof Error && error.name === 'AbortError') {
158+
if (switchToManual) {
159+
debug('[Controller] Aborted due to mode switch to manual');
160+
this.emitter.emitCanceled();
161+
return { type: 'input', value: '__SWITCH_TO_MANUAL__' };
162+
}
163+
// General abort (stop workflow)
164+
debug('[Controller] Aborted');
165+
return { type: 'stop' };
166+
}
167+
throw error;
148168
} finally {
149-
// Clean up listener
169+
// Clean up
170+
this.abortController = null;
150171
if (this.modeChangeListener) {
151172
process.removeListener('workflow:mode-change', this.modeChangeListener);
152173
this.modeChangeListener = null;
@@ -168,6 +189,10 @@ export class ControllerInputProvider implements InputProvider {
168189
debug('[Controller] Aborting');
169190
this.aborted = true;
170191

192+
// Abort any running execution
193+
this.abortController?.abort();
194+
this.abortController = null;
195+
171196
if (this.modeChangeListener) {
172197
process.removeListener('workflow:mode-change', this.modeChangeListener);
173198
this.modeChangeListener = null;

src/workflows/input/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export interface InputContext {
3434
* Result from an input provider
3535
*/
3636
export type InputResult =
37-
| { type: 'input'; value: string; resumeMonitoringId?: number }
37+
| { type: 'input'; value: string; resumeMonitoringId?: number; source?: 'user' | 'controller' }
3838
| { type: 'skip' }
3939
| { type: 'stop' };
4040

src/workflows/input/user.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export class UserInputProvider implements InputProvider {
3333
private resolver: ((result: InputResult) => void) | null = null;
3434
private currentContext: InputContext | null = null;
3535
private inputListener: ((data?: { prompt?: string; skip?: boolean }) => void) | null = null;
36+
private modeChangeListener: ((data: { autonomousMode: boolean }) => void) | null = null;
3637

3738
constructor(options: UserInputProviderOptions) {
3839
this.emitter = options.emitter;
@@ -57,16 +58,32 @@ export class UserInputProvider implements InputProvider {
5758
};
5859
process.on('workflow:input', this.inputListener);
5960

61+
// Set up listener for mode change (switch to autonomous)
62+
this.modeChangeListener = (data) => {
63+
if (data.autonomousMode && this.resolver) {
64+
debug('[UserInput] Mode change to autonomous, signaling switch');
65+
this.emitter.emitCanceled();
66+
this.resolver({ type: 'input', value: '__SWITCH_TO_AUTO__' });
67+
this.resolver = null;
68+
this.currentContext = null;
69+
}
70+
};
71+
process.on('workflow:mode-change', this.modeChangeListener);
72+
6073
try {
6174
return await new Promise<InputResult>((resolve) => {
6275
this.resolver = resolve;
6376
});
6477
} finally {
65-
// Clean up listener
78+
// Clean up listeners
6679
if (this.inputListener) {
6780
process.removeListener('workflow:input', this.inputListener);
6881
this.inputListener = null;
6982
}
83+
if (this.modeChangeListener) {
84+
process.removeListener('workflow:mode-change', this.modeChangeListener);
85+
this.modeChangeListener = null;
86+
}
7087
}
7188
}
7289

@@ -141,6 +158,11 @@ export class UserInputProvider implements InputProvider {
141158
this.inputListener = null;
142159
}
143160

161+
if (this.modeChangeListener) {
162+
process.removeListener('workflow:mode-change', this.modeChangeListener);
163+
this.modeChangeListener = null;
164+
}
165+
144166
if (this.resolver) {
145167
debug('[UserInput] Aborting pending input wait');
146168
this.emitter.emitCanceled();

0 commit comments

Comments
 (0)