Skip to content

Commit 539780a

Browse files
committed
feat: add interrupt agent (PLA-7)
- Add WebSocket action interrupt_agent to stop current agent run - Strategies: optional interruptAgent() implemented for opencode, claude-code, gemini, openai-codex (kill process), mock (clear timeout) - Orchestrator: handle interrupt, finish stream with accumulated text and stream_end; extract finishStream() to avoid duplication - Chat: Stop button (muted style) when streaming, interruptAgent in hook - Docs: document interrupt_agent in API.md - Tests: orchestrator (interrupt when idle/processing), MockStrategy (interruptAgent rejects with INTERRUPTED), useChatWebSocket (interruptAgent) - Fix: opencode ToolEvent kind tool_call, remove unused _hasSession - Bump version to 1.6.8
1 parent 5a37c77 commit 539780a

15 files changed

+209
-49
lines changed

apps/api/src/app/orchestrator/orchestrator.service.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,4 +166,25 @@ describe('OrchestratorService', () => {
166166
expect(thinkingStep?.data.title).toBe('Generating response');
167167
expect(thinkingStep?.data.status).toBe('processing');
168168
});
169+
170+
test('handleClientMessage interrupt_agent when not processing does nothing', async () => {
171+
const orch = await createOrchestrator();
172+
const events: Array<{ type: string }> = [];
173+
orch.outbound.subscribe((ev) => events.push(ev));
174+
orch.handleClientMessage({ action: WS_ACTION.INTERRUPT_AGENT });
175+
expect(events.length).toBe(0);
176+
});
177+
178+
test('handleClientMessage interrupt_agent when processing sends stream_end with accumulated', async () => {
179+
const orch = await createOrchestrator();
180+
orch.isAuthenticated = true;
181+
const events: Array<{ type: string }> = [];
182+
orch.outbound.subscribe((ev) => events.push(ev));
183+
const promise = orch.handleClientMessage({ action: WS_ACTION.SEND_CHAT_MESSAGE, text: 'hi' });
184+
orch.handleClientMessage({ action: WS_ACTION.INTERRUPT_AGENT });
185+
await promise;
186+
expect(events.some((e) => e.type === WS_EVENT.STREAM_START)).toBe(true);
187+
expect(events.some((e) => e.type === WS_EVENT.STREAM_END)).toBe(true);
188+
expect(orch.isProcessing).toBe(false);
189+
});
169190
});

