@@ -192,8 +192,9 @@ public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws IOExc
192
192
193
193
@ Test
194
194
public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog () throws IOException {
195
+ Supplier <MemoryRecords > records = () -> TestUtils .records (List .of (new SimpleRecord ("value" .getBytes ())), 0 , 0 );
195
196
LogConfig config = new LogTestUtils .LogConfigBuilder ()
196
- .withSegmentBytes (10 * createRecords ( 0 , 0 ).sizeInBytes ())
197
+ .withSegmentBytes (10 * records . get ( ).sizeInBytes ())
197
198
.build ();
198
199
log = createLog (logDir , config );
199
200
LeaderEpochFileCache cache = epochCache (log );
@@ -587,7 +588,7 @@ public void testFirstUnstableOffsetWithTransactionalData() throws IOException {
587
588
assertEquals (Optional .of (firstAppendInfo .firstOffset ()), log .firstUnstableOffset ());
588
589
589
590
// now transaction is committed
590
- LogAppendInfo commitAppendInfo = LogTestUtils . appendEndTxnMarkerAsLeader (log , pid , epoch , ControlRecordType .COMMIT , mockTime .milliseconds (), 0 , 0 );
591
+ LogAppendInfo commitAppendInfo = appendEndTxnMarkerAsLeader (log , pid , epoch , ControlRecordType .COMMIT , mockTime .milliseconds ());
591
592
592
593
// first unstable offset is not updated until the high watermark is advanced
593
594
assertEquals (Optional .of (firstAppendInfo .firstOffset ()), log .firstUnstableOffset ());
@@ -598,20 +599,17 @@ public void testFirstUnstableOffsetWithTransactionalData() throws IOException {
598
599
}
599
600
600
601
private void append (int epoch , long startOffset , int count ) {
602
+ Function <Integer , MemoryRecords > records = i ->
603
+ TestUtils .records (List .of (new SimpleRecord ("value" .getBytes ())), startOffset + i , epoch );
601
604
for (int i = 0 ; i < count ; i ++) {
602
- log .appendAsFollower (createRecords ( startOffset + i , epoch ), epoch );
605
+ log .appendAsFollower (records . apply ( i ), epoch );
603
606
}
604
607
}
605
608
606
609
private LeaderEpochFileCache epochCache (UnifiedLog log ) {
607
610
return log .leaderEpochCache ();
608
611
}
609
612
610
- private void appendAsFollower (UnifiedLog log , MemoryRecords records , int leaderEpoch ) {
611
- records .batches ().forEach (batch -> batch .setPartitionLeaderEpoch (leaderEpoch ));
612
- log .appendAsFollower (records , leaderEpoch );
613
- }
614
-
615
613
private LogAppendInfo appendEndTxnMarkerAsLeader (UnifiedLog log , long producerId , short producerEpoch , ControlRecordType controlType , long timestamp ) throws IOException {
616
614
MemoryRecords records = MemoryRecords .withEndTransactionMarker (producerId , producerEpoch , new EndTransactionMarker (controlType , 0 ));
617
615
return log .appendAsLeader (records , 0 );
@@ -644,9 +642,4 @@ private UnifiedLog createLog(
644
642
this .logsToClose .add (log );
645
643
return log ;
646
644
}
647
-
648
- // FIXME: remove
649
- private MemoryRecords createRecords (long startOffset , int epoch ) {
650
- return TestUtils .records (List .of (new SimpleRecord ("value" .getBytes ())), startOffset , epoch );
651
- }
652
645
}
0 commit comments