diff --git a/.changeset/nine-months-enter.md b/.changeset/nine-months-enter.md new file mode 100644 index 00000000..9c9e7ac0 --- /dev/null +++ b/.changeset/nine-months-enter.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/cloudflare": patch +--- + +fix blockConcurrencyWhile on DO queue diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts index 4f70f462..dab4ae95 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.spec.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -111,10 +111,6 @@ describe("DurableObjectQueue", () => { expect(queue.ongoingRevalidations.has("id6")).toBe(false); expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]); - // BlockConcurrencyWhile is called twice here, first time during creation of the object and second time when we try to revalidate - // @ts-expect-error - expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(2); - // Here we await the blocked request to ensure it's resolved await blockedReq; // We then need to await for the actual revalidation to finish diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 8d63c9e6..d4c59c30 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -1,4 +1,4 @@ -import { debug, error } from "@opennextjs/aws/adapters/logger.js"; +import { debug, error, warn } from "@opennextjs/aws/adapters/logger.js"; import type { QueueMessage } from "@opennextjs/aws/types/overrides"; import { FatalError, @@ -73,6 +73,11 @@ export class DOQueueHandler extends DurableObject { } async revalidate(msg: QueueMessage) { + if (this.ongoingRevalidations.size > 2 * this.maxRevalidations) { + warn( + `Your durable object has 2 times the maximum number of revalidations (${this.maxRevalidations}) in progress. If this happens often, you should consider increasing the NEXT_CACHE_DO_QUEUE_MAX_REVALIDATION or the number of durable objects with the MAX_REVALIDATE_CONCURRENCY env var.` + ); + } // If there is already an ongoing revalidation, we don't need to revalidate again if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return; @@ -87,14 +92,14 @@ export class DOQueueHandler extends DurableObject { debug( `The maximum number of revalidations (${this.maxRevalidations}) is reached. Blocking until one of the revalidations finishes.` ); - const ongoingRevalidations = this.ongoingRevalidations.values(); - // When there is more than the max revalidations, we block concurrency until one of the revalidations finishes - // We still await the promise to ensure the revalidation is completed - // This is fine because the queue itself run inside a waitUntil - await this.ctx.blockConcurrencyWhile(async () => { + // TODO: need more investigation + // We don't use `blockConcurrencyWhile` here because it block the whole durable object for 30 seconds + // if we exceed the max revalidations too fast + while (this.ongoingRevalidations.size >= this.maxRevalidations) { + const ongoingRevalidations = this.ongoingRevalidations.values(); debug(`Waiting for one of the revalidations to finish`); await Promise.race(ongoingRevalidations); - }); + } } const revalidationPromise = this.executeRevalidation(msg); @@ -102,7 +107,6 @@ export class DOQueueHandler extends DurableObject { // We store the promise to dedupe the revalidation this.ongoingRevalidations.set(msg.MessageDeduplicationId, revalidationPromise); - // TODO: check if the object stays up during waitUntil so that the internal state is maintained this.ctx.waitUntil(revalidationPromise); } @@ -121,6 +125,7 @@ export class DOQueueHandler extends DurableObject { "x-prerender-revalidate": process.env.__NEXT_PREVIEW_MODE_ID!, "x-isr": "1", }, + // This one is kind of problematic, it will always show the wall time of the revalidation to `this.revalidationTimeout` signal: AbortSignal.timeout(this.revalidationTimeout), }); // Now we need to handle errors from the fetch @@ -260,6 +265,7 @@ export class DOQueueHandler extends DurableObject { this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER, buildId TEXT)"); // Before doing anything else, we clear the DB for any potential old data + // TODO: extract this to a function so that it could be called by the user at another time than init this.sql.exec("DELETE FROM failed_state WHERE buildId != ?", process.env.__NEXT_BUILD_ID); this.sql.exec("DELETE FROM sync WHERE buildId != ?", process.env.__NEXT_BUILD_ID);