@@ -160,8 +160,7 @@ bucket_definitions:
160160 expect ( lines ) . toMatchSnapshot ( ) ;
161161 } ) ;
162162
163- // Temporarily skipped - interruption disabled
164- test . skip ( 'sync interrupts low-priority buckets on new checkpoints' , async ( ) => {
163+ test ( 'sync interrupts low-priority buckets on new checkpoints' , async ( ) => {
165164 await using f = await factory ( ) ;
166165
167166 const syncRules = await f . updateSyncRules ( {
@@ -271,6 +270,141 @@ bucket_definitions:
271270 expect ( sentRows ) . toBe ( 10002 ) ;
272271 } ) ;
273272
273+ test ( 'sync interruptions with unrelated data' , async ( ) => {
274+ await using f = await factory ( ) ;
275+
276+ const syncRules = await f . updateSyncRules ( {
277+ content : `
278+ bucket_definitions:
279+ b0:
280+ priority: 2
281+ data:
282+ - SELECT * FROM test WHERE LENGTH(id) <= 5;
283+ b1:
284+ priority: 1
285+ parameters: SELECT request.user_id() as user_id
286+ data:
287+ - SELECT * FROM test WHERE LENGTH(id) > 5 AND description = bucket.user_id;
288+ `
289+ } ) ;
290+
291+ const bucketStorage = f . getInstance ( syncRules ) ;
292+ await bucketStorage . autoActivate ( ) ;
293+
294+ await bucketStorage . startBatch ( test_utils . BATCH_OPTIONS , async ( batch ) => {
295+ // Initial data: Add one priority row and 10k low-priority rows.
296+ await batch . save ( {
297+ sourceTable : TEST_TABLE ,
298+ tag : storage . SaveOperationTag . INSERT ,
299+ after : {
300+ id : 'highprio' ,
301+ description : 'user_one'
302+ } ,
303+ afterReplicaId : 'highprio'
304+ } ) ;
305+ for ( let i = 0 ; i < 10_000 ; i ++ ) {
306+ await batch . save ( {
307+ sourceTable : TEST_TABLE ,
308+ tag : storage . SaveOperationTag . INSERT ,
309+ after : {
310+ id : `${ i } ` ,
311+ description : 'low prio'
312+ } ,
313+ afterReplicaId : `${ i } `
314+ } ) ;
315+ }
316+
317+ await batch . commit ( '0/1' ) ;
318+ } ) ;
319+
320+ const stream = sync . streamResponse ( {
321+ syncContext,
322+ bucketStorage,
323+ syncRules : bucketStorage . getParsedSyncRules ( test_utils . PARSE_OPTIONS ) ,
324+ params : {
325+ buckets : [ ] ,
326+ include_checksum : true ,
327+ raw_data : true
328+ } ,
329+ tracker,
330+ syncParams : new RequestParameters ( { sub : 'user_one' } , { } ) ,
331+ token : { sub : 'user_one' , exp : Date . now ( ) / 1000 + 100000 } as any
332+ } ) ;
333+
334+ let sentCheckpoints = 0 ;
335+ let completedCheckpoints = 0 ;
336+ let sentRows = 0 ;
337+
338+ // Expected flow:
339+ // 1. Stream starts, we receive a checkpoint followed by the one high-prio row and a partial completion.
340+ // 2. We insert a new row that is not part of a bucket relevant to this stream.
341+ // 3. This means that no interruption happens and we receive all the low-priority data, followed by a checkpoint.
342+ // 4. After the checkpoint, add a new row that _is_ relevant for this sync, which should trigger a new iteration.
343+
344+ for await ( let next of stream ) {
345+ if ( typeof next == 'string' ) {
346+ next = JSON . parse ( next ) ;
347+ }
348+ if ( typeof next === 'object' && next !== null ) {
349+ if ( 'partial_checkpoint_complete' in next ) {
350+ if ( sentCheckpoints == 1 ) {
351+ await bucketStorage . startBatch ( test_utils . BATCH_OPTIONS , async ( batch ) => {
352+ // Add a high-priority row that doesn't affect this sync stream.
353+ await batch . save ( {
354+ sourceTable : TEST_TABLE ,
355+ tag : storage . SaveOperationTag . INSERT ,
356+ after : {
357+ id : 'highprio2' ,
358+ description : 'user_two'
359+ } ,
360+ afterReplicaId : 'highprio2'
361+ } ) ;
362+
363+ await batch . commit ( '0/2' ) ;
364+ } ) ;
365+ } else {
366+ expect ( sentCheckpoints ) . toBe ( 2 ) ;
367+ expect ( sentRows ) . toBe ( 10002 ) ;
368+ }
369+ }
370+ if ( 'checkpoint' in next || 'checkpoint_diff' in next ) {
371+ sentCheckpoints += 1 ;
372+ }
373+
374+ if ( 'data' in next ) {
375+ sentRows += next . data . data . length ;
376+ }
377+ if ( 'checkpoint_complete' in next ) {
378+ completedCheckpoints ++ ;
379+ if ( completedCheckpoints == 2 ) {
380+ break ;
381+ }
382+ if ( completedCheckpoints == 1 ) {
383+ expect ( sentRows ) . toBe ( 10001 ) ;
384+
385+ await bucketStorage . startBatch ( test_utils . BATCH_OPTIONS , async ( batch ) => {
386+ // Add a high-priority row that affects this sync stream.
387+ await batch . save ( {
388+ sourceTable : TEST_TABLE ,
389+ tag : storage . SaveOperationTag . INSERT ,
390+ after : {
391+ id : 'highprio3' ,
392+ description : 'user_one'
393+ } ,
394+ afterReplicaId : 'highprio3'
395+ } ) ;
396+
397+ await batch . commit ( '0/3' ) ;
398+ } ) ;
399+ }
400+ }
401+ }
402+ }
403+
404+ expect ( sentCheckpoints ) . toBe ( 2 ) ;
405+ expect ( sentRows ) . toBe ( 10002 ) ;
406+ } ) ;
407+
274408 test ( 'sends checkpoint complete line for empty checkpoint' , async ( ) => {
275409 await using f = await factory ( ) ;
276410
0 commit comments