-
Notifications
You must be signed in to change notification settings - Fork 73
Fix blockConcurrencyWhile
in DO queue
#674
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"@opennextjs/cloudflare": patch | ||
--- | ||
|
||
fix blockConcurrencyWhile on DO queue |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -84,25 +84,29 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> { | |
if (this.checkSyncTable(msg)) return; | ||
|
||
if (this.ongoingRevalidations.size >= this.maxRevalidations) { | ||
if (this.ongoingRevalidations.size > 2 * this.maxRevalidations) { | ||
console.warn( | ||
`Your durable object has 2 times the maximum number of revalidations (${this.maxRevalidations}) in progress. If this happens often, you should consider increasing the NEXT_CACHE_DO_QUEUE_MAX_REVALIDATION or the number of durable objects with the MAX_REVALIDATE_CONCURRENCY env var.` | ||
); | ||
} | ||
debug( | ||
`The maximum number of revalidations (${this.maxRevalidations}) is reached. Blocking until one of the revalidations finishes.` | ||
); | ||
const ongoingRevalidations = this.ongoingRevalidations.values(); | ||
// When there is more than the max revalidations, we block concurrency until one of the revalidations finishes | ||
// We still await the promise to ensure the revalidation is completed | ||
// This is fine because the queue itself run inside a waitUntil | ||
await this.ctx.blockConcurrencyWhile(async () => { | ||
// TODO: need more investigation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be great if you can have a minimal repro and create an issue on the workerd repro and link it here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll try to work on that. It's a bit tricky to reproduce locally |
||
// We don't use `blockConcurrencyWhile` here because it block the whole durable object for 30 seconds | ||
// if we exceed the max revalidations too fast | ||
while (this.ongoingRevalidations.size >= this.maxRevalidations) { | ||
const ongoingRevalidations = this.ongoingRevalidations.values(); | ||
debug(`Waiting for one of the revalidations to finish`); | ||
await Promise.race(ongoingRevalidations); | ||
}); | ||
} | ||
} | ||
|
||
const revalidationPromise = this.executeRevalidation(msg); | ||
|
||
// We store the promise to dedupe the revalidation | ||
this.ongoingRevalidations.set(msg.MessageDeduplicationId, revalidationPromise); | ||
|
||
// TODO: check if the object stays up during waitUntil so that the internal state is maintained | ||
this.ctx.waitUntil(revalidationPromise); | ||
} | ||
|
||
|
@@ -121,6 +125,7 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> { | |
"x-prerender-revalidate": process.env.__NEXT_PREVIEW_MODE_ID!, | ||
"x-isr": "1", | ||
}, | ||
// This one is kind of problematic, it will always show the wall time of the revalidation to `this.revalidationTimeout` | ||
signal: AbortSignal.timeout(this.revalidationTimeout), | ||
}); | ||
// Now we need to handle errors from the fetch | ||
|
@@ -260,6 +265,7 @@ export class DOQueueHandler extends DurableObject<CloudflareEnv> { | |
this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER, buildId TEXT)"); | ||
|
||
// Before doing anything else, we clear the DB for any potential old data | ||
// TODO: extract this to a function so that it could be called by the user at another time than init | ||
this.sql.exec("DELETE FROM failed_state WHERE buildId != ?", process.env.__NEXT_BUILD_ID); | ||
this.sql.exec("DELETE FROM sync WHERE buildId != ?", process.env.__NEXT_BUILD_ID); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you want to import warning on line 1?