Skip to content

Commit a6d7e6a

Browse files
committed
Basic handling of eventual consistency
1 parent 3a2ef00 commit a6d7e6a

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

packages/cloudflare/src/api/durable-objects/queue.spec.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ const createDurableObjectQueue = ({
2929
setAlarm: vi.fn(),
3030
getAlarm: vi.fn(),
3131
sql: {
32-
exec: vi.fn(),
32+
exec: vi.fn().mockImplementation(() => ({
33+
one: vi.fn()
34+
})),
3335
},
3436
},
3537
};

packages/cloudflare/src/api/durable-objects/queue.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
5555
// The route is already in a failed state, it will be retried later
5656
if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return;
5757

58+
// If the last success is newer than the last modified, it's likely that the regional cache is out of date
59+
// We don't need to revalidate in this case
60+
if (this.checkSyncTable(msg)) return;
61+
5862
if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) {
5963
const ongoingRevalidations = this.ongoingRevalidations.values();
6064
// When there is more than the max revalidations, we block concurrency until one of the revalidations finishes
@@ -79,6 +83,7 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
7983
previewModeId,
8084
} = msg;
8185
const protocol = host.includes("localhost") ? "http" : "https";
86+
console.log('previewModeId', previewModeId);
8287

8388
const response = await this.service.fetch(`${protocol}://${host}${url}`, {
8489
method: "HEAD",
@@ -110,6 +115,13 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
110115
// An unknown error occurred, most likely from something in user code like missing auth in the middleware
111116
throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`);
112117
}
118+
// Everything went well, we can update the sync table
119+
// We use unixepoch here because without IO the date doesn't change and it will make the e2e tests fail
120+
this.sql.exec(
121+
"INSERT OR REPLACE INTO sync (id, lastSuccess) VALUES (?, unixepoch())",
122+
// We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different.
123+
`${host}${url}`
124+
);
113125
} catch (e) {
114126
// Do we want to propagate the error to the calling worker?
115127
if (!isOpenNextError(e)) {
@@ -136,7 +148,6 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
136148
: expiredEvents;
137149
for (const event of allEventsToRetry) {
138150
await this.executeRevalidation(event.msg);
139-
this.routeInFailedState.delete(event.msg.MessageDeduplicationId);
140151
}
141152
}
142153

@@ -201,6 +212,9 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
201212
// We store the failed state as a blob, we don't want to do anything with it anyway besides restoring
202213
this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT)");
203214

215+
// We create the sync table to handle eventually consistent incremental cache
216+
this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER)");
217+
204218
const failedStateCursor = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state");
205219
for (const row of failedStateCursor) {
206220
this.routeInFailedState.set(row.id, JSON.parse(row.data));
@@ -209,4 +223,24 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
209223
// Now that we have restored the failed state, we can restore the alarm as well
210224
await this.addAlarm();
211225
}
226+
227+
/**
228+
*
229+
* @param msg
230+
* @returns `true` if the route has been revalidated since the lastModified from the message, `false` otherwise
231+
*/
232+
checkSyncTable(msg: ExtendedQueueMessage) {
233+
try {
234+
const isNewer = this.sql.exec<{ isNewer: number }>(
235+
"SELECT COUNT(*) as isNewer FROM sync WHERE id = ? AND lastSuccess > ?",
236+
`${msg.MessageBody.host}${msg.MessageBody.url}`,
237+
Math.round(msg.MessageBody.lastModified/1000)
238+
).one().isNewer;
239+
240+
return isNewer > 0;
241+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
242+
}catch(e: unknown){
243+
return false;
244+
}
245+
}
212246
}

0 commit comments

Comments
 (0)