Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/silver-baboons-begin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
---

add an option for disabling sqlite on the durable object queue
3 changes: 3 additions & 0 deletions packages/cloudflare/src/api/cloudflare-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ declare global {
REVALIDATION_RETRY_INTERVAL_MS?: string;
// The maximum number of attempts that can be made to revalidate a path
MAX_REVALIDATION_ATTEMPTS?: string;
// Disable SQLite for the durable object queue handler
// This can be safely used if you don't use an eventually consistent incremental cache (i.e. R2 without the regional cache for example)
REVALIDATION_DO_DISABLE_SQLITE?: string;
}
}

Expand Down
28 changes: 28 additions & 0 deletions packages/cloudflare/src/api/durable-objects/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ const createDurableObjectQueue = ({
fetchDuration,
statusCode,
headers,
disableSQLite,
}: {
fetchDuration: number;
statusCode?: number;
headers?: Headers;
disableSQLite?: boolean;
}) => {
const mockState = {
waitUntil: vi.fn(),
Expand Down Expand Up @@ -52,6 +54,7 @@ const createDurableObjectQueue = ({
),
connect: vi.fn(),
},
REVALIDATION_DO_DISABLE_SQLITE: disableSQLite ? "true" : undefined,
});
};

Expand Down Expand Up @@ -323,4 +326,29 @@ describe("DurableObjectQueue", () => {
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
});
});

describe("disableSQLite", () => {
it("should not initialize the sqlite storage", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10, disableSQLite: true });
expect(queue.sql.exec).not.toHaveBeenCalled();
});

it("should not write to the sqlite storage on failed state", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10, disableSQLite: true });
await queue.addToFailedState(createMessage("id"));
expect(queue.sql.exec).not.toHaveBeenCalled();
});

it("should not read from the sqlite storage on checkSyncTable", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10, disableSQLite: true });
queue.checkSyncTable(createMessage("id"));
expect(queue.sql.exec).not.toHaveBeenCalled();
});

it("should not write to sql on successful revalidation", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10, disableSQLite: true });
await queue.revalidate(createMessage("id"));
expect(queue.sql.exec).not.toHaveBeenCalled();
});
});
});
47 changes: 28 additions & 19 deletions packages/cloudflare/src/api/durable-objects/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
readonly revalidationTimeout: number;
readonly revalidationRetryInterval: number;
readonly maxRevalidationAttempts: number;
readonly disableSQLite: boolean;

constructor(ctx: DurableObjectState, env: CloudflareEnv) {
super(ctx, env);
Expand All @@ -44,12 +45,6 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
if (!this.service) throw new IgnorableError("No service binding for cache revalidation worker");
this.sql = ctx.storage.sql;

// We restore the state
ctx.blockConcurrencyWhile(async () => {
debug(`Restoring the state of the durable object`);
await this.initState();
});

this.maxRevalidations = env.MAX_REVALIDATION_BY_DURABLE_OBJECT
? parseInt(env.MAX_REVALIDATION_BY_DURABLE_OBJECT)
: DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT;
Expand All @@ -66,6 +61,14 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
? parseInt(env.MAX_REVALIDATION_ATTEMPTS)
: DEFAULT_MAX_REVALIDATION_ATTEMPTS;

this.disableSQLite = env.REVALIDATION_DO_DISABLE_SQLITE === "true";

// We restore the state
ctx.blockConcurrencyWhile(async () => {
debug(`Restoring the state of the durable object`);
await this.initState();
});

debug(`Durable object initialized`);
}

Expand Down Expand Up @@ -103,7 +106,7 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
this.ctx.waitUntil(revalidationPromise);
}

private async executeRevalidation(msg: QueueMessage) {
async executeRevalidation(msg: QueueMessage) {
try {
debug(`Revalidating ${msg.MessageBody.host}${msg.MessageBody.url}`);
const {
Expand Down Expand Up @@ -151,12 +154,14 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
// Everything went well, we can update the sync table
// We use unixepoch here,it also works with Date.now()/1000, but not with Date.now() alone.
// TODO: This needs to be investigated
this.sql.exec(
"INSERT OR REPLACE INTO sync (id, lastSuccess, buildId) VALUES (?, unixepoch(), ?)",
// We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different.
`${host}${url}`,
process.env.__NEXT_BUILD_ID
);
if (!this.disableSQLite) {
this.sql.exec(
"INSERT OR REPLACE INTO sync (id, lastSuccess, buildId) VALUES (?, unixepoch(), ?)",
// We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different.
`${host}${url}`,
process.env.__NEXT_BUILD_ID
);
}
// If everything went well, we can remove the route from the failed state
this.routeInFailedState.delete(msg.MessageDeduplicationId);
} catch (e) {
Expand Down Expand Up @@ -217,12 +222,14 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
};
}
this.routeInFailedState.set(msg.MessageDeduplicationId, updatedFailedState);
this.sql.exec(
"INSERT OR REPLACE INTO failed_state (id, data, buildId) VALUES (?, ?, ?)",
msg.MessageDeduplicationId,
JSON.stringify(updatedFailedState),
process.env.__NEXT_BUILD_ID
);
if (!this.disableSQLite) {
this.sql.exec(
"INSERT OR REPLACE INTO failed_state (id, data, buildId) VALUES (?, ?, ?)",
msg.MessageDeduplicationId,
JSON.stringify(updatedFailedState),
process.env.__NEXT_BUILD_ID
);
}
// We probably want to do something if routeInFailedState is becoming too big, at least log it
await this.addAlarm();
}
Expand All @@ -246,6 +253,7 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
// We don't restore the ongoing revalidations because we cannot know in which state they are
// We only restore the failed state and the alarm
async initState() {
if (this.disableSQLite) return;
// We store the failed state as a blob, we don't want to do anything with it anyway besides restoring
this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT, buildId TEXT)");

Expand All @@ -272,6 +280,7 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
*/
checkSyncTable(msg: QueueMessage) {
try {
if (this.disableSQLite) return false;
const numNewer = this.sql
.exec<{
numNewer: number;
Expand Down