@@ -63,7 +63,8 @@ class CoordinatorLoaderImplTest {
63
63
time = Time .SYSTEM ,
64
64
replicaManager = replicaManager,
65
65
deserializer = serde,
66
- loadBufferSize = 1000
66
+ loadBufferSize = 1000 ,
67
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
67
68
)) { loader =>
68
69
when(replicaManager.getLog(tp)).thenReturn(None )
69
70
@@ -83,7 +84,8 @@ class CoordinatorLoaderImplTest {
83
84
time = Time .SYSTEM ,
84
85
replicaManager = replicaManager,
85
86
deserializer = serde,
86
- loadBufferSize = 1000
87
+ loadBufferSize = 1000 ,
88
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
87
89
)) { loader =>
88
90
loader.close()
89
91
@@ -104,7 +106,8 @@ class CoordinatorLoaderImplTest {
104
106
time = Time .SYSTEM ,
105
107
replicaManager = replicaManager,
106
108
deserializer = serde,
107
- loadBufferSize = 1000
109
+ loadBufferSize = 1000 ,
110
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
108
111
)) { loader =>
109
112
when(replicaManager.getLog(tp)).thenReturn(Some (log))
110
113
when(log.logStartOffset).thenReturn(0L )
@@ -207,7 +210,8 @@ class CoordinatorLoaderImplTest {
207
210
time = Time .SYSTEM ,
208
211
replicaManager = replicaManager,
209
212
deserializer = serde,
210
- loadBufferSize = 1000
213
+ loadBufferSize = 1000 ,
214
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
211
215
)) { loader =>
212
216
when(replicaManager.getLog(tp)).thenReturn(Some (log))
213
217
when(log.logStartOffset).thenReturn(0L )
@@ -250,7 +254,8 @@ class CoordinatorLoaderImplTest {
250
254
time = Time .SYSTEM ,
251
255
replicaManager = replicaManager,
252
256
deserializer = serde,
253
- loadBufferSize = 1000
257
+ loadBufferSize = 1000 ,
258
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
254
259
)) { loader =>
255
260
when(replicaManager.getLog(tp)).thenReturn(Some (log))
256
261
when(log.logStartOffset).thenReturn(0L )
@@ -290,7 +295,8 @@ class CoordinatorLoaderImplTest {
290
295
time = Time .SYSTEM ,
291
296
replicaManager = replicaManager,
292
297
deserializer = serde,
293
- loadBufferSize = 1000
298
+ loadBufferSize = 1000 ,
299
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
294
300
)) { loader =>
295
301
when(replicaManager.getLog(tp)).thenReturn(Some (log))
296
302
when(log.logStartOffset).thenReturn(0L )
@@ -332,7 +338,8 @@ class CoordinatorLoaderImplTest {
332
338
time = Time .SYSTEM ,
333
339
replicaManager = replicaManager,
334
340
deserializer = serde,
335
- loadBufferSize = 1000
341
+ loadBufferSize = 1000 ,
342
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
336
343
)) { loader =>
337
344
when(replicaManager.getLog(tp)).thenReturn(Some (log))
338
345
when(log.logStartOffset).thenReturn(0L )
@@ -364,7 +371,8 @@ class CoordinatorLoaderImplTest {
364
371
time,
365
372
replicaManager = replicaManager,
366
373
deserializer = serde,
367
- loadBufferSize = 1000
374
+ loadBufferSize = 1000 ,
375
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
368
376
)) { loader =>
369
377
val startTimeMs = time.milliseconds()
370
378
when(replicaManager.getLog(tp)).thenReturn(Some (log))
@@ -419,7 +427,8 @@ class CoordinatorLoaderImplTest {
419
427
time = Time .SYSTEM ,
420
428
replicaManager = replicaManager,
421
429
deserializer = serde,
422
- loadBufferSize = 1000
430
+ loadBufferSize = 1000 ,
431
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
423
432
)) { loader =>
424
433
when(replicaManager.getLog(tp)).thenReturn(Some (log))
425
434
when(log.logStartOffset).thenReturn(0L )
@@ -494,7 +503,8 @@ class CoordinatorLoaderImplTest {
494
503
time = Time .SYSTEM ,
495
504
replicaManager = replicaManager,
496
505
deserializer = serde,
497
- loadBufferSize = 1000
506
+ loadBufferSize = 1000 ,
507
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
498
508
)) { loader =>
499
509
when(replicaManager.getLog(tp)).thenReturn(Some (log))
500
510
when(log.logStartOffset).thenReturn(0L )
@@ -520,7 +530,8 @@ class CoordinatorLoaderImplTest {
520
530
time = Time .SYSTEM ,
521
531
replicaManager = replicaManager,
522
532
deserializer = serde,
523
- loadBufferSize = 1000
533
+ loadBufferSize = 1000 ,
534
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
524
535
)) { loader =>
525
536
when(replicaManager.getLog(tp)).thenReturn(Some (log))
526
537
when(log.logStartOffset).thenReturn(0L )
@@ -596,7 +607,8 @@ class CoordinatorLoaderImplTest {
596
607
time = Time .SYSTEM ,
597
608
replicaManager = replicaManager,
598
609
deserializer = serde,
599
- loadBufferSize = 1000
610
+ loadBufferSize = 1000 ,
611
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
600
612
)) { loader =>
601
613
when(replicaManager.getLog(tp)).thenReturn(Some (log))
602
614
when(log.logStartOffset).thenReturn(0L )
@@ -632,6 +644,79 @@ class CoordinatorLoaderImplTest {
632
644
}
633
645
}
634
646
647
+ @ Test
648
+ def testUpdateLastWrittenOffsetCommitInterval (): Unit = {
649
+ val tp = new TopicPartition (" foo" , 0 )
650
+ val replicaManager = mock(classOf [ReplicaManager ])
651
+ val serde = new StringKeyValueDeserializer
652
+ val log = mock(classOf [UnifiedLog ])
653
+ val coordinator = mock(classOf [CoordinatorPlayback [(String , String )]])
654
+
655
+ Using .resource(new CoordinatorLoaderImpl [(String , String )](
656
+ time = Time .SYSTEM ,
657
+ replicaManager = replicaManager,
658
+ deserializer = serde,
659
+ loadBufferSize = 1000 ,
660
+ commitIntervalOffsets = 2L
661
+ )) { loader =>
662
+ when(replicaManager.getLog(tp)).thenReturn(Some (log))
663
+ when(log.logStartOffset).thenReturn(0L )
664
+ when(log.highWatermark).thenReturn(7L )
665
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some (7L ))
666
+
667
+ val readResult1 = logReadResult(startOffset = 0 , records = Seq (
668
+ new SimpleRecord (" k1" .getBytes, " v1" .getBytes),
669
+ new SimpleRecord (" k2" .getBytes, " v2" .getBytes)
670
+ ))
671
+
672
+ when(log.read(0L , 1000 , FetchIsolation .LOG_END , true
673
+ )).thenReturn(readResult1)
674
+
675
+ val readResult2 = logReadResult(startOffset = 2 , records = Seq (
676
+ new SimpleRecord (" k3" .getBytes, " v3" .getBytes),
677
+ new SimpleRecord (" k4" .getBytes, " v4" .getBytes),
678
+ new SimpleRecord (" k5" .getBytes, " v5" .getBytes)
679
+ ))
680
+
681
+ when(log.read(2L , 1000 , FetchIsolation .LOG_END , true
682
+ )).thenReturn(readResult2)
683
+
684
+ val readResult3 = logReadResult(startOffset = 5 , records = Seq (
685
+ new SimpleRecord (" k6" .getBytes, " v6" .getBytes)
686
+ ))
687
+
688
+ when(log.read(5L , 1000 , FetchIsolation .LOG_END , true
689
+ )).thenReturn(readResult3)
690
+
691
+ val readResult4 = logReadResult(startOffset = 6 , records = Seq (
692
+ new SimpleRecord (" k7" .getBytes, " v7" .getBytes)
693
+ ))
694
+
695
+ when(log.read(6L , 1000 , FetchIsolation .LOG_END , true
696
+ )).thenReturn(readResult4)
697
+
698
+ assertNotNull(loader.load(tp, coordinator).get(10 , TimeUnit .SECONDS ))
699
+
700
+ verify(coordinator).replay(0L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k1" , " v1" ))
701
+ verify(coordinator).replay(1L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k2" , " v2" ))
702
+ verify(coordinator).replay(2L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k3" , " v3" ))
703
+ verify(coordinator).replay(3L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k4" , " v4" ))
704
+ verify(coordinator).replay(4L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k5" , " v5" ))
705
+ verify(coordinator).replay(5L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k6" , " v6" ))
706
+ verify(coordinator).replay(6L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k7" , " v7" ))
707
+ verify(coordinator, times(0 )).updateLastWrittenOffset(0L )
708
+ verify(coordinator, times(1 )).updateLastWrittenOffset(2L )
709
+ verify(coordinator, times(1 )).updateLastWrittenOffset(5L )
710
+ verify(coordinator, times(0 )).updateLastWrittenOffset(6L )
711
+ verify(coordinator, times(1 )).updateLastWrittenOffset(7L )
712
+ verify(coordinator, times(0 )).updateLastCommittedOffset(0L )
713
+ verify(coordinator, times(1 )).updateLastCommittedOffset(2L )
714
+ verify(coordinator, times(1 )).updateLastCommittedOffset(5L )
715
+ verify(coordinator, times(0 )).updateLastCommittedOffset(6L )
716
+ verify(coordinator, times(1 )).updateLastCommittedOffset(7L )
717
+ }
718
+ }
719
+
635
720
private def logReadResult (
636
721
startOffset : Long ,
637
722
producerId : Long = RecordBatch .NO_PRODUCER_ID ,
0 commit comments