Skip to content

Commit 3a2ef00

Browse files
committed
restore failed state
1 parent a8776ad commit 3a2ef00

File tree

3 files changed

+64
-18
lines changed

3 files changed

+64
-18
lines changed

examples/e2e/app-router/wrangler.jsonc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
"migrations": [
2020
{
2121
"tag": "v1",
22-
"new_classes": ["DurableObjectQueueHandler"]
22+
"new_sqlite_classes": ["DurableObjectQueueHandler"]
2323
}
2424
],
2525
"kv_namespaces": [

packages/cloudflare/src/api/durable-objects/queue.spec.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ const createDurableObjectQueue = ({
2828
storage: {
2929
setAlarm: vi.fn(),
3030
getAlarm: vi.fn(),
31+
sql: {
32+
exec: vi.fn(),
33+
},
3134
},
3235
};
3336
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -104,8 +107,9 @@ describe("DurableObjectQueue", () => {
104107
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
105108
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);
106109

110+
// BlockConcurrencyWhile is called twice here, first time during creation of the object and second time when we try to revalidate
107111
// @ts-expect-error
108-
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1);
112+
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(2);
109113

110114
// Here we await the blocked request to ensure it's resolved
111115
await blockedReq;
@@ -203,9 +207,10 @@ describe("DurableObjectQueue", () => {
203207

204208
it("should add an alarm if there are failed states", async () => {
205209
const queue = createDurableObjectQueue({ fetchDuration: 10 });
206-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 });
210+
const nextAlarm = Date.now() + 1000;
211+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm });
207212
await queue.addAlarm();
208-
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000);
213+
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(nextAlarm);
209214
});
210215

211216
it("should not add an alarm if there is already an alarm set", async () => {
@@ -219,10 +224,16 @@ describe("DurableObjectQueue", () => {
219224

220225
it("should set the alarm to the lowest nextAlarm", async () => {
221226
const queue = createDurableObjectQueue({ fetchDuration: 10 });
222-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 });
223-
queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarm: 500 });
227+
const nextAlarm = Date.now() + 1000;
228+
const firstAlarm = Date.now() + 500;
229+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm });
230+
queue.routeInFailedState.set("id2", {
231+
msg: createMessage("id2"),
232+
retryCount: 0,
233+
nextAlarm: firstAlarm,
234+
});
224235
await queue.addAlarm();
225-
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500);
236+
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(firstAlarm);
226237
});
227238
});
228239

packages/cloudflare/src/api/durable-objects/queue.ts

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,21 @@ interface ExtendedQueueMessage extends QueueMessage {
1515
previewModeId: string;
1616
}
1717

18+
interface FailedState {
19+
msg: ExtendedQueueMessage;
20+
retryCount: number;
21+
nextAlarm: number;
22+
}
23+
1824
export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
1925
// Ongoing revalidations are deduped by the deduplication id
2026
// Since this is running in waitUntil, we expect the durable object state to persist this during the duration of the revalidation
2127
// TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top)
2228
ongoingRevalidations = new Map<string, Promise<void>>();
2329

24-
// TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage
25-
routeInFailedState = new Map<
26-
string,
27-
{ msg: ExtendedQueueMessage; retryCount: number; nextAlarm: number }
28-
>();
30+
sql: SqlStorage;
31+
32+
routeInFailedState = new Map<string, FailedState>();
2933

3034
service: NonNullable<CloudflareEnv["NEXT_CACHE_REVALIDATION_WORKER"]>;
3135

@@ -38,6 +42,10 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
3842
// If there is no service binding, we throw an error because we can't revalidate without it
3943
if (!service) throw new IgnorableError("No service binding for cache revalidation worker");
4044
this.service = service;
45+
this.sql = ctx.storage.sql;
46+
47+
// We restore the state
48+
ctx.blockConcurrencyWhile(() => this.initState());
4149
}
4250

4351
async revalidate(msg: ExtendedQueueMessage) {
@@ -72,7 +80,6 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
7280
} = msg;
7381
const protocol = host.includes("localhost") ? "http" : "https";
7482

75-
//TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc)
7683
const response = await this.service.fetch(`${protocol}://${host}${url}`, {
7784
method: "HEAD",
7885
headers: {
@@ -137,6 +144,8 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
137144
const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId);
138145
let nextAlarm = Date.now() + 2_000;
139146

147+
let updatedFailedState: FailedState;
148+
140149
if (existingFailedState) {
141150
if (existingFailedState.retryCount >= 6) {
142151
// We give up after 6 retries and log the error
@@ -147,18 +156,24 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
147156
return;
148157
}
149158
nextAlarm = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000;
150-
this.routeInFailedState.set(msg.MessageDeduplicationId, {
159+
updatedFailedState = {
151160
...existingFailedState,
152161
retryCount: existingFailedState.retryCount + 1,
153162
nextAlarm,
154-
});
163+
};
155164
} else {
156-
this.routeInFailedState.set(msg.MessageDeduplicationId, {
165+
updatedFailedState = {
157166
msg,
158167
retryCount: 1,
159168
nextAlarm,
160-
});
169+
};
161170
}
171+
this.routeInFailedState.set(msg.MessageDeduplicationId, updatedFailedState);
172+
this.sql.exec(
173+
"INSERT OR REPLACE INTO failed_state (id, data) VALUES (?, ?)",
174+
msg.MessageDeduplicationId,
175+
JSON.stringify(updatedFailedState)
176+
);
162177
// We probably want to do something if routeInFailedState is becoming too big, at least log it
163178
await this.addAlarm();
164179
}
@@ -168,10 +183,30 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
168183
if (existingAlarm) return;
169184
if (this.routeInFailedState.size === 0) return;
170185

171-
const nextAlarmToSetup = Array.from(this.routeInFailedState.values()).reduce(
186+
let nextAlarmToSetup = Array.from(this.routeInFailedState.values()).reduce(
172187
(acc, { nextAlarm }) => Math.min(acc, nextAlarm),
173188
Infinity
174189
);
190+
if (nextAlarmToSetup < Date.now()) {
191+
// We don't want to set an alarm in the past
192+
nextAlarmToSetup = Date.now() + 2_000;
193+
}
175194
await this.ctx.storage.setAlarm(nextAlarmToSetup);
176195
}
196+
197+
// This function is used to restore the state of the durable object
198+
// We don't restore the ongoing revalidations because we cannot know in which state they are
199+
// We only restore the failed state and the alarm
200+
async initState() {
201+
// We store the failed state as a blob, we don't want to do anything with it anyway besides restoring
202+
this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT)");
203+
204+
const failedStateCursor = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state");
205+
for (const row of failedStateCursor) {
206+
this.routeInFailedState.set(row.id, JSON.parse(row.data));
207+
}
208+
209+
// Now that we have restored the failed state, we can restore the alarm as well
210+
await this.addAlarm();
211+
}
177212
}

0 commit comments

Comments
 (0)