Skip to content

Commit ec9ea58

Browse files
conico974Nicolas Dorseuil
andauthored
Fix blockConcurrencyWhile in DO queue (#674)
* fix blockConcurrencyWhile blocking for 30s * add a warning and fix the test * changeset * review fix * use warn from logger --------- Co-authored-by: Nicolas Dorseuil <[email protected]>
1 parent e6040bc commit ec9ea58

File tree

3 files changed

+19
-12
lines changed

3 files changed

+19
-12
lines changed

.changeset/nine-months-enter.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@opennextjs/cloudflare": patch
3+
---
4+
5+
fix blockConcurrencyWhile on DO queue

packages/cloudflare/src/api/durable-objects/queue.spec.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,6 @@ describe("DurableObjectQueue", () => {
111111
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
112112
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);
113113

114-
// BlockConcurrencyWhile is called twice here, first time during creation of the object and second time when we try to revalidate
115-
// @ts-expect-error
116-
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(2);
117-
118114
// Here we await the blocked request to ensure it's resolved
119115
await blockedReq;
120116
// We then need to await for the actual revalidation to finish

packages/cloudflare/src/api/durable-objects/queue.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { debug, error } from "@opennextjs/aws/adapters/logger.js";
1+
import { debug, error, warn } from "@opennextjs/aws/adapters/logger.js";
22
import type { QueueMessage } from "@opennextjs/aws/types/overrides";
33
import {
44
FatalError,
@@ -73,6 +73,11 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> {
7373
}
7474

7575
async revalidate(msg: QueueMessage) {
76+
if (this.ongoingRevalidations.size > 2 * this.maxRevalidations) {
77+
warn(
78+
`Your durable object has 2 times the maximum number of revalidations (${this.maxRevalidations}) in progress. If this happens often, you should consider increasing the NEXT_CACHE_DO_QUEUE_MAX_REVALIDATION or the number of durable objects with the MAX_REVALIDATE_CONCURRENCY env var.`
79+
);
80+
}
7681
// If there is already an ongoing revalidation, we don't need to revalidate again
7782
if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return;
7883

@@ -87,22 +92,21 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> {
8792
debug(
8893
`The maximum number of revalidations (${this.maxRevalidations}) is reached. Blocking until one of the revalidations finishes.`
8994
);
90-
const ongoingRevalidations = this.ongoingRevalidations.values();
91-
// When there is more than the max revalidations, we block concurrency until one of the revalidations finishes
92-
// We still await the promise to ensure the revalidation is completed
93-
// This is fine because the queue itself run inside a waitUntil
94-
await this.ctx.blockConcurrencyWhile(async () => {
95+
// TODO: need more investigation
96+
// We don't use `blockConcurrencyWhile` here because it block the whole durable object for 30 seconds
97+
// if we exceed the max revalidations too fast
98+
while (this.ongoingRevalidations.size >= this.maxRevalidations) {
99+
const ongoingRevalidations = this.ongoingRevalidations.values();
95100
debug(`Waiting for one of the revalidations to finish`);
96101
await Promise.race(ongoingRevalidations);
97-
});
102+
}
98103
}
99104

100105
const revalidationPromise = this.executeRevalidation(msg);
101106

102107
// We store the promise to dedupe the revalidation
103108
this.ongoingRevalidations.set(msg.MessageDeduplicationId, revalidationPromise);
104109

105-
// TODO: check if the object stays up during waitUntil so that the internal state is maintained
106110
this.ctx.waitUntil(revalidationPromise);
107111
}
108112

@@ -121,6 +125,7 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> {
121125
"x-prerender-revalidate": process.env.__NEXT_PREVIEW_MODE_ID!,
122126
"x-isr": "1",
123127
},
128+
// This one is kind of problematic, it will always show the wall time of the revalidation to `this.revalidationTimeout`
124129
signal: AbortSignal.timeout(this.revalidationTimeout),
125130
});
126131
// Now we need to handle errors from the fetch
@@ -260,6 +265,7 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> {
260265
this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER, buildId TEXT)");
261266

262267
// Before doing anything else, we clear the DB for any potential old data
268+
// TODO: extract this to a function so that it could be called by the user at another time than init
263269
this.sql.exec("DELETE FROM failed_state WHERE buildId != ?", process.env.__NEXT_BUILD_ID);
264270
this.sql.exec("DELETE FROM sync WHERE buildId != ?", process.env.__NEXT_BUILD_ID);
265271

0 commit comments

Comments
 (0)