Skip to content

Commit 60171f5

Browse files
authored
Durable Queue Storage (#460)
* restore failed state * Basic handling of eventual consistency * fix issue with restored state * precompile the durable object * remove old unused data * Add some customization * added debug info * fix rebase * lint fix * changeset * make params readonly * review fix
1 parent 9f32a0f commit 60171f5

File tree

10 files changed

+231
-49
lines changed

10 files changed

+231
-49
lines changed

.changeset/smart-bugs-play.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@opennextjs/cloudflare": patch
3+
---
4+
5+
feat: durable object de-duping revalidation queue

examples/e2e/app-router/wrangler.jsonc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
"migrations": [
2020
{
2121
"tag": "v1",
22-
"new_classes": ["DurableObjectQueueHandler"]
22+
"new_sqlite_classes": ["DurableObjectQueueHandler"]
2323
}
2424
],
2525
"kv_namespaces": [

packages/cloudflare/src/api/cloudflare-context.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,17 @@ declare global {
1818
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
1919
// Asset binding
2020
ASSETS?: Fetcher;
21+
22+
// Below are the potential environment variables that can be set by the user to configure the durable object queue handler
23+
// The max number of revalidations that can be processed by the durable worker at the same time
24+
MAX_REVALIDATION_BY_DURABLE_OBJECT?: string;
25+
// The max time in milliseconds that a revalidation can take before being considered as failed
26+
REVALIDATION_TIMEOUT_MS?: string;
27+
// The amount of time after which a revalidation will be attempted again if it failed
28+
// If it fails again it will exponentially back off until it reaches the max retry interval
29+
REVALIDATION_RETRY_INTERVAL_MS?: string;
30+
// The maximum number of attempts that can be made to revalidate a path
31+
MAX_REVALIDATION_ATTEMPTS?: string;
2132
}
2233
}
2334

packages/cloudflare/src/api/durable-objects/queue.spec.ts

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ const createDurableObjectQueue = ({
2626
storage: {
2727
setAlarm: vi.fn(),
2828
getAlarm: vi.fn(),
29+
sql: {
30+
exec: vi.fn().mockImplementation(() => ({
31+
one: vi.fn(),
32+
})),
33+
},
2934
},
3035
};
3136
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -60,6 +65,7 @@ const createMessage = (dedupId: string, lastModified = Date.now()) => ({
6065
describe("DurableObjectQueue", () => {
6166
describe("successful revalidation", () => {
6267
it("should process a single revalidation", async () => {
68+
process.env.__NEXT_PREVIEW_MODE_ID = "test";
6369
const queue = createDurableObjectQueue({ fetchDuration: 10 });
6470
const firstRequest = await queue.revalidate(createMessage("id"));
6571
expect(firstRequest).toBeUndefined();
@@ -102,8 +108,9 @@ describe("DurableObjectQueue", () => {
102108
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
103109
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);
104110

111+
// BlockConcurrencyWhile is called twice here, first time during creation of the object and second time when we try to revalidate
105112
// @ts-expect-error
106-
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1);
113+
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(2);
107114

108115
// Here we await the blocked request to ensure it's resolved
109116
await blockedReq;
@@ -201,9 +208,10 @@ describe("DurableObjectQueue", () => {
201208

202209
it("should add an alarm if there are failed states", async () => {
203210
const queue = createDurableObjectQueue({ fetchDuration: 10 });
204-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
211+
const nextAlarmMs = Date.now() + 1000;
212+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs });
205213
await queue.addAlarm();
206-
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000);
214+
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(nextAlarmMs);
207215
});
208216

209217
it("should not add an alarm if there is already an alarm set", async () => {
@@ -217,10 +225,16 @@ describe("DurableObjectQueue", () => {
217225

218226
it("should set the alarm to the lowest nextAlarm", async () => {
219227
const queue = createDurableObjectQueue({ fetchDuration: 10 });
220-
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
221-
queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 });
228+
const nextAlarmMs = Date.now() + 1000;
229+
const firstAlarm = Date.now() + 500;
230+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs });
231+
queue.routeInFailedState.set("id2", {
232+
msg: createMessage("id2"),
233+
retryCount: 0,
234+
nextAlarmMs: firstAlarm,
235+
});
222236
await queue.addAlarm();
223-
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500);
237+
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(firstAlarm);
224238
});
225239
});
226240

packages/cloudflare/src/api/durable-objects/queue.ts

Lines changed: 148 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { error } from "@opennextjs/aws/adapters/logger.js";
1+
import { debug, error } from "@opennextjs/aws/adapters/logger.js";
22
import type { QueueMessage } from "@opennextjs/aws/types/overrides";
33
import {
44
FatalError,
@@ -8,11 +8,15 @@ import {
88
} from "@opennextjs/aws/utils/error.js";
99
import { DurableObject } from "cloudflare:workers";
1010

11-
const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5;
11+
const DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT = 5;
1212
const DEFAULT_REVALIDATION_TIMEOUT_MS = 10_000;
13+
const DEFAULT_REVALIDATION_RETRY_INTERVAL_MS = 2_000;
14+
const DEFAULT_MAX_REVALIDATION_ATTEMPTS = 6;
1315

14-
interface ExtendedQueueMessage extends QueueMessage {
15-
previewModeId: string;
16+
interface FailedState {
17+
msg: QueueMessage;
18+
retryCount: number;
19+
nextAlarmMs: number;
1620
}
1721

1822
export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
@@ -21,37 +25,73 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
2125
// TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top)
2226
ongoingRevalidations = new Map<string, Promise<void>>();
2327

24-
// TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage
25-
routeInFailedState = new Map<
26-
string,
27-
{ msg: ExtendedQueueMessage; retryCount: number; nextAlarmMs: number }
28-
>();
28+
sql: SqlStorage;
29+
30+
routeInFailedState = new Map<string, FailedState>();
2931

3032
service: NonNullable<CloudflareEnv["NEXT_CACHE_REVALIDATION_WORKER"]>;
3133

32-
// TODO: allow this to be configurable - How do we want todo that? env variable? passed down from the queue override ?
33-
maxRevalidations = MAX_REVALIDATION_BY_DURABLE_OBJECT;
34+
// Configurable params
35+
readonly maxRevalidations: number;
36+
readonly revalidationTimeout: number;
37+
readonly revalidationRetryInterval: number;
38+
readonly maxRevalidationAttempts: number;
3439

3540
constructor(ctx: DurableObjectState, env: CloudflareEnv) {
3641
super(ctx, env);
3742
this.service = env.NEXT_CACHE_REVALIDATION_WORKER!;
3843
// If there is no service binding, we throw an error because we can't revalidate without it
3944
if (!this.service) throw new IgnorableError("No service binding for cache revalidation worker");
45+
this.sql = ctx.storage.sql;
46+
47+
// We restore the state
48+
ctx.blockConcurrencyWhile(async () => {
49+
debug(`Restoring the state of the durable object`);
50+
await this.initState();
51+
});
52+
53+
this.maxRevalidations = env.MAX_REVALIDATION_BY_DURABLE_OBJECT
54+
? parseInt(env.MAX_REVALIDATION_BY_DURABLE_OBJECT)
55+
: DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT;
56+
57+
this.revalidationTimeout = env.REVALIDATION_TIMEOUT_MS
58+
? parseInt(env.REVALIDATION_TIMEOUT_MS)
59+
: DEFAULT_REVALIDATION_TIMEOUT_MS;
60+
61+
this.revalidationRetryInterval = env.REVALIDATION_RETRY_INTERVAL_MS
62+
? parseInt(env.REVALIDATION_RETRY_INTERVAL_MS)
63+
: DEFAULT_REVALIDATION_RETRY_INTERVAL_MS;
64+
65+
this.maxRevalidationAttempts = env.MAX_REVALIDATION_ATTEMPTS
66+
? parseInt(env.MAX_REVALIDATION_ATTEMPTS)
67+
: DEFAULT_MAX_REVALIDATION_ATTEMPTS;
68+
69+
debug(`Durable object initialized`);
4070
}
4171

42-
async revalidate(msg: ExtendedQueueMessage) {
72+
async revalidate(msg: QueueMessage) {
4373
// If there is already an ongoing revalidation, we don't need to revalidate again
4474
if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return;
4575

4676
// The route is already in a failed state, it will be retried later
4777
if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return;
4878

49-
if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) {
79+
// If the last success is newer than the last modified, it's likely that the regional cache is out of date
80+
// We don't need to revalidate in this case
81+
if (this.checkSyncTable(msg)) return;
82+
83+
if (this.ongoingRevalidations.size >= this.maxRevalidations) {
84+
debug(
85+
`The maximum number of revalidations (${this.maxRevalidations}) is reached. Blocking until one of the revalidations finishes.`
86+
);
5087
const ongoingRevalidations = this.ongoingRevalidations.values();
5188
// When there is more than the max revalidations, we block concurrency until one of the revalidations finishes
5289
// We still await the promise to ensure the revalidation is completed
5390
// This is fine because the queue itself run inside a waitUntil
54-
await this.ctx.blockConcurrencyWhile(() => Promise.race(ongoingRevalidations));
91+
await this.ctx.blockConcurrencyWhile(async () => {
92+
debug(`Waiting for one of the revalidations to finish`);
93+
await Promise.race(ongoingRevalidations);
94+
});
5595
}
5696

5797
const revalidationPromise = this.executeRevalidation(msg);
@@ -63,31 +103,33 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
63103
this.ctx.waitUntil(revalidationPromise);
64104
}
65105

66-
private async executeRevalidation(msg: ExtendedQueueMessage) {
106+
private async executeRevalidation(msg: QueueMessage) {
67107
try {
108+
debug(`Revalidating ${msg.MessageBody.host}${msg.MessageBody.url}`);
68109
const {
69110
MessageBody: { host, url },
70-
previewModeId,
71111
} = msg;
72112
const protocol = host.includes("localhost") ? "http" : "https";
73113

74-
//TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc)
75114
const response = await this.service.fetch(`${protocol}://${host}${url}`, {
76115
method: "HEAD",
77116
headers: {
78-
"x-prerender-revalidate": previewModeId,
117+
// This is defined during build
118+
"x-prerender-revalidate": process.env.__NEXT_PREVIEW_MODE_ID!,
79119
"x-isr": "1",
80120
},
81-
signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS),
121+
signal: AbortSignal.timeout(this.revalidationTimeout),
82122
});
83123
// Now we need to handle errors from the fetch
84124
if (response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") {
85-
// Something is very wrong here, it means that either the page is not ISR/SSG (and we shouldn't be here) or the `x-prerender-revalidate` header is not correct (and it should not happen either)
125+
this.routeInFailedState.delete(msg.MessageDeduplicationId);
86126
throw new FatalError(
87127
`The revalidation for ${host}${url} cannot be done. This error should never happen.`
88128
);
89129
} else if (response.status === 404) {
90130
// The page is not found, we should not revalidate it
131+
// We remove the route from the failed state because it might be expected (i.e. a route that was deleted)
132+
this.routeInFailedState.delete(msg.MessageDeduplicationId);
91133
throw new IgnorableError(
92134
`The revalidation for ${host}${url} cannot be done because the page is not found. It's either expected or an error in user code itself`
93135
);
@@ -100,8 +142,23 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
100142
} else if (response.status !== 200) {
101143
// TODO: check if we need to handle cloudflare specific status codes/errors
102144
// An unknown error occurred, most likely from something in user code like missing auth in the middleware
145+
146+
// We probably want to retry in this case as well
147+
await this.addToFailedState(msg);
148+
103149
throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`);
104150
}
151+
// Everything went well, we can update the sync table
152+
// We use unixepoch here,it also works with Date.now()/1000, but not with Date.now() alone.
153+
// TODO: This needs to be investigated
154+
this.sql.exec(
155+
"INSERT OR REPLACE INTO sync (id, lastSuccess, buildId) VALUES (?, unixepoch(), ?)",
156+
// We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different.
157+
`${host}${url}`,
158+
process.env.__NEXT_BUILD_ID
159+
);
160+
// If everything went well, we can remove the route from the failed state
161+
this.routeInFailedState.delete(msg.MessageDeduplicationId);
105162
} catch (e) {
106163
// Do we want to propagate the error to the calling worker?
107164
if (!isOpenNextError(e)) {
@@ -125,36 +182,47 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
125182
);
126183
const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents;
127184
for (const event of allEventsToRetry) {
185+
debug(`Retrying revalidation for ${event.msg.MessageBody.host}${event.msg.MessageBody.url}`);
128186
await this.executeRevalidation(event.msg);
129-
this.routeInFailedState.delete(event.msg.MessageDeduplicationId);
130187
}
131188
}
132189

133-
async addToFailedState(msg: ExtendedQueueMessage) {
190+
async addToFailedState(msg: QueueMessage) {
191+
debug(`Adding ${msg.MessageBody.host}${msg.MessageBody.url} to the failed state`);
134192
const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId);
135193

194+
let updatedFailedState: FailedState;
195+
136196
if (existingFailedState) {
137-
if (existingFailedState.retryCount >= 6) {
197+
if (existingFailedState.retryCount >= this.maxRevalidationAttempts) {
138198
// We give up after 6 retries and log the error
139199
error(
140200
`The revalidation for ${msg.MessageBody.host}${msg.MessageBody.url} has failed after 6 retries. It will not be tried again, but subsequent ISR requests will retry.`
141201
);
142202
this.routeInFailedState.delete(msg.MessageDeduplicationId);
143203
return;
144204
}
145-
const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000;
146-
this.routeInFailedState.set(msg.MessageDeduplicationId, {
205+
const nextAlarmMs =
206+
Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * this.revalidationRetryInterval;
207+
updatedFailedState = {
147208
...existingFailedState,
148209
retryCount: existingFailedState.retryCount + 1,
149210
nextAlarmMs,
150-
});
211+
};
151212
} else {
152-
this.routeInFailedState.set(msg.MessageDeduplicationId, {
213+
updatedFailedState = {
153214
msg,
154215
retryCount: 1,
155216
nextAlarmMs: Date.now() + 2_000,
156-
});
217+
};
157218
}
219+
this.routeInFailedState.set(msg.MessageDeduplicationId, updatedFailedState);
220+
this.sql.exec(
221+
"INSERT OR REPLACE INTO failed_state (id, data, buildId) VALUES (?, ?, ?)",
222+
msg.MessageDeduplicationId,
223+
JSON.stringify(updatedFailedState),
224+
process.env.__NEXT_BUILD_ID
225+
);
158226
// We probably want to do something if routeInFailedState is becoming too big, at least log it
159227
await this.addAlarm();
160228
}
@@ -164,9 +232,60 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
164232
if (existingAlarm) return;
165233
if (this.routeInFailedState.size === 0) return;
166234

167-
const nextAlarmToSetup = Math.min(
235+
let nextAlarmToSetup = Math.min(
168236
...Array.from(this.routeInFailedState.values()).map(({ nextAlarmMs }) => nextAlarmMs)
169237
);
238+
if (nextAlarmToSetup < Date.now()) {
239+
// We don't want to set an alarm in the past
240+
nextAlarmToSetup = Date.now() + this.revalidationRetryInterval;
241+
}
170242
await this.ctx.storage.setAlarm(nextAlarmToSetup);
171243
}
244+
245+
// This function is used to restore the state of the durable object
246+
// We don't restore the ongoing revalidations because we cannot know in which state they are
247+
// We only restore the failed state and the alarm
248+
async initState() {
249+
// We store the failed state as a blob, we don't want to do anything with it anyway besides restoring
250+
this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT, buildId TEXT)");
251+
252+
// We create the sync table to handle eventually consistent incremental cache
253+
this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER, buildId TEXT)");
254+
255+
// Before doing anything else, we clear the DB for any potential old data
256+
this.sql.exec("DELETE FROM failed_state WHERE buildId != ?", process.env.__NEXT_BUILD_ID);
257+
this.sql.exec("DELETE FROM sync WHERE buildId != ?", process.env.__NEXT_BUILD_ID);
258+
259+
const failedStateCursor = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state");
260+
for (const row of failedStateCursor) {
261+
this.routeInFailedState.set(row.id, JSON.parse(row.data));
262+
}
263+
264+
// Now that we have restored the failed state, we can restore the alarm as well
265+
await this.addAlarm();
266+
}
267+
268+
/**
269+
*
270+
* @param msg
271+
* @returns `true` if the route has been revalidated since the lastModified from the message, `false` otherwise
272+
*/
273+
checkSyncTable(msg: QueueMessage) {
274+
try {
275+
const numNewer = this.sql
276+
.exec<{
277+
numNewer: number;
278+
}>(
279+
"SELECT COUNT(*) as numNewer FROM sync WHERE id = ? AND lastSuccess > ? LIMIT 1",
280+
`${msg.MessageBody.host}${msg.MessageBody.url}`,
281+
Math.round(msg.MessageBody.lastModified / 1000)
282+
)
283+
.one().numNewer;
284+
285+
return numNewer > 0;
286+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
287+
} catch (e: unknown) {
288+
return false;
289+
}
290+
}
172291
}

packages/cloudflare/src/api/durable-queue.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@ export default {
1111

1212
const id = durableObject.idFromName(msg.MessageGroupId);
1313
const stub = durableObject.get(id);
14-
const previewModeId = process.env.__NEXT_PREVIEW_MODE_ID!;
1514
await stub.revalidate({
1615
...msg,
17-
previewModeId,
1816
});
1917
},
2018
} satisfies Queue;

0 commit comments

Comments
 (0)