1+ import { describe , expect , it , vi } from "vitest" ;
2+
3+ import { DurableObjectQueueHandler } from "./queue" ;
4+
5+ vi . mock ( 'cloudflare:workers' , ( ) => ( {
6+ DurableObject : class {
7+ ctx : DurableObjectState ;
8+ env : CloudflareEnv ;
9+ constructor ( ctx : DurableObjectState , env : CloudflareEnv ) {
10+ this . ctx = ctx ;
11+ this . env = env ;
12+ }
13+ }
14+ } ) )
15+
16+ const createDurableObjectQueue = ( { fetchDuration, statusCode, headers} : { fetchDuration : number , statusCode ?: number , headers ?: Headers } ) => {
17+ const mockState = {
18+ waitUntil : vi . fn ( ) ,
19+ blockConcurrencyWhile : vi . fn ( ) . mockImplementation ( async ( fn ) => fn ( ) ) ,
20+ storage : {
21+ setAlarm : vi . fn ( ) ,
22+ getAlarm : vi . fn ( ) ,
23+ }
24+ } ;
25+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
26+ return new DurableObjectQueueHandler ( mockState as any , {
27+ NEXT_CACHE_REVALIDATION_WORKER : {
28+ fetch : vi . fn ( ) . mockReturnValue ( new Promise < Response > ( ( res ) => setTimeout ( ( ) => res ( new Response ( null , {
29+ status : statusCode ,
30+ headers : headers ?? new Headers ( [ [ "x-nextjs-cache" , "REVALIDATED" ] ] )
31+ } ) ) , fetchDuration ) ) ) ,
32+ connect : vi . fn ( ) ,
33+ }
34+ } ) ;
35+ } ;
36+
37+ const createMessage = ( dedupId : string ) => ( {
38+ MessageBody : { host : "test.local" , url : "/test" } ,
39+ MessageGroupId : "test.local/test" ,
40+ MessageDeduplicationId : dedupId ,
41+ previewModeId : "test" ,
42+ } )
43+
44+ describe ( 'DurableObjectQueue' , ( ) => {
45+ describe ( 'successful revalidation' , ( ) => {
46+ it ( "should process a single revalidation" , async ( ) => {
47+ const queue = createDurableObjectQueue ( { fetchDuration :10 } ) ;
48+ const firstRequest = await queue . revalidate ( createMessage ( "id" ) ) ;
49+ expect ( firstRequest ) . toBeUndefined ( ) ;
50+ expect ( queue . ongoingRevalidations . size ) . toBe ( 1 ) ;
51+ expect ( queue . ongoingRevalidations . has ( "id" ) ) . toBe ( true ) ;
52+
53+ await queue . ongoingRevalidations . get ( "id" ) ;
54+
55+ expect ( queue . ongoingRevalidations . size ) . toBe ( 0 ) ;
56+ expect ( queue . ongoingRevalidations . has ( "id" ) ) . toBe ( false ) ;
57+ expect ( queue . service . fetch ) . toHaveBeenCalledWith ( "https://test.local/test" , {
58+ method : "HEAD" ,
59+ headers : {
60+ "x-prerender-revalidate" : "test" ,
61+ "x-isr" : "1" ,
62+ } ,
63+ signal : expect . any ( AbortSignal )
64+ } ) ;
65+ } )
66+
67+ it ( 'should dedupe revalidations' , async ( ) => {
68+ const queue = createDurableObjectQueue ( { fetchDuration :10 } ) ;
69+ await queue . revalidate ( createMessage ( "id" ) ) ;
70+ await queue . revalidate ( createMessage ( "id" ) ) ;
71+ expect ( queue . ongoingRevalidations . size ) . toBe ( 1 ) ;
72+ expect ( queue . ongoingRevalidations . has ( "id" ) ) . toBe ( true ) ;
73+ } ) ;
74+
75+ it ( 'should block concurrency' , async ( ) => {
76+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
77+ await queue . revalidate ( createMessage ( "id" ) ) ;
78+ await queue . revalidate ( createMessage ( "id2" ) ) ;
79+ await queue . revalidate ( createMessage ( "id3" ) ) ;
80+ await queue . revalidate ( createMessage ( "id4" ) ) ;
81+ await queue . revalidate ( createMessage ( "id5" ) ) ;
82+ // the next one should block until one of the previous ones finishes
83+ const blockedReq = queue . revalidate ( createMessage ( "id6" ) ) ;
84+
85+ expect ( queue . ongoingRevalidations . size ) . toBe ( 5 ) ;
86+ expect ( queue . ongoingRevalidations . has ( "id6" ) ) . toBe ( false ) ;
87+ expect ( Array . from ( queue . ongoingRevalidations . keys ( ) ) ) . toEqual ( [ "id" , "id2" , "id3" , "id4" , "id5" ] ) ;
88+
89+ // @ts -expect-error
90+ expect ( queue . ctx . blockConcurrencyWhile ) . toHaveBeenCalledTimes ( 1 ) ;
91+
92+ // Here we await the blocked request to ensure it's resolved
93+ await blockedReq ;
94+ // We then need to await for the actual revalidation to finish
95+ await Promise . all ( Array . from ( queue . ongoingRevalidations . values ( ) ) ) ;
96+ expect ( queue . ongoingRevalidations . size ) . toBe ( 0 ) ;
97+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 6 ) ;
98+ } ) ;
99+ } )
100+
101+ describe ( 'failed revalidation' , ( ) => {
102+ it ( 'should not put it in failed state for an incorrect 200' , async ( ) => {
103+ const queue = createDurableObjectQueue ( {
104+ fetchDuration : 10 ,
105+ statusCode : 200 ,
106+ headers : new Headers ( [ [ "x-nextjs-cache" , "MISS" ] ] )
107+ } ) ;
108+ await queue . revalidate ( createMessage ( "id" ) ) ;
109+
110+ await queue . ongoingRevalidations . get ( "id" ) ;
111+
112+ expect ( queue . routeInFailedState . size ) . toBe ( 0 ) ;
113+ } )
114+
115+ it ( 'should not put it in failed state for a failed revalidation with 404' , async ( ) => {
116+ const queue = createDurableObjectQueue ( {
117+ fetchDuration : 10 ,
118+ statusCode : 404 ,
119+ } ) ;
120+ await queue . revalidate ( createMessage ( "id" ) ) ;
121+
122+ await queue . ongoingRevalidations . get ( "id" ) ;
123+
124+ expect ( queue . routeInFailedState . size ) . toBe ( 0 ) ;
125+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 1 ) ;
126+
127+ await queue . revalidate ( createMessage ( "id" ) ) ;
128+
129+ expect ( queue . routeInFailedState . size ) . toBe ( 0 ) ;
130+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 2 ) ;
131+ } ) ;
132+
133+ it ( 'should put it in failed state if revalidation fails with 500' , async ( ) => {
134+ const queue = createDurableObjectQueue ( {
135+ fetchDuration : 10 ,
136+ statusCode : 500 ,
137+ } ) ;
138+ await queue . revalidate ( createMessage ( "id" ) ) ;
139+
140+ await queue . ongoingRevalidations . get ( "id" ) ;
141+
142+ expect ( queue . routeInFailedState . size ) . toBe ( 1 ) ;
143+ expect ( queue . routeInFailedState . has ( "id" ) ) . toBe ( true ) ;
144+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 1 ) ;
145+
146+ await queue . revalidate ( createMessage ( "id" ) ) ;
147+
148+ expect ( queue . routeInFailedState . size ) . toBe ( 1 ) ;
149+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 1 ) ;
150+ } ) ;
151+
152+ it ( 'should put it in failed state if revalidation fetch throw' , async ( ) => {
153+ const queue = createDurableObjectQueue ( {
154+ fetchDuration : 10 ,
155+ } ) ;
156+ // @ts -expect-error - This is mocked above
157+ queue . service . fetch . mockImplementationOnce ( ( ) => Promise . reject ( new Error ( "fetch error" ) ) ) ;
158+ await queue . revalidate ( createMessage ( "id" ) ) ;
159+
160+ await queue . ongoingRevalidations . get ( "id" ) ;
161+
162+ expect ( queue . routeInFailedState . size ) . toBe ( 1 ) ;
163+ expect ( queue . routeInFailedState . has ( "id" ) ) . toBe ( true ) ;
164+ expect ( queue . ongoingRevalidations . size ) . toBe ( 0 ) ;
165+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 1 ) ;
166+
167+ await queue . revalidate ( createMessage ( "id" ) ) ;
168+
169+ expect ( queue . routeInFailedState . size ) . toBe ( 1 ) ;
170+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 1 ) ;
171+ } ) ;
172+
173+ } )
174+
175+ describe ( 'addAlarm' , ( ) => {
176+ const getStorage = ( queue : DurableObjectQueueHandler ) : DurableObjectStorage => {
177+ // @ts -expect-error - ctx is a protected field
178+ return queue . ctx . storage
179+ }
180+
181+ it ( 'should not add an alarm if there are no failed states' , async ( ) => {
182+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
183+ await queue . addAlarm ( ) ;
184+ expect ( getStorage ( queue ) . setAlarm ) . not . toHaveBeenCalled ( ) ;
185+ } ) ;
186+
187+ it ( 'should add an alarm if there are failed states' , async ( ) => {
188+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
189+ queue . routeInFailedState . set ( "id" , { msg : createMessage ( "id" ) , retryCount : 0 , nextAlarm : 1000 } ) ;
190+ await queue . addAlarm ( ) ;
191+ expect ( getStorage ( queue ) . setAlarm ) . toHaveBeenCalledWith ( 1000 ) ;
192+ } ) ;
193+
194+ it ( 'should not add an alarm if there is already an alarm set' , async ( ) => {
195+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
196+ queue . routeInFailedState . set ( "id" , { msg : createMessage ( "id" ) , retryCount : 0 , nextAlarm : 1000 } ) ;
197+ // @ts -expect-error
198+ queue . ctx . storage . getAlarm . mockResolvedValueOnce ( 1000 ) ;
199+ await queue . addAlarm ( ) ;
200+ expect ( getStorage ( queue ) . setAlarm ) . not . toHaveBeenCalled ( ) ;
201+ } ) ;
202+
203+ it ( 'should set the alarm to the lowest nextAlarm' , async ( ) => {
204+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
205+ queue . routeInFailedState . set ( "id" , { msg : createMessage ( "id" ) , retryCount : 0 , nextAlarm : 1000 } ) ;
206+ queue . routeInFailedState . set ( "id2" , { msg : createMessage ( "id2" ) , retryCount : 0 , nextAlarm : 500 } ) ;
207+ await queue . addAlarm ( ) ;
208+ expect ( getStorage ( queue ) . setAlarm ) . toHaveBeenCalledWith ( 500 ) ;
209+ } ) ;
210+ } ) ;
211+
212+
213+ describe ( 'addToFailedState' , ( ) => {
214+ it ( 'should add a failed state' , async ( ) => {
215+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
216+ await queue . addToFailedState ( createMessage ( "id" ) ) ;
217+ expect ( queue . routeInFailedState . size ) . toBe ( 1 ) ;
218+ expect ( queue . routeInFailedState . has ( "id" ) ) . toBe ( true ) ;
219+ expect ( queue . routeInFailedState . get ( "id" ) ?. retryCount ) . toBe ( 1 ) ;
220+ } ) ;
221+
222+ it ( 'should add a failed state with the correct nextAlarm' , async ( ) => {
223+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
224+ await queue . addToFailedState ( createMessage ( "id" ) ) ;
225+ expect ( queue . routeInFailedState . get ( "id" ) ?. nextAlarm ) . toBeGreaterThan ( Date . now ( ) ) ;
226+ expect ( queue . routeInFailedState . get ( "id" ) ?. retryCount ) . toBe ( 1 )
227+ } ) ;
228+
229+ it ( 'should add a failed state with the correct nextAlarm for a retry' , async ( ) => {
230+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
231+ await queue . addToFailedState ( createMessage ( "id" ) ) ;
232+ await queue . addToFailedState ( createMessage ( "id" ) ) ;
233+ expect ( queue . routeInFailedState . get ( "id" ) ?. nextAlarm ) . toBeGreaterThan ( Date . now ( ) ) ;
234+ expect ( queue . routeInFailedState . get ( "id" ) ?. retryCount ) . toBe ( 2 )
235+ } ) ;
236+
237+ it ( 'should not add a failed state if it has been retried 6 times' , async ( ) => {
238+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
239+ queue . routeInFailedState . set ( "id" , { msg : createMessage ( "id" ) , retryCount : 6 , nextAlarm : 1000 } ) ;
240+ await queue . addToFailedState ( createMessage ( "id" ) ) ;
241+ expect ( queue . routeInFailedState . size ) . toBe ( 0 ) ;
242+ } )
243+ } )
244+
245+ describe ( 'alarm' , ( ) => {
246+ it ( 'should execute revalidations for expired events' , async ( ) => {
247+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
248+ queue . routeInFailedState . set ( "id" , { msg : createMessage ( "id" ) , retryCount : 0 , nextAlarm : Date . now ( ) - 1000 } ) ;
249+ queue . routeInFailedState . set ( "id2" , { msg : createMessage ( "id2" ) , retryCount : 0 , nextAlarm : Date . now ( ) - 1000 } ) ;
250+ await queue . alarm ( ) ;
251+ expect ( queue . routeInFailedState . size ) . toBe ( 0 ) ;
252+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 2 ) ;
253+ } ) ;
254+
255+ it ( 'should execute revalidations for the next event to retry' , async ( ) => {
256+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
257+ queue . routeInFailedState . set ( "id" , { msg : createMessage ( "id" ) , retryCount : 0 , nextAlarm : Date . now ( ) + 1000 } ) ;
258+ queue . routeInFailedState . set ( "id2" , { msg : createMessage ( "id2" ) , retryCount : 0 , nextAlarm : Date . now ( ) + 500 } ) ;
259+ await queue . alarm ( ) ;
260+ expect ( queue . routeInFailedState . size ) . toBe ( 1 ) ;
261+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 1 ) ;
262+ expect ( queue . routeInFailedState . has ( "id2" ) ) . toBe ( false ) ;
263+ } ) ;
264+
265+ it ( 'should execute revalidations for the next event to retry and expired events' , async ( ) => {
266+ const queue = createDurableObjectQueue ( { fetchDuration : 10 } ) ;
267+ queue . routeInFailedState . set ( "id" , { msg : createMessage ( "id" ) , retryCount : 0 , nextAlarm : Date . now ( ) + 1000 } ) ;
268+ queue . routeInFailedState . set ( "id2" , { msg : createMessage ( "id2" ) , retryCount : 0 , nextAlarm : Date . now ( ) - 1000 } ) ;
269+ await queue . alarm ( ) ;
270+ expect ( queue . routeInFailedState . size ) . toBe ( 0 ) ;
271+ expect ( queue . service . fetch ) . toHaveBeenCalledTimes ( 2 ) ;
272+ } ) ;
273+ } )
274+ } )
0 commit comments