apps/api/src/app/orchestrator/orchestrator.service.ts

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import type {
2020
ThinkingStep,
2121
ToolEvent,
2222
} from '../strategies/strategy.types';
23+
import { INTERRUPTED_MESSAGE } from '../strategies/strategy.types';
2324
import { StrategyRegistryService } from '../strategies/strategy-registry.service';
2425
import {
2526
AUTH_STATUS as AUTH_STATUS_VAL,
@@ -83,6 +84,24 @@ export class OrchestratorService implements OnModuleInit {
8384
this.outbound$.next({ type, data });
8485
}
8586

87+
private finishStream(
88+
accumulated: string,
89+
stepId: string,
90+
step: ThinkingStep
91+
): void {
92+
const finalText = accumulated || 'The agent produced no visible output.';
93+
this.messageStore.add('assistant', finalText);
94+
void this.phoenixSync.syncMessages(JSON.stringify(this.messageStore.all()));
95+
this._send(WS_EVENT.THINKING_STEP, {
96+
id: stepId,
97+
title: step.title,
98+
status: 'complete',
99+
details: step.details,
100+
timestamp: new Date().toISOString(),
101+
});
102+
this._send(WS_EVENT.STREAM_END, {});
103+
}
104+
86105
ensureStrategySettings(): void {
87106
this.strategy.ensureSettings?.();
88107
}
@@ -138,6 +157,11 @@ export class OrchestratorService implements OnModuleInit {
138157
case WS_ACTION.SET_MODEL:
139158
this.handleSetModel(msg.model ?? '');
140159
break;
160+
case WS_ACTION.INTERRUPT_AGENT:
161+
if (this.isProcessing) {
162+
this.strategy.interruptAgent?.();
163+
}
164+
break;
141165
default:
142166
this.logger.warn(`Unknown action: ${action}`);
143167
}
@@ -374,24 +398,14 @@ export class OrchestratorService implements OnModuleInit {
374398
accumulated += chunk;
375399
this._send(WS_EVENT.STREAM_CHUNK, { text: chunk });
376400
}, callbacks, systemPrompt || undefined);
377-
378-
const finalText =
379-
accumulated || 'The agent produced no visible output.';
380-
this.messageStore.add('assistant', finalText);
381-
void this.phoenixSync.syncMessages(
382-
JSON.stringify(this.messageStore.all())
383-
);
384-
this._send(WS_EVENT.THINKING_STEP, {
385-
id: syntheticStepId,
386-
title: syntheticStep.title,
387-
status: 'complete',
388-
details: syntheticStep.details,
389-
timestamp: new Date().toISOString(),
390-
});
391-
this._send(WS_EVENT.STREAM_END, {});
401+
this.finishStream(accumulated, syntheticStepId, syntheticStep);
392402
} catch (err) {
393403
const message = err instanceof Error ? err.message : String(err);
394-
this._send(WS_EVENT.ERROR, { message });
404+
if (message === INTERRUPTED_MESSAGE) {
405+
this.finishStream(accumulated, syntheticStepId, syntheticStep);
406+
} else {
407+
this._send(WS_EVENT.ERROR, { message });
408+
}
395409
} finally {
396410
this.isProcessing = false;
397411
}

apps/api/src/app/strategies/claude-code.strategy.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import { spawn } from 'node:child_process';
1+
import { spawn, type ChildProcess } from 'node:child_process';
22
import { existsSync, mkdirSync, readdirSync, readFileSync, rmSync, writeFileSync } from 'node:fs';
33
import { join } from 'node:path';
44
import type { AuthConnection, LogoutConnection, ToolEvent } from './strategy.types';
5-
import type { AgentStrategy } from './strategy.types';
5+
import { INTERRUPTED_MESSAGE, type AgentStrategy } from './strategy.types';
66

