Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nine-months-enter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
---

fix blockConcurrencyWhile on DO queue
4 changes: 0 additions & 4 deletions packages/cloudflare/src/api/durable-objects/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ describe("DurableObjectQueue", () => {
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);

// BlockConcurrencyWhile is called twice here, first time during creation of the object and second time when we try to revalidate
// @ts-expect-error
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(2);

// Here we await the blocked request to ensure it's resolved
await blockedReq;
// We then need to await for the actual revalidation to finish
Expand Down
22 changes: 14 additions & 8 deletions packages/cloudflare/src/api/durable-objects/queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { debug, error } from "@opennextjs/aws/adapters/logger.js";
import { debug, error, warn } from "@opennextjs/aws/adapters/logger.js";
import type { QueueMessage } from "@opennextjs/aws/types/overrides";
import {
FatalError,
Expand Down Expand Up @@ -73,6 +73,11 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> {
}

async revalidate(msg: QueueMessage) {
if (this.ongoingRevalidations.size > 2 * this.maxRevalidations) {
warn(
`Your durable object has 2 times the maximum number of revalidations (${this.maxRevalidations}) in progress. If this happens often, you should consider increasing the NEXT_CACHE_DO_QUEUE_MAX_REVALIDATION or the number of durable objects with the MAX_REVALIDATE_CONCURRENCY env var.`
);
}
// If there is already an ongoing revalidation, we don't need to revalidate again
if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return;

Expand All @@ -87,22 +92,21 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> {
debug(
`The maximum number of revalidations (${this.maxRevalidations}) is reached. Blocking until one of the revalidations finishes.`
);
const ongoingRevalidations = this.ongoingRevalidations.values();
// When there is more than the max revalidations, we block concurrency until one of the revalidations finishes
// We still await the promise to ensure the revalidation is completed
// This is fine because the queue itself run inside a waitUntil
await this.ctx.blockConcurrencyWhile(async () => {
// TODO: need more investigation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if you can have a minimal repro and create an issue on the workerd repro and link it here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to work on that. It's a bit tricky to reproduce locally

// We don't use `blockConcurrencyWhile` here because it block the whole durable object for 30 seconds
// if we exceed the max revalidations too fast
while (this.ongoingRevalidations.size >= this.maxRevalidations) {
const ongoingRevalidations = this.ongoingRevalidations.values();
debug(`Waiting for one of the revalidations to finish`);
await Promise.race(ongoingRevalidations);
});
}
}

const revalidationPromise = this.executeRevalidation(msg);

// We store the promise to dedupe the revalidation
this.ongoingRevalidations.set(msg.MessageDeduplicationId, revalidationPromise);

// TODO: check if the object stays up during waitUntil so that the internal state is maintained
this.ctx.waitUntil(revalidationPromise);
}

Expand All @@ -121,6 +125,7 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> {
"x-prerender-revalidate": process.env.__NEXT_PREVIEW_MODE_ID!,
"x-isr": "1",
},
// This one is kind of problematic, it will always show the wall time of the revalidation to `this.revalidationTimeout`
signal: AbortSignal.timeout(this.revalidationTimeout),
});
// Now we need to handle errors from the fetch
Expand Down Expand Up @@ -260,6 +265,7 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> {
this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER, buildId TEXT)");

// Before doing anything else, we clear the DB for any potential old data
// TODO: extract this to a function so that it could be called by the user at another time than init
this.sql.exec("DELETE FROM failed_state WHERE buildId != ?", process.env.__NEXT_BUILD_ID);
this.sql.exec("DELETE FROM sync WHERE buildId != ?", process.env.__NEXT_BUILD_ID);

Expand Down