Skip to content

Commit 6b6416d

Browse files
committed
restore failed state
1 parent b18d4fe commit 6b6416d

File tree

3 files changed

+64
-18
lines changed

3 files changed

+64
-18
lines changed

examples/e2e/app-router/wrangler.jsonc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
"migrations": [
2020
{
2121
"tag": "v1",
22-
"new_classes": ["DurableObjectQueueHandler"]
22+
"new_sqlite_classes": ["DurableObjectQueueHandler"]
2323
}
2424
],
2525
"kv_namespaces": [

packages/cloudflare/src/api/durable-objects/queue.spec.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ const createDurableObjectQueue = ({
2626
storage: {
2727
setAlarm: vi.fn(),
2828
getAlarm: vi.fn(),
29+
sql: {
30+
exec: vi.fn(),
31+
},
2932
},
3033
};
3134
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -102,8 +105,9 @@ describe("DurableObjectQueue", () => {
102105
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
103106
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);
104107

108+
// BlockConcurrencyWhile is called twice here, first time during creation of the object and second time when we try to revalidate
105109
// @ts-expect-error
106-
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1);
110+
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(2);
107111

108112
// Here we await the blocked request to ensure it's resolved
109113
await blockedReq;
@@ -201,9 +205,10 @@ describe("DurableObjectQueue", () => {
201205

202206
it("should add an alarm if there are failed states", async () => {
203207
const queue = createDurableObjectQueue({ fetchDuration: 10 });
204-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
208+
const nextAlarm = Date.now() + 1000;
209+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: nextAlarm });
205210
await queue.addAlarm();
206-
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000);
211+
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(nextAlarm);
207212
});
208213

209214
it("should not add an alarm if there is already an alarm set", async () => {
@@ -217,10 +222,16 @@ describe("DurableObjectQueue", () => {
217222

218223
it("should set the alarm to the lowest nextAlarm", async () => {
219224
const queue = createDurableObjectQueue({ fetchDuration: 10 });
220-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
221-
queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 });
225+
const nextAlarm = Date.now() + 1000;
226+
const firstAlarm = Date.now() + 500;
227+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: nextAlarm });
228+
queue.routeInFailedState.set("id2", {
229+
msg: createMessage("id2"),
230+
retryCount: 0,
231+
nextAlarmMs: firstAlarm,
232+
});
222233
await queue.addAlarm();
223-
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500);
234+
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(firstAlarm);
224235
});
225236
});
226237

packages/cloudflare/src/api/durable-objects/queue.ts

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,21 @@ interface ExtendedQueueMessage extends QueueMessage {
1515
previewModeId: string;
1616
}
1717

18+
interface FailedState {
19+
msg: ExtendedQueueMessage;
20+
retryCount: number;
21+
nextAlarmMs: number;
22+
}
23+
1824
export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
1925
// Ongoing revalidations are deduped by the deduplication id
2026
// Since this is running in waitUntil, we expect the durable object state to persist this during the duration of the revalidation
2127
// TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top)
2228
ongoingRevalidations = new Map<string, Promise<void>>();
2329

24-
// TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage
25-
routeInFailedState = new Map<
26-
string,
27-
{ msg: ExtendedQueueMessage; retryCount: number; nextAlarmMs: number }
28-
>();
30+
sql: SqlStorage;
31+
32+
routeInFailedState = new Map<string, FailedState>();
2933

3034
service: NonNullable<CloudflareEnv["NEXT_CACHE_REVALIDATION_WORKER"]>;
3135

@@ -37,6 +41,10 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
3741
this.service = env.NEXT_CACHE_REVALIDATION_WORKER!;
3842
// If there is no service binding, we throw an error because we can't revalidate without it
3943
if (!this.service) throw new IgnorableError("No service binding for cache revalidation worker");
44+
this.sql = ctx.storage.sql;
45+
46+
// We restore the state
47+
ctx.blockConcurrencyWhile(() => this.initState());
4048
}
4149

4250
async revalidate(msg: ExtendedQueueMessage) {
@@ -71,7 +79,6 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
7179
} = msg;
7280
const protocol = host.includes("localhost") ? "http" : "https";
7381

74-
//TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc)
7582
const response = await this.service.fetch(`${protocol}://${host}${url}`, {
7683
method: "HEAD",
7784
headers: {
@@ -133,6 +140,8 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
133140
async addToFailedState(msg: ExtendedQueueMessage) {
134141
const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId);
135142

143+
let updatedFailedState: FailedState;
144+
136145
if (existingFailedState) {
137146
if (existingFailedState.retryCount >= 6) {
138147
// We give up after 6 retries and log the error
@@ -143,18 +152,24 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
143152
return;
144153
}
145154
const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000;
146-
this.routeInFailedState.set(msg.MessageDeduplicationId, {
155+
updatedFailedState = {
147156
...existingFailedState,
148157
retryCount: existingFailedState.retryCount + 1,
149158
nextAlarmMs,
150-
});
159+
};
151160
} else {
152-
this.routeInFailedState.set(msg.MessageDeduplicationId, {
161+
updatedFailedState = {
153162
msg,
154163
retryCount: 1,
155164
nextAlarmMs: Date.now() + 2_000,
156-
});
165+
};
157166
}
167+
this.routeInFailedState.set(msg.MessageDeduplicationId, updatedFailedState);
168+
this.sql.exec(
169+
"INSERT OR REPLACE INTO failed_state (id, data) VALUES (?, ?)",
170+
msg.MessageDeduplicationId,
171+
JSON.stringify(updatedFailedState)
172+
);
158173
// We probably want to do something if routeInFailedState is becoming too big, at least log it
159174
await this.addAlarm();
160175
}
@@ -164,9 +179,29 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
164179
if (existingAlarm) return;
165180
if (this.routeInFailedState.size === 0) return;
166181

167-
const nextAlarmToSetup = Math.min(
182+
let nextAlarmToSetup = Math.min(
168183
...Array.from(this.routeInFailedState.values()).map(({ nextAlarmMs }) => nextAlarmMs)
169184
);
185+
if (nextAlarmToSetup < Date.now()) {
186+
// We don't want to set an alarm in the past
187+
nextAlarmToSetup = Date.now() + 2_000;
188+
}
170189
await this.ctx.storage.setAlarm(nextAlarmToSetup);
171190
}
191+
192+
// This function is used to restore the state of the durable object
193+
// We don't restore the ongoing revalidations because we cannot know in which state they are
194+
// We only restore the failed state and the alarm
195+
async initState() {
196+
// We store the failed state as a blob, we don't want to do anything with it anyway besides restoring
197+
this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT)");
198+
199+
const failedStateCursor = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state");
200+
for (const row of failedStateCursor) {
201+
this.routeInFailedState.set(row.id, JSON.parse(row.data));
202+
}
203+
204+
// Now that we have restored the failed state, we can restore the alarm as well
205+
await this.addAlarm();
206+
}
172207
}

0 commit comments

Comments
 (0)