77
const CLAUDE_CONFIG_DIR = join(process.env.HOME ?? '/home/node', '.claude');
88
const TOKEN_FILE_PATH = join(CLAUDE_CONFIG_DIR, 'agent_token.txt');
@@ -76,6 +76,8 @@ export function toolUseToEvent(
7676
export class ClaudeCodeStrategy implements AgentStrategy {
7777
private currentConnection: AuthConnection | null = null;
7878
private _hasSession = false;
79+
private currentStreamProcess: ChildProcess | null = null;
80+
private streamInterrupted = false;
7981

8082
private getToken(): string | null {
8183
if (existsSync(TOKEN_FILE_PATH)) {
@@ -195,6 +197,11 @@ export class ClaudeCodeStrategy implements AgentStrategy {
195197
});
196198
}
197199

200+
interruptAgent(): void {
201+
this.streamInterrupted = true;
202+
this.currentStreamProcess?.kill();
203+
}
204+
198205
executePromptStreaming(
199206
prompt: string,
200207
_model: string,
@@ -203,6 +210,7 @@ export class ClaudeCodeStrategy implements AgentStrategy {
203210
systemPrompt?: string
204211
): Promise<void> {
205212
return new Promise((resolve, reject) => {
213+
this.streamInterrupted = false;
206214
if (!existsSync(PLAYGROUND_DIR)) {
207215
mkdirSync(PLAYGROUND_DIR, { recursive: true });
208216
}
@@ -233,6 +241,7 @@ export class ClaudeCodeStrategy implements AgentStrategy {
233241
cwd: PLAYGROUND_DIR,
234242
shell: false,
235243
});
244+
this.currentStreamProcess = claudeProcess;
236245
claudeProcess.stdin?.end();
237246

238247
let errorResult = '';
@@ -304,7 +313,12 @@ export class ClaudeCodeStrategy implements AgentStrategy {
304313
});
305314

306315
claudeProcess.on('close', (code) => {
316+
this.currentStreamProcess = null;
307317
if (useStreamJson && stdoutBuffer.trim()) handleStreamJsonLine(stdoutBuffer);
318+
if (this.streamInterrupted) {
319+
reject(new Error(INTERRUPTED_MESSAGE));
320+
return;
321+
}
308322
if (code !== 0 && errorResult.trim()) {
309323
reject(new Error(errorResult || `Process exited with code ${code}`));
310324
} else {
@@ -313,7 +327,10 @@ export class ClaudeCodeStrategy implements AgentStrategy {
313327
}
314328
});
315329

316-
claudeProcess.on('error', reject);
330+
claudeProcess.on('error', (err) => {
331+
this.currentStreamProcess = null;
332+
reject(err);
333+
});
317334
});
318335
}
319336

apps/api/src/app/strategies/gemini.strategy.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { Logger } from '@nestjs/common';
2-
import { spawn } from 'node:child_process';
2+
import { spawn, type ChildProcess } from 'node:child_process';
33
import { existsSync, mkdirSync, readFileSync, rmSync, unlinkSync, writeFileSync } from 'node:fs';
44
import { join } from 'node:path';
55
import type { AuthConnection, LogoutConnection } from './strategy.types';
6-
import type { AgentStrategy } from './strategy.types';
6+
import { INTERRUPTED_MESSAGE, type AgentStrategy } from './strategy.types';
77
import { runAuthProcess } from './auth-process-helper';
88

99
const GEMINI_CONFIG_DIR = join(process.env.HOME ?? '/home/node', '.gemini');
@@ -14,6 +14,8 @@ export class GeminiStrategy implements AgentStrategy {
1414
private currentConnection: AuthConnection | null = null;
1515
private authCancel: (() => void) | null = null;
1616
private _hasSession = false;
17+
private currentStreamProcess: ChildProcess | null = null;
18+
private streamInterrupted = false;
1719

1820
ensureSettings(): void {
1921
if (!existsSync(GEMINI_CONFIG_DIR)) {
@@ -193,6 +195,11 @@ export class GeminiStrategy implements AgentStrategy {
193195
return ['-m', model];
194196
}
195197

198+
interruptAgent(): void {
199+
this.streamInterrupted = true;
200+
this.currentStreamProcess?.kill();
201+
}
202+
196203
executePromptStreaming(
197204
prompt: string,
198205
model: string,
@@ -201,6 +208,7 @@ export class GeminiStrategy implements AgentStrategy {
201208
systemPrompt?: string
202209
): Promise<void> {
203210
return new Promise((resolve, reject) => {
211+
this.streamInterrupted = false;
204212
this.ensureSettings();
205213
const playgroundDir = join(process.cwd(), 'playground');
206214
if (!existsSync(playgroundDir)) {
@@ -221,6 +229,7 @@ export class GeminiStrategy implements AgentStrategy {
221229
cwd: playgroundDir,
222230
shell: false,
223231
});
232+
this.currentStreamProcess = geminiProcess;
224233

225234
let errorResult = '';
226235

@@ -234,6 +243,11 @@ export class GeminiStrategy implements AgentStrategy {
234243
});
235244

236245
geminiProcess.on('close', (code) => {
246+
this.currentStreamProcess = null;
247+
if (this.streamInterrupted) {
248+
reject(new Error(INTERRUPTED_MESSAGE));
249+
return;
250+
}
237251
const modelNotFound =
238252
errorResult.includes('ModelNotFoundError') ||
239253
errorResult.includes('Requested entity was not found');
@@ -267,7 +281,10 @@ export class GeminiStrategy implements AgentStrategy {
267281
}
268282
});
269283

270-
geminiProcess.on('error', reject);
284+
geminiProcess.on('error', (err) => {
285+
this.currentStreamProcess = null;
286+
reject(err);
287+
});
271288
});
272289
}
273290
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { describe, test, expect, beforeEach } from 'bun:test';
2+
import { MockStrategy } from './mock.strategy';
3+
import { INTERRUPTED_MESSAGE } from './strategy.types';
4+
5+
describe('MockStrategy', () => {
6+
let strategy: MockStrategy;
7+
8+
beforeEach(() => {
9+
strategy = new MockStrategy();
10+
});
11+
12+
test('interruptAgent rejects executePromptStreaming promise with INTERRUPTED', async () => {
13+
const chunks: string[] = [];
14+
const promise = strategy.executePromptStreaming(
15+
'prompt',
16+
'model',
17+
(chunk) => chunks.push(chunk),
18+
undefined,
19+
undefined
20+
);
21+
strategy.interruptAgent();
22+
await expect(promise).rejects.toThrow(INTERRUPTED_MESSAGE);
23+
});
24+
25+
test('interruptAgent when no stream is running does not throw', () => {
26+
expect(() => strategy.interruptAgent()).not.toThrow();
27+
});
28+
});

apps/api/src/app/strategies/mock.strategy.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import { Logger } from '@nestjs/common';
22
import type { AuthConnection, LogoutConnection } from './strategy.types';
3-
import type { AgentStrategy } from './strategy.types';
3+
import { INTERRUPTED_MESSAGE, type AgentStrategy } from './strategy.types';
44

55
const MOCK_AUTH_DELAY_MS = 1000;
66
const MOCK_LOGOUT_DELAY_MS = 500;
77

88
export class MockStrategy implements AgentStrategy {
99
private readonly logger = new Logger(MockStrategy.name);
10+
private streamCancel: (() => void) | null = null;
1011

1112
executeAuth(connection: AuthConnection): void {
1213
this.logger.log('executeAuth: Mocking auth success in 1s');
@@ -40,14 +41,24 @@ export class MockStrategy implements AgentStrategy {
4041
return Promise.resolve(true);
4142
}
4243

44+
interruptAgent(): void {
45+
this.streamCancel?.();
46+
}
47+
4348
executePromptStreaming(
4449
_prompt: string,
4550
_model: string,
4651
onChunk: (chunk: string) => void,
4752
callbacks?: import('./strategy.types').StreamingCallbacks,
4853
_systemPrompt?: string
4954
): Promise<void> {
50-
return new Promise((resolve) => {
55+
return new Promise((resolve, reject) => {
56+
const timeoutRef: { id: ReturnType<typeof setTimeout> | null } = { id: null };
57+
this.streamCancel = () => {
58+
this.streamCancel = null;
59+
if (timeoutRef.id !== null) clearTimeout(timeoutRef.id);
60+
reject(new Error(INTERRUPTED_MESSAGE));
61+
};
5162
callbacks?.onReasoningChunk?.('Considering the request...\n');
5263
callbacks?.onReasoningChunk?.('Preparing a helpful response.\n');
5364
callbacks?.onReasoningEnd?.();
@@ -57,7 +68,9 @@ export class MockStrategy implements AgentStrategy {
5768
path: 'example-output.ts',
5869
summary: 'Sample file created by mock agent',
5970
});
60-
setTimeout(() => {
71+
timeoutRef.id = setTimeout(() => {
72+
this.streamCancel = null;
73+
timeoutRef.id = null;
6174
const timestamp = new Date().toISOString();
6275
onChunk('[MOCKED RESPONSE] Hello! ');
6376
onChunk(`The current timestamp is ${timestamp}`);

0 commit comments

Comments
 (0)