Skip to content

Commit 628c25e

Browse files
committed
Basic handling of eventual consistency
1 parent 6b6416d commit 628c25e

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

packages/cloudflare/src/api/durable-objects/queue.spec.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ const createDurableObjectQueue = ({
2727
setAlarm: vi.fn(),
2828
getAlarm: vi.fn(),
2929
sql: {
30-
exec: vi.fn(),
30+
exec: vi.fn().mockImplementation(() => ({
31+
one: vi.fn()
32+
})),
3133
},
3234
},
3335
};

packages/cloudflare/src/api/durable-objects/queue.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
5454
// The route is already in a failed state, it will be retried later
5555
if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return;
5656

57+
// If the last success is newer than the last modified, it's likely that the regional cache is out of date
58+
// We don't need to revalidate in this case
59+
if (this.checkSyncTable(msg)) return;
60+
5761
if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) {
5862
const ongoingRevalidations = this.ongoingRevalidations.values();
5963
// When there is more than the max revalidations, we block concurrency until one of the revalidations finishes
@@ -78,6 +82,7 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
7882
previewModeId,
7983
} = msg;
8084
const protocol = host.includes("localhost") ? "http" : "https";
85+
console.log('previewModeId', previewModeId);
8186

8287
const response = await this.service.fetch(`${protocol}://${host}${url}`, {
8388
method: "HEAD",
@@ -109,6 +114,13 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
109114
// An unknown error occurred, most likely from something in user code like missing auth in the middleware
110115
throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`);
111116
}
117+
// Everything went well, we can update the sync table
118+
// We use unixepoch here because without IO the date doesn't change and it will make the e2e tests fail
119+
this.sql.exec(
120+
"INSERT OR REPLACE INTO sync (id, lastSuccess) VALUES (?, unixepoch())",
121+
// We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different.
122+
`${host}${url}`
123+
);
112124
} catch (e) {
113125
// Do we want to propagate the error to the calling worker?
114126
if (!isOpenNextError(e)) {
@@ -133,7 +145,6 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
133145
const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents;
134146
for (const event of allEventsToRetry) {
135147
await this.executeRevalidation(event.msg);
136-
this.routeInFailedState.delete(event.msg.MessageDeduplicationId);
137148
}
138149
}
139150

@@ -196,6 +207,9 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
196207
// We store the failed state as a blob, we don't want to do anything with it anyway besides restoring
197208
this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT)");
198209

210+
// We create the sync table to handle eventually consistent incremental cache
211+
this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER)");
212+
199213
const failedStateCursor = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state");
200214
for (const row of failedStateCursor) {
201215
this.routeInFailedState.set(row.id, JSON.parse(row.data));
@@ -204,4 +218,24 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
204218
// Now that we have restored the failed state, we can restore the alarm as well
205219
await this.addAlarm();
206220
}
221+
222+
/**
223+
*
224+
* @param msg
225+
* @returns `true` if the route has been revalidated since the lastModified from the message, `false` otherwise
226+
*/
227+
checkSyncTable(msg: ExtendedQueueMessage) {
228+
try {
229+
const isNewer = this.sql.exec<{ isNewer: number }>(
230+
"SELECT COUNT(*) as isNewer FROM sync WHERE id = ? AND lastSuccess > ?",
231+
`${msg.MessageBody.host}${msg.MessageBody.url}`,
232+
Math.round(msg.MessageBody.lastModified/1000)
233+
).one().isNewer;
234+
235+
return isNewer > 0;
236+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
237+
}catch(e: unknown){
238+
return false;
239+
}
240+
}
207241
}

0 commit comments

Comments
 (0)