@@ -15,17 +15,21 @@ interface ExtendedQueueMessage extends QueueMessage {
1515 previewModeId : string ;
1616}
1717
18+ interface FailedState {
19+ msg : ExtendedQueueMessage ;
20+ retryCount : number ;
21+ nextAlarmMs : number ;
22+ }
23+
1824export class DurableObjectQueueHandler extends DurableObject < CloudflareEnv > {
1925 // Ongoing revalidations are deduped by the deduplication id
2026 // Since this is running in waitUntil, we expect the durable object state to persist this during the duration of the revalidation
2127 // TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top)
2228 ongoingRevalidations = new Map < string , Promise < void > > ( ) ;
2329
24- // TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage
25- routeInFailedState = new Map <
26- string ,
27- { msg : ExtendedQueueMessage ; retryCount : number ; nextAlarmMs : number }
28- > ( ) ;
30+ sql : SqlStorage ;
31+
32+ routeInFailedState = new Map < string , FailedState > ( ) ;
2933
3034 service : NonNullable < CloudflareEnv [ "NEXT_CACHE_REVALIDATION_WORKER" ] > ;
3135
@@ -37,6 +41,10 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
3741 this . service = env . NEXT_CACHE_REVALIDATION_WORKER ! ;
3842 // If there is no service binding, we throw an error because we can't revalidate without it
3943 if ( ! this . service ) throw new IgnorableError ( "No service binding for cache revalidation worker" ) ;
44+ this . sql = ctx . storage . sql ;
45+
46+ // We restore the state
47+ ctx . blockConcurrencyWhile ( ( ) => this . initState ( ) ) ;
4048 }
4149
4250 async revalidate ( msg : ExtendedQueueMessage ) {
@@ -71,7 +79,6 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
7179 } = msg ;
7280 const protocol = host . includes ( "localhost" ) ? "http" : "https" ;
7381
74- //TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc)
7582 const response = await this . service . fetch ( `${ protocol } ://${ host } ${ url } ` , {
7683 method : "HEAD" ,
7784 headers : {
@@ -133,6 +140,8 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
133140 async addToFailedState ( msg : ExtendedQueueMessage ) {
134141 const existingFailedState = this . routeInFailedState . get ( msg . MessageDeduplicationId ) ;
135142
143+ let updatedFailedState : FailedState ;
144+
136145 if ( existingFailedState ) {
137146 if ( existingFailedState . retryCount >= 6 ) {
138147 // We give up after 6 retries and log the error
@@ -143,18 +152,24 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
143152 return ;
144153 }
145154 const nextAlarmMs = Date . now ( ) + Math . pow ( 2 , existingFailedState . retryCount + 1 ) * 2_000 ;
146- this . routeInFailedState . set ( msg . MessageDeduplicationId , {
155+ updatedFailedState = {
147156 ...existingFailedState ,
148157 retryCount : existingFailedState . retryCount + 1 ,
149158 nextAlarmMs,
150- } ) ;
159+ } ;
151160 } else {
152- this . routeInFailedState . set ( msg . MessageDeduplicationId , {
161+ updatedFailedState = {
153162 msg,
154163 retryCount : 1 ,
155164 nextAlarmMs : Date . now ( ) + 2_000 ,
156- } ) ;
165+ } ;
157166 }
167+ this . routeInFailedState . set ( msg . MessageDeduplicationId , updatedFailedState ) ;
168+ this . sql . exec (
169+ "INSERT OR REPLACE INTO failed_state (id, data) VALUES (?, ?)" ,
170+ msg . MessageDeduplicationId ,
171+ JSON . stringify ( updatedFailedState )
172+ ) ;
158173 // We probably want to do something if routeInFailedState is becoming too big, at least log it
159174 await this . addAlarm ( ) ;
160175 }
@@ -164,9 +179,29 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
164179 if ( existingAlarm ) return ;
165180 if ( this . routeInFailedState . size === 0 ) return ;
166181
167- const nextAlarmToSetup = Math . min (
182+ let nextAlarmToSetup = Math . min (
168183 ...Array . from ( this . routeInFailedState . values ( ) ) . map ( ( { nextAlarmMs } ) => nextAlarmMs )
169184 ) ;
185+ if ( nextAlarmToSetup < Date . now ( ) ) {
186+ // We don't want to set an alarm in the past
187+ nextAlarmToSetup = Date . now ( ) + 2_000 ;
188+ }
170189 await this . ctx . storage . setAlarm ( nextAlarmToSetup ) ;
171190 }
191+
192+ // This function is used to restore the state of the durable object
193+ // We don't restore the ongoing revalidations because we cannot know in which state they are
194+ // We only restore the failed state and the alarm
195+ async initState ( ) {
196+ // We store the failed state as a blob, we don't want to do anything with it anyway besides restoring
197+ this . sql . exec ( "CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT)" ) ;
198+
199+ const failedStateCursor = this . sql . exec < { id : string ; data : string } > ( "SELECT * FROM failed_state" ) ;
200+ for ( const row of failedStateCursor ) {
201+ this . routeInFailedState . set ( row . id , JSON . parse ( row . data ) ) ;
202+ }
203+
204+ // Now that we have restored the failed state, we can restore the alarm as well
205+ await this . addAlarm ( ) ;
206+ }
172207}
0 commit comments