Skip to content

Commit f1e2bfa

Browse files
Make scheduleEvery() idempotent (#1052)
* fix: make scheduleEvery() idempotent on callback name scheduleEvery() now deduplicates by callback name: calling it multiple times with the same callback returns the existing schedule instead of creating a duplicate. If the interval or payload changed, the existing schedule is updated in place. This fixes the common pattern of calling scheduleEvery() inside onStart(), which runs on every Durable Object wake. Previously each wake created a new interval schedule, leading to duplicate executions. keepAlive() opts out of idempotency via an internal _idempotent: false flag so multiple concurrent keepAlive calls still get independent schedules with independent disposers. Closes #1049 * Use (callback, interval, payload) as idempotency key for scheduleEvery() Instead of updating the existing row when interval/payload differs, treat a different interval or payload as a distinct schedule that gets its own row. The idempotency key is now the full tuple of (callback, intervalSeconds, payload) rather than just callback. Uses SQL IS instead of = for payload comparison to correctly handle NULL payload values (NULL IS NULL is true in SQLite). --------- Co-authored-by: ask-bonk[bot] <ask-bonk[bot]@users.noreply.github.com>
1 parent 6157741 commit f1e2bfa

File tree

5 files changed

+323
-4
lines changed

5 files changed

+323
-4
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"agents": patch
3+
---
4+
5+
Make `scheduleEvery()` idempotent
6+
7+
`scheduleEvery()` now deduplicates by the combination of callback name, interval, and payload: calling it multiple times with the same arguments returns the existing schedule instead of creating a duplicate. A different interval or payload creates a separate, independent schedule.
8+
9+
This fixes the common pattern of calling `scheduleEvery()` inside `onStart()`, which runs on every Durable Object wake. Previously each wake created a new interval schedule, leading to a thundering herd of duplicate executions.

docs/scheduling.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,46 @@ await this.scheduleEvery(45, "healthCheck", {});
172172
await this.scheduleEvery(90, "syncData", { destination: "warehouse" });
173173
```
174174

175+
**Idempotency:**
176+
177+
`scheduleEvery()` is idempotent on the combination of callback name, interval, and payload — calling it multiple times with the same arguments does not create duplicate schedules. This makes it safe to call in `onStart()`, which runs on every Durable Object wake:
178+
179+
```typescript
180+
class MyAgent extends Agent {
181+
async onStart() {
182+
// Safe: only one schedule is created, no matter how many times the DO wakes
183+
await this.scheduleEvery(30, "tick");
184+
}
185+
186+
async tick() {
187+
console.log("tick", new Date().toISOString());
188+
}
189+
}
190+
```
191+
192+
Calling `scheduleEvery()` with a different interval or payload creates a separate schedule, even for the same callback:
193+
194+
```typescript
195+
// First call creates one schedule
196+
await this.scheduleEvery(30, "poll");
197+
198+
// Second call with a different interval creates a second schedule
199+
await this.scheduleEvery(60, "poll");
200+
// Two "poll" schedules exist: one every 30s and one every 60s
201+
202+
// Third call with the same arguments as the first is a no-op
203+
await this.scheduleEvery(30, "poll");
204+
// Still two schedules
205+
```
206+
207+
Different callbacks also get their own independent schedules:
208+
209+
```typescript
210+
// These create two separate schedules (different callbacks)
211+
await this.scheduleEvery(30, "poll");
212+
await this.scheduleEvery(30, "healthCheck");
213+
```
214+
175215
**Key differences from cron:**
176216

177217
| Feature | Cron | Interval |
@@ -757,6 +797,7 @@ Schedule a task to run repeatedly at a fixed interval.
757797

758798
**Behavior:**
759799

800+
- **Idempotent on (callback, interval, payload)** — calling with the same callback, interval, and payload returns the existing schedule instead of creating a duplicate. A different interval or payload creates a new, independent schedule.
760801
- First execution occurs after `intervalSeconds` (not immediately)
761802
- If callback is still running when next execution is due, it's skipped (overlap prevention)
762803
- If callback throws an error, the interval continues

packages/agents/src/index.ts

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2174,7 +2174,23 @@ export class Agent<
21742174
}
21752175

21762176
/**
2177-
* Schedule a task to run repeatedly at a fixed interval
2177+
* Schedule a task to run repeatedly at a fixed interval.
2178+
*
2179+
* This method is **idempotent** — calling it multiple times with the same
2180+
* `callback`, `intervalSeconds`, and `payload` returns the existing schedule
2181+
* instead of creating a duplicate. A different interval or payload is
2182+
* treated as a distinct schedule and creates a new row.
2183+
*
2184+
* This makes it safe to call in `onStart()`, which runs on every Durable
2185+
* Object wake:
2186+
*
2187+
* ```ts
2188+
* async onStart() {
2189+
* // Only one schedule is created, no matter how many times the DO wakes
2190+
* await this.scheduleEvery(30, "tick");
2191+
* }
2192+
* ```
2193+
*
21782194
* @template T Type of the payload data
21792195
* @param intervalSeconds Number of seconds between executions
21802196
* @param callback Name of the method to call
@@ -2187,7 +2203,7 @@ export class Agent<
21872203
intervalSeconds: number,
21882204
callback: keyof this,
21892205
payload?: T,
2190-
options?: { retry?: RetryOptions }
2206+
options?: { retry?: RetryOptions; _idempotent?: boolean }
21912207
): Promise<Schedule<T>> {
21922208
// DO alarms have a max schedule time of 30 days
21932209
const MAX_INTERVAL_SECONDS = 30 * 24 * 60 * 60; // 30 days in seconds
@@ -2214,6 +2230,46 @@ export class Agent<
22142230
validateRetryOptions(options.retry, this._resolvedOptions.retry);
22152231
}
22162232

2233+
const idempotent = options?._idempotent !== false;
2234+
const payloadJson = JSON.stringify(payload);
2235+
2236+
// Idempotency: check for an existing interval schedule with the same
2237+
// callback, interval, and payload. A different interval or payload is
2238+
// treated as a distinct schedule and gets its own row.
2239+
if (idempotent) {
2240+
const existing = this.sql<{
2241+
id: string;
2242+
callback: string;
2243+
payload: string;
2244+
type: string;
2245+
time: number;
2246+
intervalSeconds: number;
2247+
retry_options: string | null;
2248+
}>`
2249+
SELECT * FROM cf_agents_schedules
2250+
WHERE type = 'interval'
2251+
AND callback = ${callback}
2252+
AND intervalSeconds = ${intervalSeconds}
2253+
AND payload IS ${payloadJson}
2254+
LIMIT 1
2255+
`;
2256+
2257+
if (existing.length > 0) {
2258+
const row = existing[0];
2259+
2260+
// Exact match — return existing schedule as-is (no-op)
2261+
return {
2262+
callback: row.callback,
2263+
id: row.id,
2264+
intervalSeconds: row.intervalSeconds,
2265+
payload: JSON.parse(row.payload) as T,
2266+
retry: parseRetryOptions(row as unknown as Record<string, unknown>),
2267+
time: row.time,
2268+
type: "interval"
2269+
};
2270+
}
2271+
}
2272+
22172273
const id = nanoid(9);
22182274
const time = new Date(Date.now() + intervalSeconds * 1000);
22192275
const timestamp = Math.floor(time.getTime() / 1000);
@@ -2222,7 +2278,7 @@ export class Agent<
22222278

22232279
this.sql`
22242280
INSERT OR REPLACE INTO cf_agents_schedules (id, callback, payload, type, intervalSeconds, time, running, retry_options)
2225-
VALUES (${id}, ${callback}, ${JSON.stringify(payload)}, 'interval', ${intervalSeconds}, ${timestamp}, 0, ${retryJson})
2281+
VALUES (${id}, ${callback}, ${payloadJson}, 'interval', ${intervalSeconds}, ${timestamp}, 0, ${retryJson})
22262282
`;
22272283

22282284
await this._scheduleNextAlarm();
@@ -2357,7 +2413,9 @@ export class Agent<
23572413
const heartbeatSeconds = Math.ceil(KEEP_ALIVE_INTERVAL_MS / 1000);
23582414
const schedule = await this.scheduleEvery(
23592415
heartbeatSeconds,
2360-
"_cf_keepAliveHeartbeat" as keyof this
2416+
"_cf_keepAliveHeartbeat" as keyof this,
2417+
undefined,
2418+
{ _idempotent: false }
23612419
);
23622420

23632421
let disposed = false;

packages/agents/src/tests/agents/schedule.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,4 +222,52 @@ export class TestScheduleAgent extends Agent<Record<string, unknown>> {
222222

223223
return schedule.id;
224224
}
225+
226+
// --- Idempotency test helpers ---
227+
228+
// A second callback for testing that idempotency is per-callback
229+
secondIntervalCallback() {
230+
// Intentionally empty
231+
}
232+
233+
@callable()
234+
async createIntervalScheduleWithPayload(
235+
intervalSeconds: number,
236+
payload: string
237+
): Promise<string> {
238+
const schedule = await this.scheduleEvery(
239+
intervalSeconds,
240+
"intervalCallback",
241+
payload
242+
);
243+
return schedule.id;
244+
}
245+
246+
@callable()
247+
async createSecondIntervalSchedule(intervalSeconds: number): Promise<string> {
248+
const schedule = await this.scheduleEvery(
249+
intervalSeconds,
250+
"secondIntervalCallback"
251+
);
252+
return schedule.id;
253+
}
254+
255+
@callable()
256+
async countIntervalSchedules(): Promise<number> {
257+
const result = this.sql<{ count: number }>`
258+
SELECT COUNT(*) as count FROM cf_agents_schedules WHERE type = 'interval'
259+
`;
260+
return result[0].count;
261+
}
262+
263+
@callable()
264+
async countIntervalSchedulesForCallback(
265+
callbackName: string
266+
): Promise<number> {
267+
const result = this.sql<{ count: number }>`
268+
SELECT COUNT(*) as count FROM cf_agents_schedules
269+
WHERE type = 'interval' AND callback = ${callbackName}
270+
`;
271+
return result[0].count;
272+
}
225273
}

packages/agents/src/tests/schedule.test.ts

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,4 +257,167 @@ describe("schedule operations", () => {
257257
await agentStub.cancelScheduleById(scheduleId);
258258
});
259259
});
260+
261+
describe("scheduleEvery idempotency", () => {
262+
it("should return existing schedule when called with same callback and interval", async () => {
263+
const agentStub = await getAgentByName(
264+
env.TestScheduleAgent,
265+
"idempotent-same-args-test"
266+
);
267+
268+
// Create an interval schedule
269+
const firstId = await agentStub.createIntervalSchedule(30);
270+
271+
// Call again with the same callback and interval
272+
const secondId = await agentStub.createIntervalSchedule(30);
273+
274+
// Both calls should return the same schedule ID
275+
expect(secondId).toBe(firstId);
276+
277+
// Only one schedule should exist
278+
const count =
279+
await agentStub.countIntervalSchedulesForCallback("intervalCallback");
280+
expect(count).toBe(1);
281+
});
282+
283+
it("should return existing schedule when called with same callback, interval, and payload", async () => {
284+
const agentStub = await getAgentByName(
285+
env.TestScheduleAgent,
286+
"idempotent-same-payload-test"
287+
);
288+
289+
// Create with payload
290+
const firstId = await agentStub.createIntervalScheduleWithPayload(
291+
30,
292+
"hello"
293+
);
294+
295+
// Call again with the same arguments
296+
const secondId = await agentStub.createIntervalScheduleWithPayload(
297+
30,
298+
"hello"
299+
);
300+
301+
// Same schedule returned
302+
expect(secondId).toBe(firstId);
303+
304+
const count =
305+
await agentStub.countIntervalSchedulesForCallback("intervalCallback");
306+
expect(count).toBe(1);
307+
});
308+
309+
it("should create a new row when interval changes for same callback", async () => {
310+
const agentStub = await getAgentByName(
311+
env.TestScheduleAgent,
312+
"idempotent-interval-change-test"
313+
);
314+
315+
// Create with 30s interval
316+
const firstId = await agentStub.createIntervalSchedule(30);
317+
318+
// Call again with different interval
319+
const secondId = await agentStub.createIntervalSchedule(60);
320+
321+
// Different interval means a different schedule
322+
expect(secondId).not.toBe(firstId);
323+
324+
// Two schedules should exist for this callback
325+
const count =
326+
await agentStub.countIntervalSchedulesForCallback("intervalCallback");
327+
expect(count).toBe(2);
328+
329+
// The new schedule should have the new interval
330+
const schedule = await agentStub.getScheduleById(secondId);
331+
expect(schedule).toBeDefined();
332+
if (schedule?.type === "interval") {
333+
expect(schedule.intervalSeconds).toBe(60);
334+
}
335+
336+
// The original schedule should still have the old interval
337+
const original = await agentStub.getScheduleById(firstId);
338+
expect(original).toBeDefined();
339+
if (original?.type === "interval") {
340+
expect(original.intervalSeconds).toBe(30);
341+
}
342+
});
343+
344+
it("should create a new row when payload changes for same callback", async () => {
345+
const agentStub = await getAgentByName(
346+
env.TestScheduleAgent,
347+
"idempotent-payload-change-test"
348+
);
349+
350+
// Create with payload "foo"
351+
const firstId = await agentStub.createIntervalScheduleWithPayload(
352+
30,
353+
"foo"
354+
);
355+
356+
// Call again with different payload
357+
const secondId = await agentStub.createIntervalScheduleWithPayload(
358+
30,
359+
"bar"
360+
);
361+
362+
// Different payload means a different schedule
363+
expect(secondId).not.toBe(firstId);
364+
365+
// Two schedules should exist for this callback
366+
const count =
367+
await agentStub.countIntervalSchedulesForCallback("intervalCallback");
368+
expect(count).toBe(2);
369+
370+
// Each schedule should have its own payload
371+
const first = await agentStub.getScheduleById(firstId);
372+
expect(first).toBeDefined();
373+
expect(first?.payload).toBe("foo");
374+
375+
const second = await agentStub.getScheduleById(secondId);
376+
expect(second).toBeDefined();
377+
expect(second?.payload).toBe("bar");
378+
});
379+
380+
it("should allow different callbacks to have their own interval schedules", async () => {
381+
const agentStub = await getAgentByName(
382+
env.TestScheduleAgent,
383+
"idempotent-different-callbacks-test"
384+
);
385+
386+
// Create interval for callback A
387+
const firstId = await agentStub.createIntervalSchedule(30);
388+
389+
// Create interval for callback B
390+
const secondId = await agentStub.createSecondIntervalSchedule(30);
391+
392+
// Different callbacks should create different schedules
393+
expect(secondId).not.toBe(firstId);
394+
395+
// Two interval schedules should exist total
396+
const count = await agentStub.countIntervalSchedules();
397+
expect(count).toBe(2);
398+
});
399+
400+
it("should not create duplicates when called many times (simulating repeated onStart)", async () => {
401+
const agentStub = await getAgentByName(
402+
env.TestScheduleAgent,
403+
"idempotent-repeated-calls-test"
404+
);
405+
406+
// Simulate calling scheduleEvery in onStart many times
407+
const ids: string[] = [];
408+
for (let i = 0; i < 5; i++) {
409+
const id = await agentStub.createIntervalSchedule(30);
410+
ids.push(id);
411+
}
412+
413+
// All IDs should be the same
414+
const uniqueIds = [...new Set(ids)];
415+
expect(uniqueIds.length).toBe(1);
416+
417+
// Only one schedule should exist
418+
const count =
419+
await agentStub.countIntervalSchedulesForCallback("intervalCallback");
420+
expect(count).toBe(1);
421+
});
422+
});
260423
});

0 commit comments

Comments
 (0)