24
24
import org .apache .kafka .common .record .RecordBatch ;
25
25
import org .apache .kafka .common .record .SimpleRecord ;
26
26
import org .apache .kafka .common .test .TestUtils ;
27
- import org .apache .kafka .common .utils .PrimitiveRef ;
28
27
import org .apache .kafka .common .utils .Utils ;
29
28
import org .apache .kafka .coordinator .transaction .TransactionLogConfig ;
30
29
import org .apache .kafka .server .storage .log .FetchIsolation ;
44
43
import java .util .Optional ;
45
44
import java .util .Properties ;
46
45
import java .util .concurrent .ConcurrentHashMap ;
46
+ import java .util .function .Function ;
47
47
import java .util .function .Supplier ;
48
48
49
49
import static org .junit .jupiter .api .Assertions .assertEquals ;
@@ -109,16 +109,20 @@ public void followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCa
109
109
.mapToObj (id -> new SimpleRecord (String .valueOf (id ).getBytes ()))
110
110
.toArray (SimpleRecord []::new );
111
111
112
+ Function <Integer , MemoryRecords > recordsForEpoch = i -> {
113
+ MemoryRecords recs = MemoryRecords .withRecords (messageIds [i ], Compression .NONE , records [i ]);
114
+ recs .batches ().forEach (record -> {
115
+ record .setPartitionLeaderEpoch (42 );
116
+ record .setLastOffset (i );
117
+ });
118
+ return recs ;
119
+ };
120
+
121
+ // Given each message has an offset & epoch, as msgs from leader would
112
122
try (UnifiedLog log = createLog (logDir , new LogConfig (new Properties ()))) {
113
123
// Given each message has an offset & epoch, as msgs from leader would
114
124
for (int i = 0 ; i < records .length ; i ++) {
115
- long finalI = i ;
116
- MemoryRecords recordsForEpoch = MemoryRecords .withRecords (messageIds [i ], Compression .NONE , records [i ]);
117
- recordsForEpoch .batches ().forEach (batch -> {
118
- batch .setPartitionLeaderEpoch (42 );
119
- batch .setLastOffset (finalI );
120
- });
121
- appendAsFollower (log , recordsForEpoch , i );
125
+ log .appendAsFollower (recordsForEpoch .apply (i ), i );
122
126
}
123
127
124
128
assertEquals (Optional .of (42 ), log .latestEpoch ());
@@ -154,17 +158,17 @@ public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws IOException
154
158
155
159
@ Test
156
160
public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments () throws IOException {
157
- MemoryRecords records = TestUtils .singletonRecords ("test" .getBytes ());
161
+ Supplier < MemoryRecords > records = () -> TestUtils .singletonRecords ("test" .getBytes ());
158
162
LogConfig config = new LogTestUtils .LogConfigBuilder ()
159
- .withSegmentBytes (records .sizeInBytes () * 5 )
160
- .withRetentionBytes (records .sizeInBytes () * 10L )
163
+ .withSegmentBytes (records .get (). sizeInBytes () * 5 )
164
+ .withRetentionBytes (records .get (). sizeInBytes () * 10L )
161
165
.build ();
162
166
163
167
log = createLog (logDir , config );
164
168
LeaderEpochFileCache cache = epochCache (log );
165
169
166
170
for (int i = 0 ; i < 15 ; i ++) {
167
- log .appendAsLeader (records , 0 );
171
+ log .appendAsLeader (records . get () , 0 );
168
172
}
169
173
170
174
// Given epochs
@@ -214,15 +218,16 @@ public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() throws IO
214
218
215
219
@ Test
216
220
public void shouldDeleteSizeBasedSegments () throws IOException {
217
- MemoryRecords records = TestUtils .singletonRecords ("test" .getBytes ());
221
+ Supplier < MemoryRecords > records = () -> TestUtils .singletonRecords ("test" .getBytes ());
218
222
LogConfig config = new LogTestUtils .LogConfigBuilder ()
219
- .withSegmentBytes (1024 * 1024 * 5 )
223
+ .withSegmentBytes (records .get ().sizeInBytes () * 5 )
224
+ .withRetentionBytes (records .get ().sizeInBytes () * 10L )
220
225
.build ();
221
226
log = createLog (logDir , config );
222
227
223
228
// append some messages to create some segments
224
229
for (int i = 0 ; i < 15 ; i ++) {
225
- log .appendAsLeader (records , 0 );
230
+ log .appendAsLeader (records . get () , 0 );
226
231
}
227
232
228
233
log .updateHighWatermark (log .logEndOffset ());
@@ -232,17 +237,17 @@ public void shouldDeleteSizeBasedSegments() throws IOException {
232
237
233
238
@ Test
234
239
public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize () throws IOException {
235
- MemoryRecords records = TestUtils .singletonRecords ("test" .getBytes ());
240
+ Supplier < MemoryRecords > records = () -> TestUtils .singletonRecords ("test" .getBytes ());
236
241
LogConfig config = new LogTestUtils .LogConfigBuilder ()
237
- .withSegmentBytes (records .sizeInBytes () * 5 )
238
- .withRetentionBytes (records .sizeInBytes () * 15L )
242
+ .withSegmentBytes (records .get (). sizeInBytes () * 5 )
243
+ .withRetentionBytes (records .get (). sizeInBytes () * 15L )
239
244
.build ();
240
245
241
246
log = createLog (logDir , config );
242
247
243
248
// append some messages to create some segments
244
249
for (int i = 0 ; i < 15 ; i ++) {
245
- log .appendAsLeader (records , 0 );
250
+ log .appendAsLeader (records . get () , 0 );
246
251
}
247
252
248
253
log .updateHighWatermark (log .logEndOffset ());
@@ -252,15 +257,15 @@ public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() throws IOEx
252
257
253
258
@ Test
254
259
public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted () throws IOException {
255
- MemoryRecords records = TestUtils .singletonRecords ("test" .getBytes (), 10L );
260
+ Supplier < MemoryRecords > records = () -> TestUtils .singletonRecords ("test" .getBytes (), 10L );
256
261
LogConfig config = new LogTestUtils .LogConfigBuilder ()
257
- .withSegmentBytes (records .sizeInBytes () * 15 )
262
+ .withSegmentBytes (records .get (). sizeInBytes () * 15 )
258
263
.withRetentionMs (10000L )
259
264
.build ();
260
265
log = createLog (logDir , config );
261
266
262
267
for (int i = 0 ; i < 15 ; i ++) {
263
- log .appendAsLeader (records , 0 );
268
+ log .appendAsLeader (records . get () , 0 );
264
269
}
265
270
266
271
log .updateHighWatermark (log .logEndOffset ());
@@ -426,15 +431,16 @@ public void testLogDeletionAfterClose() throws IOException {
426
431
.build ();
427
432
log = createLog (logDir , logConfig );
428
433
434
+ // append some messages to create some segments
429
435
log .appendAsLeader (records .get (), 0 );
430
436
431
- assertEquals (1 , log .numberOfSegments ());
432
- assertEquals (1 , epochCache (log ).epochEntries ().size ());
437
+ assertEquals (1 , log .numberOfSegments (), "The deleted segments should be gone." );
438
+ assertEquals (1 , epochCache (log ).epochEntries ().size (), "Epoch entries should have gone." );
433
439
434
440
log .close ();
435
441
log .delete ();
436
442
assertEquals (0 , log .numberOfSegments ());
437
- assertEquals (0 , epochCache (log ).epochEntries ().size ());
443
+ assertEquals (0 , epochCache (log ).epochEntries ().size (), "Epoch entries should have gone." );
438
444
}
439
445
440
446
@ Test
@@ -459,6 +465,7 @@ public void testDeleteOldSegments() throws IOException {
459
465
assertEquals (numSegments , log .numberOfSegments ());
460
466
assertEquals (0L , log .logStartOffset ());
461
467
468
+ // only segments with offset before the current high watermark are eligible for deletion
462
469
for (long hw = 25 ; hw <= 30 ; hw ++) {
463
470
log .updateHighWatermark (hw );
464
471
log .deleteOldSegments ();
@@ -531,41 +538,54 @@ public void shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelet
531
538
532
539
@ Test
533
540
public void testFirstUnstableOffsetNoTransactionalData () throws IOException {
534
- Supplier <MemoryRecords > records = () -> MemoryRecords .withRecords (Compression .NONE ,
541
+ LogConfig logConfig = new LogTestUtils .LogConfigBuilder ()
542
+ .withSegmentBytes (1024 * 1024 * 5 )
543
+ .build ();
544
+ log = createLog (logDir , logConfig );
545
+
546
+ MemoryRecords records = MemoryRecords .withRecords (Compression .NONE ,
535
547
new SimpleRecord ("foo" .getBytes ()),
536
548
new SimpleRecord ("bar" .getBytes ()),
537
549
new SimpleRecord ("baz" .getBytes ()));
538
550
539
- log .appendAsLeader (records . get () , 0 );
551
+ log .appendAsLeader (records , 0 );
540
552
assertEquals (Optional .empty (), log .firstUnstableOffset ());
541
553
}
542
554
543
555
@ Test
544
556
public void testFirstUnstableOffsetWithTransactionalData () throws IOException {
557
+ LogConfig logConfig = new LogTestUtils .LogConfigBuilder ()
558
+ .withSegmentBytes (1024 * 1024 * 5 ).build ();
559
+ log = createLog (logDir , logConfig );
560
+
545
561
long pid = 137L ;
546
562
short epoch = 5 ;
547
- PrimitiveRef . IntRef seq = PrimitiveRef . ofInt ( 0 ) ;
563
+ int seq = 0 ;
548
564
549
- Supplier <MemoryRecords > records = () -> MemoryRecords .withTransactionalRecords (
550
- Compression .NONE , pid , epoch , seq .value ,
565
+ // add some transactional records
566
+ MemoryRecords records = MemoryRecords .withTransactionalRecords (
567
+ Compression .NONE , pid , epoch , seq ,
551
568
new SimpleRecord ("foo" .getBytes ()),
552
569
new SimpleRecord ("bar" .getBytes ()),
553
570
new SimpleRecord ("baz" .getBytes ()));
554
571
555
- LogAppendInfo firstAppendInfo = log .appendAsLeader (records . get () , 0 );
572
+ LogAppendInfo firstAppendInfo = log .appendAsLeader (records , 0 );
556
573
assertEquals (Optional .of (firstAppendInfo .firstOffset ()), log .firstUnstableOffset ());
557
574
558
- seq .value += 3 ;
559
- log .appendAsLeader (MemoryRecords .withTransactionalRecords (Compression .NONE , pid , epoch , seq .value ,
575
+ // add more transactional records
576
+ seq += 3 ;
577
+ log .appendAsLeader (MemoryRecords .withTransactionalRecords (Compression .NONE , pid , epoch , seq ,
560
578
new SimpleRecord ("blah" .getBytes ())), 0 );
561
-
562
579
assertEquals (Optional .of (firstAppendInfo .firstOffset ()), log .firstUnstableOffset ());
563
580
581
+ // now transaction is committed
564
582
LogAppendInfo commitAppendInfo = appendEndTxnMarkerAsLeader (log , pid , epoch , ControlRecordType .COMMIT , mockTime .milliseconds ());
565
583
584
+ // first unstable offset is not updated until the high watermark is advanced
566
585
assertEquals (Optional .of (firstAppendInfo .firstOffset ()), log .firstUnstableOffset ());
567
586
log .updateHighWatermark (commitAppendInfo .lastOffset () + 1 );
568
587
588
+ // now there should be no first unstable offset
569
589
assertEquals (Optional .empty (), log .firstUnstableOffset ());
570
590
}
571
591
0 commit comments