1+ import { error } from "@opennextjs/aws/adapters/logger.js" ;
2+ import type { QueueMessage } from "@opennextjs/aws/types/overrides" ;
3+ import { IgnorableError } from "@opennextjs/aws/utils/error.js" ;
4+ import { DurableObject } from "cloudflare:workers" ;
5+
6+ const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5 ;
7+ const DEFAULT_REVALIDATION_TIMEOUT_MS = 10_000 ;
8+
9+ interface ExtendedQueueMessage extends QueueMessage {
10+ previewModeId : string ;
11+ }
12+
13+ export class DurableObjectQueueHandler extends DurableObject < CloudflareEnv > {
14+ // Ongoing revalidations are deduped by the deduplication id
15+ // Since this is running in waitUntil, we expect the durable object state to persist this during the duration of the revalidation
16+ // TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top)
17+ ongoingRevalidations = new Map < string , Promise < void > > ( ) ;
18+
19+ service : NonNullable < CloudflareEnv [ "NEXT_CACHE_REVALIDATION_WORKER" ] > ;
20+
21+ // TODO: allow this to be configurable
22+ maxRevalidations = MAX_REVALIDATION_BY_DURABLE_OBJECT ;
23+
24+ constructor ( ctx : DurableObjectState , env : CloudflareEnv ) {
25+ super ( ctx , env ) ;
26+ const service = env . NEXT_CACHE_REVALIDATION_WORKER ;
27+ // If there is no service binding, we throw an error because we can't revalidate without it
28+ if ( ! service ) throw new IgnorableError ( "No service binding for cache revalidation worker" ) ;
29+ this . service = service ;
30+
31+ }
32+
33+ async revalidate ( msg : ExtendedQueueMessage ) {
34+ // If there is already an ongoing revalidation, we don't need to revalidate again
35+ if ( this . ongoingRevalidations . has ( msg . MessageDeduplicationId ) ) return ;
36+
37+ if ( this . ongoingRevalidations . size >= MAX_REVALIDATION_BY_DURABLE_OBJECT ) {
38+ const ongoingRevalidations = this . ongoingRevalidations . values ( )
39+ await this . ctx . blockConcurrencyWhile ( ( ) => Promise . race ( ongoingRevalidations ) ) ;
40+ }
41+
42+ const revalidationPromise = this . executeRevalidation ( msg ) ;
43+
44+ // We store the promise to dedupe the revalidation
45+ this . ongoingRevalidations . set (
46+ msg . MessageDeduplicationId ,
47+ revalidationPromise
48+ ) ;
49+
50+ this . ctx . waitUntil ( revalidationPromise ) ;
51+ }
52+
53+ private async executeRevalidation ( { MessageBody : { host, url} , MessageDeduplicationId, previewModeId} : ExtendedQueueMessage ) {
54+ try {
55+ const protocol = host . includes ( "localhost" ) ? "http" : "https" ;
56+
57+ //TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc)
58+ await this . service . fetch ( `${ protocol } ://${ host } ${ url } ` , {
59+ method : "HEAD" ,
60+ headers : {
61+ "x-prerender-revalidate" : previewModeId ,
62+ "x-isr" : "1" ,
63+ } ,
64+ signal : AbortSignal . timeout ( DEFAULT_REVALIDATION_TIMEOUT_MS )
65+ } )
66+ } catch ( e ) {
67+ error ( e ) ;
68+ } finally {
69+ this . ongoingRevalidations . delete ( MessageDeduplicationId ) ;
70+ }
71+ }
72+
73+ }
0 commit comments