Skip to content

Commit 412321b

Browse files
Schedule fixes (#653)
* add destroyed flag and yield abort * Implement destroyed flag and context abort Add a destroyed flag and yield context abort functionality. * add test and fix immediate schedules * remove manual alarm call in constructor * update test + better changeset
1 parent c67d4b4 commit 412321b

File tree

5 files changed

+97
-23
lines changed

5 files changed

+97
-23
lines changed

.changeset/neat-beds-cheat.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"agents": patch
3+
---
4+
5+
Allow `this.destroy` inside a schedule by including a `destroyed` flag and yielding `ctx.abort` instead of calling it directly
6+
Fix issue where schedules would not be able to run for more 30 seconds due to `blockConccurencyWhile`. `alarm()` isn't manually called anymore, getting rid of the bCW.
7+
Fix an issue where immediate schedules (e.g. `this.schedule(0, "foo"))`) would not get immediately scheduled.

packages/agents/src/index.ts

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ export class Agent<
321321
private _state = DEFAULT_STATE as State;
322322
private _disposables = new DisposableStore();
323323
private _mcpStateRestored = false;
324+
private _destroyed = false;
324325

325326
private _ParentClass: typeof Agent<Env, State> =
326327
Object.getPrototypeOf(this).constructor;
@@ -457,26 +458,18 @@ export class Agent<
457458
)
458459
`;
459460

460-
void this.ctx.blockConcurrencyWhile(async () => {
461-
return this._tryCatch(async () => {
462-
// Create alarms table if it doesn't exist
463-
this.sql`
464-
CREATE TABLE IF NOT EXISTS cf_agents_schedules (
465-
id TEXT PRIMARY KEY NOT NULL DEFAULT (randomblob(9)),
466-
callback TEXT,
467-
payload TEXT,
468-
type TEXT NOT NULL CHECK(type IN ('scheduled', 'delayed', 'cron')),
469-
time INTEGER,
470-
delayInSeconds INTEGER,
471-
cron TEXT,
472-
created_at INTEGER DEFAULT (unixepoch())
473-
)
474-
`;
475-
476-
// execute any pending alarms and schedule the next alarm
477-
await this.alarm();
478-
});
479-
});
461+
this.sql`
462+
CREATE TABLE IF NOT EXISTS cf_agents_schedules (
463+
id TEXT PRIMARY KEY NOT NULL DEFAULT (randomblob(9)),
464+
callback TEXT,
465+
payload TEXT,
466+
type TEXT NOT NULL CHECK(type IN ('scheduled', 'delayed', 'cron')),
467+
time INTEGER,
468+
delayInSeconds INTEGER,
469+
cron TEXT,
470+
created_at INTEGER DEFAULT (unixepoch())
471+
)
472+
`;
480473

481474
this.sql`
482475
CREATE TABLE IF NOT EXISTS cf_agents_mcp_servers (
@@ -1251,7 +1244,7 @@ export class Agent<
12511244
// Find the next schedule that needs to be executed
12521245
const result = this.sql`
12531246
SELECT time FROM cf_agents_schedules
1254-
WHERE time > ${Math.floor(Date.now() / 1000)}
1247+
WHERE time >= ${Math.floor(Date.now() / 1000)}
12551248
ORDER BY time ASC
12561249
LIMIT 1
12571250
`;
@@ -1321,6 +1314,7 @@ export class Agent<
13211314
}
13221315
);
13231316
if (row.type === "cron") {
1317+
if (this._destroyed) return;
13241318
// Update next execution time for cron schedules
13251319
const nextExecutionTime = getNextCronTime(row.cron);
13261320
const nextTimestamp = Math.floor(nextExecutionTime.getTime() / 1000);
@@ -1329,13 +1323,15 @@ export class Agent<
13291323
UPDATE cf_agents_schedules SET time = ${nextTimestamp} WHERE id = ${row.id}
13301324
`;
13311325
} else {
1326+
if (this._destroyed) return;
13321327
// Delete one-time schedules after execution
13331328
this.sql`
13341329
DELETE FROM cf_agents_schedules WHERE id = ${row.id}
13351330
`;
13361331
}
13371332
}
13381333
}
1334+
if (this._destroyed) return;
13391335

13401336
// Schedule the next alarm
13411337
await this._scheduleNextAlarm();
@@ -1356,7 +1352,13 @@ export class Agent<
13561352
await this.ctx.storage.deleteAll();
13571353
this._disposables.dispose();
13581354
await this.mcp.dispose?.();
1359-
this.ctx.abort("destroyed"); // enforce that the agent is evicted
1355+
this._destroyed = true;
1356+
1357+
// `ctx.abort` throws an uncatchable error, so we yield to the event loop
1358+
// to avoid capturing it and let handlers finish cleaning up
1359+
setTimeout(() => {
1360+
this.ctx.abort("destroyed");
1361+
}, 0);
13601362

13611363
this.observability?.emit(
13621364
{
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { env } from "cloudflare:test";
2+
import { describe, expect, it } from "vitest";
3+
import type { Env } from "./worker";
4+
import { getAgentByName } from "..";
5+
6+
declare module "cloudflare:test" {
7+
interface ProvidedEnv extends Env {}
8+
}
9+
10+
describe("scheduled destroys", () => {
11+
it("should not throw when a scheduled callback nukes storage", async () => {
12+
let agentStub = await getAgentByName(
13+
env.TestDestroyScheduleAgent,
14+
"alarm-destroy-repro"
15+
);
16+
17+
// Alarm should fire immediately
18+
await agentStub.scheduleSelfDestructingAlarm();
19+
await expect(agentStub.getStatus()).resolves.toBe("scheduled");
20+
21+
// Let the alarm run
22+
await new Promise((resolve) => setTimeout(resolve, 50));
23+
24+
agentStub = await getAgentByName(
25+
env.TestDestroyScheduleAgent,
26+
"alarm-destroy-repro"
27+
);
28+
29+
await expect(agentStub.getStatus()).resolves.toBe("unscheduled");
30+
});
31+
});

packages/agents/src/tests/worker.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export type Env = {
3030
TestChatAgent: DurableObjectNamespace<TestChatAgent>;
3131
TestOAuthAgent: DurableObjectNamespace<TestOAuthAgent>;
3232
TEST_MCP_JURISDICTION: DurableObjectNamespace<TestMcpJurisdiction>;
33+
TestDestroyScheduleAgent: DurableObjectNamespace<TestDestroyScheduleAgent>;
3334
};
3435

3536
type State = unknown;
@@ -39,6 +40,7 @@ type Props = {
3940
};
4041

4142
export class TestMcpAgent extends McpAgent<Env, State, Props> {
43+
observability = undefined;
4244
private tempToolHandle?: { remove: () => void };
4345

4446
server = new McpServer(
@@ -130,6 +132,7 @@ export class TestMcpAgent extends McpAgent<Env, State, Props> {
130132

131133
// Test email agents
132134
export class TestEmailAgent extends Agent<Env> {
135+
observability = undefined;
133136
emailsReceived: AgentEmail[] = [];
134137

135138
async onEmail(email: AgentEmail) {
@@ -144,6 +147,7 @@ export class TestEmailAgent extends Agent<Env> {
144147
}
145148

146149
export class TestCaseSensitiveAgent extends Agent<Env> {
150+
observability = undefined;
147151
emailsReceived: AgentEmail[] = [];
148152

149153
async onEmail(email: AgentEmail) {
@@ -156,6 +160,7 @@ export class TestCaseSensitiveAgent extends Agent<Env> {
156160
}
157161

158162
export class TestUserNotificationAgent extends Agent<Env> {
163+
observability = undefined;
159164
emailsReceived: AgentEmail[] = [];
160165

161166
async onEmail(email: AgentEmail) {
@@ -167,12 +172,30 @@ export class TestUserNotificationAgent extends Agent<Env> {
167172
}
168173
}
169174

175+
export class TestDestroyScheduleAgent extends Agent<Env, { status: string }> {
176+
observability = undefined;
177+
initialState = {
178+
status: "unscheduled"
179+
};
180+
181+
async scheduleSelfDestructingAlarm() {
182+
this.setState({ status: "scheduled" });
183+
await this.schedule(0, "destroy");
184+
}
185+
186+
getStatus() {
187+
return this.state.status;
188+
}
189+
}
190+
170191
// An Agent that tags connections in onConnect,
171192
// then echoes whether the tag was observed in onMessage
172193
export class TestRaceAgent extends Agent<Env> {
173194
initialState = { hello: "world" };
174195
static options = { hibernate: true };
175196

197+
observability = undefined;
198+
176199
async onConnect(conn: Connection<{ tagged: boolean }>) {
177200
// Simulate real async setup to widen the window a bit
178201
conn.setState({ tagged: true });
@@ -187,6 +210,8 @@ export class TestRaceAgent extends Agent<Env> {
187210

188211
// Test Agent for OAuth client side flows
189212
export class TestOAuthAgent extends Agent<Env> {
213+
observability = undefined;
214+
190215
async onRequest(_request: Request): Promise<Response> {
191216
return new Response("Test OAuth Agent");
192217
}
@@ -311,6 +336,8 @@ export class TestOAuthAgent extends Agent<Env> {
311336
}
312337

313338
export class TestChatAgent extends AIChatAgent<Env> {
339+
observability = undefined;
340+
314341
async onChatMessage() {
315342
// Simple echo response for testing
316343
return new Response("Hello from chat agent!", {
@@ -373,6 +400,8 @@ export class TestChatAgent extends AIChatAgent<Env> {
373400

374401
// Test MCP Agent for jurisdiction feature
375402
export class TestMcpJurisdiction extends McpAgent<Env> {
403+
observability = undefined;
404+
376405
server = new McpServer(
377406
{ name: "test-jurisdiction-server", version: "1.0.0" },
378407
{ capabilities: { tools: {} } }

packages/agents/src/tests/wrangler.jsonc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@
3838
{
3939
"class_name": "TestMcpJurisdiction",
4040
"name": "TEST_MCP_JURISDICTION"
41+
},
42+
{
43+
"class_name": "TestDestroyScheduleAgent",
44+
"name": "TestDestroyScheduleAgent"
4145
}
4246
]
4347
},
@@ -52,7 +56,8 @@
5256
"TestRaceAgent",
5357
"TestChatAgent",
5458
"TestOAuthAgent",
55-
"TestMcpJurisdiction"
59+
"TestMcpJurisdiction",
60+
"TestDestroyScheduleAgent"
5661
],
5762
"tag": "v1"
5863
}

0 commit comments

Comments
 (0)