Skip to content

Commit 8285548

Browse files
authored
fix: immediate stop feedback and synthetic execution_complete (#79)
* fix: immediate stop feedback and synthetic execution_complete

  stopExecution() now marks the processing message as failed, broadcasts a synthetic execution_complete (so all clients flush buffered tokens), and sends processing_status: false immediately.

  processSandboxEvent() guards execution_complete with isStillProcessing so the bridge's late execution_complete doesn't double-update a stopped message. Queue draining and snapshots still run regardless.

* fix: address PR review feedback

  - Remove unnecessary `as ServerMessage` cast on synthetic execution_complete broadcast — the type already matches the ServerMessage union
  - Broadcast execution_complete to clients in the normal (non-stopped) completion path — the early return skipped the generic broadcast, so normal completions were never sent to WebSocket clients
  - Update unit test header comment to accurately describe what's tested (repository methods, not DO behavior)

* chore: clarify why stopExecution notifies slack-bot directly

* refactor: simplify completionMessageId and add queue-drain integration test

  - Remove dead `?? event.messageId` fallback (messageId already resolves it)
  - Add integration test verifying queue drain dispatches next message when sandbox WS is connected after stop + late execution_complete
1 parent 1bb4ecc commit 8285548

File tree

3 files changed

+553
-23
lines changed

3 files changed

+553
-23
lines changed

packages/control-plane/src/session/durable-object.ts

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,14 +1042,18 @@ export class SessionDO extends DurableObject<Env> {
10421042

10431043
// Handle specific event types
10441044
if (event.type === "execution_complete") {
1045-
// Use the resolved messageId (which now correctly prioritizes event.messageId)
1046-
const completionMessageId = messageId ?? event.messageId;
1047-
const status = event.success ? "completed" : "failed";
1045+
// messageId already incorporates event.messageId (line above), so no extra fallback needed
1046+
const completionMessageId = messageId;
10481047

1049-
if (completionMessageId) {
1048+
// Only update message status if it's still processing (not already stopped)
1049+
const isStillProcessing =
1050+
completionMessageId != null && processingMessage?.id === completionMessageId;
1051+
1052+
if (isStillProcessing) {
1053+
// Normal path: message still processing, complete it as before
1054+
const status = event.success ? "completed" : "failed";
10501055
this.repository.updateMessageCompletion(completionMessageId, status, now);
10511056

1052-
// Emit prompt.complete wide event with duration metrics
10531057
const timestamps = this.repository.getMessageTimestamps(completionMessageId);
10541058
const totalDurationMs = timestamps ? now - timestamps.created_at : undefined;
10551059
const processingDurationMs =
@@ -1069,30 +1073,24 @@ export class SessionDO extends DurableObject<Env> {
10691073
queue_duration_ms: queueDurationMs,
10701074
});
10711075

1072-
// Broadcast processing status change (after DB update so getIsProcessing is accurate)
1076+
this.broadcast({ type: "sandbox_event", event });
10731077
this.broadcast({ type: "processing_status", isProcessing: this.getIsProcessing() });
1074-
1075-
// Notify slack-bot of completion (fire-and-forget with retry)
10761078
this.ctx.waitUntil(this.notifySlackBot(completionMessageId, event.success));
10771079
} else {
1078-
this.log.warn("prompt.complete", {
1080+
// Stopped path: message was already marked failed by stopExecution()
1081+
this.log.info("prompt.complete", {
10791082
event: "prompt.complete",
1080-
outcome: "error",
1081-
error_reason: "no_message_id",
1083+
message_id: completionMessageId,
1084+
outcome: "already_stopped",
10821085
});
10831086
}
10841087

1085-
// Take snapshot after execution completes (per Ramp spec)
1086-
// "When the agent is finished making changes, we take another snapshot"
1087-
// Use fire-and-forget so snapshot doesn't block the response to the user
1088+
// Always run these regardless of stop (snapshot, activity, queue drain)
10881089
this.ctx.waitUntil(this.triggerSnapshot("execution_complete"));
1089-
1090-
// Reset activity timer - give user time to review output before inactivity timeout
10911090
this.updateLastActivity(now);
10921091
await this.scheduleInactivityCheck();
1093-
1094-
// Process next in queue
10951092
await this.processMessageQueue();
1093+
return; // execution_complete handling is done; skip the generic broadcast below
10961094
}
10971095

10981096
if (event.type === "git_sync") {
@@ -1108,7 +1106,7 @@ export class SessionDO extends DurableObject<Env> {
11081106
this.handlePushEvent(event);
11091107
}
11101108

1111-
// Broadcast to clients
1109+
// Broadcast to clients (all non-execution_complete events)
11121110
this.broadcast({ type: "sandbox_event", event });
11131111
}
11141112

@@ -1305,10 +1303,42 @@ export class SessionDO extends DurableObject<Env> {
13051303

13061304
/**
13071305
* Stop current execution.
1308-
* Sends stop command to sandbox, which should respond with execution_complete.
1309-
* The processing status will be updated when execution_complete is received.
1306+
* Marks the processing message as failed, broadcasts synthetic execution_complete
1307+
* so all clients flush buffered tokens, and forwards stop to the sandbox.
13101308
*/
13111309
private async stopExecution(): Promise<void> {
1310+
const now = Date.now();
1311+
const processingMessage = this.repository.getProcessingMessage();
1312+
1313+
if (processingMessage) {
1314+
this.repository.updateMessageCompletion(processingMessage.id, "failed", now);
1315+
this.log.info("prompt.stopped", {
1316+
event: "prompt.stopped",
1317+
message_id: processingMessage.id,
1318+
});
1319+
1320+
// Broadcast synthetic execution_complete so ALL clients flush buffered tokens.
1321+
// (The stop-clicking client flushes locally, but other connected clients don't.)
1322+
this.broadcast({
1323+
type: "sandbox_event",
1324+
event: {
1325+
type: "execution_complete",
1326+
messageId: processingMessage.id,
1327+
success: false,
1328+
sandboxId: "",
1329+
timestamp: now / 1000,
1330+
},
1331+
});
1332+
1333+
// Notify slack-bot now because the bridge's late execution_complete will hit
1334+
// the "already_stopped" branch in processSandboxEvent() which skips notification.
1335+
this.ctx.waitUntil(this.notifySlackBot(processingMessage.id, false));
1336+
}
1337+
1338+
// Immediate client feedback
1339+
this.broadcast({ type: "processing_status", isProcessing: false });
1340+
1341+
// Forward stop to sandbox (bridge cancels its task)
13121342
const sandboxWs = this.wsManager.getSandboxSocket();
13131343
if (sandboxWs) {
13141344
this.wsManager.send(sandboxWs, { type: "stop" });
@@ -2031,8 +2061,8 @@ export class SessionDO extends DurableObject<Env> {
20312061
}
20322062
}
20332063

2034-
private handleStop(): Response {
2035-
this.stopExecution();
2064+
private async handleStop(): Promise<Response> {
2065+
await this.stopExecution();
20362066
return Response.json({ status: "stopping" });
20372067
}
20382068

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/**
2+
* Unit tests for the stop-execution–related repository behavior.
3+
*
4+
* These tests exercise SessionRepository methods (e.g. getProcessingMessage()
5+
* and updateMessageCompletion()) that are used by stopExecution() and the
6+
* execution_complete guard in processSandboxEvent().
7+
*
8+
* We focus here on the repository-level interactions and state transitions
9+
* by directly calling the repository methods and verifying their effects.
10+
*/
11+
12+
import { describe, it, expect, beforeEach } from "vitest";
13+
import { SessionRepository, type SqlStorage, type SqlResult } from "./repository";
14+
15+
/**
16+
* Create a mock SqlStorage that tracks calls and can return configurable data.
17+
*/
18+
function createMockSql() {
19+
const calls: Array<{ query: string; params: unknown[] }> = [];
20+
const mockData: Map<string, unknown[]> = new Map();
21+
let oneValue: unknown = null;
22+
23+
const sql: SqlStorage = {
24+
exec(query: string, ...params: unknown[]): SqlResult {
25+
calls.push({ query, params });
26+
const data = mockData.get(query) ?? [];
27+
return {
28+
toArray: () => data,
29+
one: () => oneValue,
30+
};
31+
},
32+
};
33+
34+
return {
35+
sql,
36+
calls,
37+
setData(query: string, data: unknown[]) {
38+
mockData.set(query, data);
39+
},
40+
setOne(value: unknown) {
41+
oneValue = value;
42+
},
43+
reset() {
44+
calls.length = 0;
45+
mockData.clear();
46+
oneValue = null;
47+
},
48+
};
49+
}
50+
51+
describe("Stop execution - repository interactions", () => {
52+
let mock: ReturnType<typeof createMockSql>;
53+
let repo: SessionRepository;
54+
55+
beforeEach(() => {
56+
mock = createMockSql();
57+
repo = new SessionRepository(mock.sql);
58+
});
59+
60+
describe("getProcessingMessage", () => {
61+
it("returns message when one is processing", () => {
62+
mock.setData(`SELECT id FROM messages WHERE status = 'processing' LIMIT 1`, [
63+
{ id: "msg-1" },
64+
]);
65+
const result = repo.getProcessingMessage();
66+
expect(result).toEqual({ id: "msg-1" });
67+
});
68+
69+
it("returns null when no message is processing", () => {
70+
mock.setData(`SELECT id FROM messages WHERE status = 'processing' LIMIT 1`, []);
71+
expect(repo.getProcessingMessage()).toBeNull();
72+
});
73+
});
74+
75+
describe("updateMessageCompletion", () => {
76+
it("calls SQL with correct parameters for failed status", () => {
77+
repo.updateMessageCompletion("msg-1", "failed", 1000);
78+
79+
const call = mock.calls.find((c) => c.query.includes("UPDATE messages SET status"));
80+
expect(call).toBeDefined();
81+
expect(call!.params).toContain("failed");
82+
expect(call!.params).toContain("msg-1");
83+
expect(call!.params).toContain(1000);
84+
});
85+
86+
it("calls SQL with correct parameters for completed status", () => {
87+
repo.updateMessageCompletion("msg-2", "completed", 2000);
88+
89+
const call = mock.calls.find((c) => c.query.includes("UPDATE messages SET status"));
90+
expect(call).toBeDefined();
91+
expect(call!.params).toContain("completed");
92+
expect(call!.params).toContain("msg-2");
93+
});
94+
});
95+
96+
describe("stopExecution state machine", () => {
97+
it("marks processing message as failed, then getProcessingMessage returns null", () => {
98+
// First call: message is processing
99+
mock.setData(`SELECT id FROM messages WHERE status = 'processing' LIMIT 1`, [
100+
{ id: "msg-1" },
101+
]);
102+
const processing = repo.getProcessingMessage();
103+
expect(processing).toEqual({ id: "msg-1" });
104+
105+
// Mark as failed
106+
repo.updateMessageCompletion("msg-1", "failed", Date.now());
107+
108+
// After update, simulate no processing messages
109+
mock.setData(`SELECT id FROM messages WHERE status = 'processing' LIMIT 1`, []);
110+
expect(repo.getProcessingMessage()).toBeNull();
111+
});
112+
113+
it("does not error when no processing message exists", () => {
114+
mock.setData(`SELECT id FROM messages WHERE status = 'processing' LIMIT 1`, []);
115+
116+
const processing = repo.getProcessingMessage();
117+
expect(processing).toBeNull();
118+
// No updateMessageCompletion call needed - this is the idempotent case
119+
});
120+
});
121+
});

0 commit comments

Comments
 (0)