Skip to content

Commit d35181a

Browse files
committed
fix: make scheduleEvery() idempotent on callback name
scheduleEvery() now deduplicates by callback name: calling it multiple times with the same callback returns the existing schedule instead of creating a duplicate. If the interval or payload changed, the existing schedule is updated in place. This fixes the common pattern of calling scheduleEvery() inside onStart(), which runs on every Durable Object wake. Previously each wake created a new interval schedule, leading to duplicate executions. keepAlive() opts out of idempotency via an internal _idempotent: false flag so multiple concurrent keepAlive calls still get independent schedules with independent disposers. Closes #1049
1 parent 6157741 commit d35181a

File tree

5 files changed

+341
-4
lines changed

5 files changed

+341
-4
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"agents": patch
3+
---
4+
5+
Make `scheduleEvery()` idempotent on callback name
6+
7+
`scheduleEvery()` now deduplicates by callback name: calling it multiple times with the same callback returns the existing schedule instead of creating a duplicate. If the interval or payload changed, the existing schedule is updated in place.
8+
9+
This fixes the common pattern of calling `scheduleEvery()` inside `onStart()`, which runs on every Durable Object wake. Previously each wake created a new interval schedule, leading to a thundering herd of duplicate executions.

docs/scheduling.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,42 @@ await this.scheduleEvery(45, "healthCheck", {});
172172
await this.scheduleEvery(90, "syncData", { destination: "warehouse" });
173173
```
174174

175+
**Idempotency:**
176+
177+
`scheduleEvery()` is idempotent on the callback name — calling it multiple times with the same callback does not create duplicate schedules. This makes it safe to call in `onStart()`, which runs on every Durable Object wake:
178+
179+
```typescript
180+
class MyAgent extends Agent {
181+
async onStart() {
182+
// Safe: only one schedule is created, no matter how many times the DO wakes
183+
await this.scheduleEvery(30, "tick");
184+
}
185+
186+
async tick() {
187+
console.log("tick", new Date().toISOString());
188+
}
189+
}
190+
```
191+
192+
If you call `scheduleEvery()` again with the same callback but a different interval or payload, the existing schedule is updated in place (rather than creating a duplicate):
193+
194+
```typescript
195+
// First call creates the schedule
196+
await this.scheduleEvery(30, "poll");
197+
198+
// Second call with different interval updates the existing schedule
199+
await this.scheduleEvery(60, "poll");
200+
// Only one "poll" schedule exists, now running every 60 seconds
201+
```
202+
203+
Different callbacks always get their own independent schedules:
204+
205+
```typescript
206+
// These create two separate schedules (different callbacks)
207+
await this.scheduleEvery(30, "poll");
208+
await this.scheduleEvery(30, "healthCheck");
209+
```
210+
175211
**Key differences from cron:**
176212

177213
| Feature | Cron | Interval |
@@ -757,6 +793,7 @@ Schedule a task to run repeatedly at a fixed interval.
757793

758794
**Behavior:**
759795

796+
- **Idempotent on callback name** — calling with the same callback returns the existing schedule instead of creating a duplicate. If the interval or payload changed, the existing schedule is updated in place.
760797
- First execution occurs after `intervalSeconds` (not immediately)
761798
- If callback is still running when next execution is due, it's skipped (overlap prevention)
762799
- If callback throws an error, the interval continues

packages/agents/src/index.ts

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2174,7 +2174,23 @@ export class Agent<
21742174
}
21752175

21762176
/**
2177-
* Schedule a task to run repeatedly at a fixed interval
2177+
* Schedule a task to run repeatedly at a fixed interval.
2178+
*
2179+
* This method is **idempotent** — calling it multiple times with the same
2180+
* `callback`, `intervalSeconds`, and `payload` returns the existing schedule
2181+
* instead of creating a duplicate. If the interval or payload has changed,
2182+
* the existing schedule is updated in place.
2183+
*
2184+
* This makes it safe to call in `onStart()`, which runs on every Durable
2185+
* Object wake:
2186+
*
2187+
* ```ts
2188+
* async onStart() {
2189+
* // Only one schedule is created, no matter how many times the DO wakes
2190+
* await this.scheduleEvery(30, "tick");
2191+
* }
2192+
* ```
2193+
*
21782194
* @template T Type of the payload data
21792195
* @param intervalSeconds Number of seconds between executions
21802196
* @param callback Name of the method to call
@@ -2187,7 +2203,7 @@ export class Agent<
21872203
intervalSeconds: number,
21882204
callback: keyof this,
21892205
payload?: T,
2190-
options?: { retry?: RetryOptions }
2206+
options?: { retry?: RetryOptions; _idempotent?: boolean }
21912207
): Promise<Schedule<T>> {
21922208
// DO alarms have a max schedule time of 30 days
21932209
const MAX_INTERVAL_SECONDS = 30 * 24 * 60 * 60; // 30 days in seconds
@@ -2214,6 +2230,80 @@ export class Agent<
22142230
validateRetryOptions(options.retry, this._resolvedOptions.retry);
22152231
}
22162232

2233+
const idempotent = options?._idempotent !== false;
2234+
const payloadJson = JSON.stringify(payload);
2235+
2236+
// Idempotency: check for an existing interval schedule with the same callback
2237+
if (idempotent) {
2238+
const existing = this.sql<{
2239+
id: string;
2240+
callback: string;
2241+
payload: string;
2242+
type: string;
2243+
time: number;
2244+
intervalSeconds: number;
2245+
retry_options: string | null;
2246+
}>`
2247+
SELECT * FROM cf_agents_schedules
2248+
WHERE type = 'interval' AND callback = ${callback}
2249+
LIMIT 1
2250+
`;
2251+
2252+
if (existing.length > 0) {
2253+
const row = existing[0];
2254+
const retryJson = options?.retry ? JSON.stringify(options.retry) : null;
2255+
2256+
// If interval or payload changed, update in place
2257+
if (
2258+
row.intervalSeconds !== intervalSeconds ||
2259+
row.payload !== payloadJson
2260+
) {
2261+
const newTime = Math.floor(
2262+
(Date.now() + intervalSeconds * 1000) / 1000
2263+
);
2264+
this.sql`
2265+
UPDATE cf_agents_schedules
2266+
SET intervalSeconds = ${intervalSeconds},
2267+
payload = ${payloadJson},
2268+
time = ${newTime},
2269+
running = 0,
2270+
retry_options = ${retryJson}
2271+
WHERE id = ${row.id}
2272+
`;
2273+
2274+
await this._scheduleNextAlarm();
2275+
2276+
const schedule: Schedule<T> = {
2277+
callback: callback as string,
2278+
id: row.id,
2279+
intervalSeconds,
2280+
payload: payload as T,
2281+
retry: options?.retry,
2282+
time: newTime,
2283+
type: "interval"
2284+
};
2285+
2286+
this._emit("schedule:create", {
2287+
callback: callback as string,
2288+
id: row.id
2289+
});
2290+
2291+
return schedule;
2292+
}
2293+
2294+
// Exact match — return existing schedule as-is (no-op)
2295+
return {
2296+
callback: row.callback,
2297+
id: row.id,
2298+
intervalSeconds: row.intervalSeconds,
2299+
payload: JSON.parse(row.payload) as T,
2300+
retry: parseRetryOptions(row as unknown as Record<string, unknown>),
2301+
time: row.time,
2302+
type: "interval"
2303+
};
2304+
}
2305+
}
2306+
22172307
const id = nanoid(9);
22182308
const time = new Date(Date.now() + intervalSeconds * 1000);
22192309
const timestamp = Math.floor(time.getTime() / 1000);
@@ -2222,7 +2312,7 @@ export class Agent<
22222312

22232313
this.sql`
22242314
INSERT OR REPLACE INTO cf_agents_schedules (id, callback, payload, type, intervalSeconds, time, running, retry_options)
2225-
VALUES (${id}, ${callback}, ${JSON.stringify(payload)}, 'interval', ${intervalSeconds}, ${timestamp}, 0, ${retryJson})
2315+
VALUES (${id}, ${callback}, ${payloadJson}, 'interval', ${intervalSeconds}, ${timestamp}, 0, ${retryJson})
22262316
`;
22272317

22282318
await this._scheduleNextAlarm();
@@ -2357,7 +2447,9 @@ export class Agent<
23572447
const heartbeatSeconds = Math.ceil(KEEP_ALIVE_INTERVAL_MS / 1000);
23582448
const schedule = await this.scheduleEvery(
23592449
heartbeatSeconds,
2360-
"_cf_keepAliveHeartbeat" as keyof this
2450+
"_cf_keepAliveHeartbeat" as keyof this,
2451+
undefined,
2452+
{ _idempotent: false }
23612453
);
23622454

23632455
let disposed = false;

packages/agents/src/tests/agents/schedule.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,4 +222,52 @@ export class TestScheduleAgent extends Agent<Record<string, unknown>> {
222222

223223
return schedule.id;
224224
}
225+
226+
// --- Idempotency test helpers ---
227+
228+
// A second callback for testing that idempotency is per-callback
229+
secondIntervalCallback() {
230+
// Intentionally empty
231+
}
232+
233+
@callable()
234+
async createIntervalScheduleWithPayload(
235+
intervalSeconds: number,
236+
payload: string
237+
): Promise<string> {
238+
const schedule = await this.scheduleEvery(
239+
intervalSeconds,
240+
"intervalCallback",
241+
payload
242+
);
243+
return schedule.id;
244+
}
245+
246+
@callable()
247+
async createSecondIntervalSchedule(intervalSeconds: number): Promise<string> {
248+
const schedule = await this.scheduleEvery(
249+
intervalSeconds,
250+
"secondIntervalCallback"
251+
);
252+
return schedule.id;
253+
}
254+
255+
@callable()
256+
async countIntervalSchedules(): Promise<number> {
257+
const result = this.sql<{ count: number }>`
258+
SELECT COUNT(*) as count FROM cf_agents_schedules WHERE type = 'interval'
259+
`;
260+
return result[0].count;
261+
}
262+
263+
@callable()
264+
async countIntervalSchedulesForCallback(
265+
callbackName: string
266+
): Promise<number> {
267+
const result = this.sql<{ count: number }>`
268+
SELECT COUNT(*) as count FROM cf_agents_schedules
269+
WHERE type = 'interval' AND callback = ${callbackName}
270+
`;
271+
return result[0].count;
272+
}
225273
}

0 commit comments

Comments
 (0)