@@ -62,7 +62,8 @@ class CoordinatorLoaderImplTest {
62
62
time = Time .SYSTEM ,
63
63
replicaManager = replicaManager,
64
64
deserializer = serde,
65
- loadBufferSize = 1000
65
+ loadBufferSize = 1000 ,
66
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
66
67
)) { loader =>
67
68
when(replicaManager.getLog(tp)).thenReturn(None )
68
69
@@ -82,7 +83,8 @@ class CoordinatorLoaderImplTest {
82
83
time = Time .SYSTEM ,
83
84
replicaManager = replicaManager,
84
85
deserializer = serde,
85
- loadBufferSize = 1000
86
+ loadBufferSize = 1000 ,
87
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
86
88
)) { loader =>
87
89
loader.close()
88
90
@@ -103,7 +105,8 @@ class CoordinatorLoaderImplTest {
103
105
time = Time .SYSTEM ,
104
106
replicaManager = replicaManager,
105
107
deserializer = serde,
106
- loadBufferSize = 1000
108
+ loadBufferSize = 1000 ,
109
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
107
110
)) { loader =>
108
111
when(replicaManager.getLog(tp)).thenReturn(Some (log))
109
112
when(log.logStartOffset).thenReturn(0L )
@@ -186,7 +189,8 @@ class CoordinatorLoaderImplTest {
186
189
time = Time .SYSTEM ,
187
190
replicaManager = replicaManager,
188
191
deserializer = serde,
189
- loadBufferSize = 1000
192
+ loadBufferSize = 1000 ,
193
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
190
194
)) { loader =>
191
195
when(replicaManager.getLog(tp)).thenReturn(Some (log))
192
196
when(log.logStartOffset).thenReturn(0L )
@@ -229,7 +233,8 @@ class CoordinatorLoaderImplTest {
229
233
time = Time .SYSTEM ,
230
234
replicaManager = replicaManager,
231
235
deserializer = serde,
232
- loadBufferSize = 1000
236
+ loadBufferSize = 1000 ,
237
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
233
238
)) { loader =>
234
239
when(replicaManager.getLog(tp)).thenReturn(Some (log))
235
240
when(log.logStartOffset).thenReturn(0L )
@@ -265,7 +270,8 @@ class CoordinatorLoaderImplTest {
265
270
time = Time .SYSTEM ,
266
271
replicaManager = replicaManager,
267
272
deserializer = serde,
268
- loadBufferSize = 1000
273
+ loadBufferSize = 1000 ,
274
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
269
275
)) { loader =>
270
276
when(replicaManager.getLog(tp)).thenReturn(Some (log))
271
277
when(log.logStartOffset).thenReturn(0L )
@@ -303,7 +309,8 @@ class CoordinatorLoaderImplTest {
303
309
time = Time .SYSTEM ,
304
310
replicaManager = replicaManager,
305
311
deserializer = serde,
306
- loadBufferSize = 1000
312
+ loadBufferSize = 1000 ,
313
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
307
314
)) { loader =>
308
315
when(replicaManager.getLog(tp)).thenReturn(Some (log))
309
316
when(log.logStartOffset).thenReturn(0L )
@@ -331,7 +338,8 @@ class CoordinatorLoaderImplTest {
331
338
time,
332
339
replicaManager = replicaManager,
333
340
deserializer = serde,
334
- loadBufferSize = 1000
341
+ loadBufferSize = 1000 ,
342
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
335
343
)) { loader =>
336
344
val startTimeMs = time.milliseconds()
337
345
when(replicaManager.getLog(tp)).thenReturn(Some (log))
@@ -378,7 +386,8 @@ class CoordinatorLoaderImplTest {
378
386
time = Time .SYSTEM ,
379
387
replicaManager = replicaManager,
380
388
deserializer = serde,
381
- loadBufferSize = 1000
389
+ loadBufferSize = 1000 ,
390
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
382
391
)) { loader =>
383
392
when(replicaManager.getLog(tp)).thenReturn(Some (log))
384
393
when(log.logStartOffset).thenReturn(0L )
@@ -441,7 +450,8 @@ class CoordinatorLoaderImplTest {
441
450
time = Time .SYSTEM ,
442
451
replicaManager = replicaManager,
443
452
deserializer = serde,
444
- loadBufferSize = 1000
453
+ loadBufferSize = 1000 ,
454
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
445
455
)) { loader =>
446
456
when(replicaManager.getLog(tp)).thenReturn(Some (log))
447
457
when(log.logStartOffset).thenReturn(0L )
@@ -467,7 +477,8 @@ class CoordinatorLoaderImplTest {
467
477
time = Time .SYSTEM ,
468
478
replicaManager = replicaManager,
469
479
deserializer = serde,
470
- loadBufferSize = 1000
480
+ loadBufferSize = 1000 ,
481
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
471
482
)) { loader =>
472
483
when(replicaManager.getLog(tp)).thenReturn(Some (log))
473
484
when(log.logStartOffset).thenReturn(0L )
@@ -531,7 +542,8 @@ class CoordinatorLoaderImplTest {
531
542
time = Time .SYSTEM ,
532
543
replicaManager = replicaManager,
533
544
deserializer = serde,
534
- loadBufferSize = 1000
545
+ loadBufferSize = 1000 ,
546
+ commitIntervalOffsets = CoordinatorLoaderImpl .DEFAULT_COMMIT_INTERVAL_OFFSETS
535
547
)) { loader =>
536
548
when(replicaManager.getLog(tp)).thenReturn(Some (log))
537
549
when(log.logStartOffset).thenReturn(0L )
@@ -559,6 +571,79 @@ class CoordinatorLoaderImplTest {
559
571
}
560
572
}
561
573
574
+ @ Test
575
+ def testUpdateLastWrittenOffsetCommitInterval (): Unit = {
576
+ val tp = new TopicPartition (" foo" , 0 )
577
+ val replicaManager = mock(classOf [ReplicaManager ])
578
+ val serde = new StringKeyValueDeserializer
579
+ val log = mock(classOf [UnifiedLog ])
580
+ val coordinator = mock(classOf [CoordinatorPlayback [(String , String )]])
581
+
582
+ Using .resource(new CoordinatorLoaderImpl [(String , String )](
583
+ time = Time .SYSTEM ,
584
+ replicaManager = replicaManager,
585
+ deserializer = serde,
586
+ loadBufferSize = 1000 ,
587
+ commitIntervalOffsets = 2L
588
+ )) { loader =>
589
+ when(replicaManager.getLog(tp)).thenReturn(Some (log))
590
+ when(log.logStartOffset).thenReturn(0L )
591
+ when(log.highWatermark).thenReturn(7L )
592
+ when(replicaManager.getLogEndOffset(tp)).thenReturn(Some (7L ))
593
+
594
+ val readResult1 = logReadResult(startOffset = 0 , records = Seq (
595
+ new SimpleRecord (" k1" .getBytes, " v1" .getBytes),
596
+ new SimpleRecord (" k2" .getBytes, " v2" .getBytes)
597
+ ))
598
+
599
+ when(log.read(0L , 1000 , FetchIsolation .LOG_END , true
600
+ )).thenReturn(readResult1)
601
+
602
+ val readResult2 = logReadResult(startOffset = 2 , records = Seq (
603
+ new SimpleRecord (" k3" .getBytes, " v3" .getBytes),
604
+ new SimpleRecord (" k4" .getBytes, " v4" .getBytes),
605
+ new SimpleRecord (" k5" .getBytes, " v5" .getBytes)
606
+ ))
607
+
608
+ when(log.read(2L , 1000 , FetchIsolation .LOG_END , true
609
+ )).thenReturn(readResult2)
610
+
611
+ val readResult3 = logReadResult(startOffset = 5 , records = Seq (
612
+ new SimpleRecord (" k6" .getBytes, " v6" .getBytes)
613
+ ))
614
+
615
+ when(log.read(5L , 1000 , FetchIsolation .LOG_END , true
616
+ )).thenReturn(readResult3)
617
+
618
+ val readResult4 = logReadResult(startOffset = 6 , records = Seq (
619
+ new SimpleRecord (" k7" .getBytes, " v7" .getBytes)
620
+ ))
621
+
622
+ when(log.read(6L , 1000 , FetchIsolation .LOG_END , true
623
+ )).thenReturn(readResult4)
624
+
625
+ assertNotNull(loader.load(tp, coordinator).get(10 , TimeUnit .SECONDS ))
626
+
627
+ verify(coordinator).replay(0L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k1" , " v1" ))
628
+ verify(coordinator).replay(1L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k2" , " v2" ))
629
+ verify(coordinator).replay(2L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k3" , " v3" ))
630
+ verify(coordinator).replay(3L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k4" , " v4" ))
631
+ verify(coordinator).replay(4L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k5" , " v5" ))
632
+ verify(coordinator).replay(5L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k6" , " v6" ))
633
+ verify(coordinator).replay(6L , RecordBatch .NO_PRODUCER_ID , RecordBatch .NO_PRODUCER_EPOCH , (" k7" , " v7" ))
634
+ verify(coordinator, times(0 )).updateLastWrittenOffset(0L )
635
+ verify(coordinator, times(1 )).updateLastWrittenOffset(2L )
636
+ verify(coordinator, times(1 )).updateLastWrittenOffset(5L )
637
+ verify(coordinator, times(0 )).updateLastWrittenOffset(6L )
638
+ verify(coordinator, times(1 )).updateLastWrittenOffset(7L )
639
+ verify(coordinator, times(0 )).updateLastCommittedOffset(0L )
640
+ verify(coordinator, times(1 )).updateLastCommittedOffset(2L )
641
+ verify(coordinator, times(1 )).updateLastCommittedOffset(5L )
642
+ verify(coordinator, times(0 )).updateLastCommittedOffset(6L )
643
+ verify(coordinator, times(1 )).updateLastCommittedOffset(7L )
644
+ }
645
+ }
646
+
562
647
private def logReadResult (
563
648
startOffset : Long ,
564
649
producerId : Long = RecordBatch .NO_PRODUCER_ID ,
0 commit comments