diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 078d006e37a37..523e7fb365ad0 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.feature.Features;
 import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.ApiMessageType;
@@ -31,6 +32,12 @@
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.UnalignedRecords;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.ByteBufferChannel;
@@ -742,4 +749,84 @@ public static ApiVersionsResponse createApiVersionsResponse(
             setZkMigrationEnabled(zkMigrationEnabled).
             build();
     }
+
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+        return singletonRecords(value, key, Compression.NONE, RecordBatch.NO_TIMESTAMP, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    public static MemoryRecords singletonRecords(byte[] value, long timestamp) {
+        return singletonRecords(value, null, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    public static MemoryRecords singletonRecords(
+        byte[] value
+    ) {
+        return records(List.of(new SimpleRecord(RecordBatch.NO_TIMESTAMP, null, value)),
+            RecordBatch.CURRENT_MAGIC_VALUE,
+            Compression.NONE,
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            RecordBatch.NO_SEQUENCE,
+            0,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH
+        );
+    }
+
+    public static MemoryRecords singletonRecords(
+        byte[] value,
+        byte[] key,
+        Compression codec,
+        long timestamp,
+        byte magicValue
+    ) {
+        return records(List.of(new SimpleRecord(timestamp, key, value)),
+            magicValue, codec,
+            RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH,
+            RecordBatch.NO_SEQUENCE,
+            0,
+            RecordBatch.NO_PARTITION_LEADER_EPOCH
+        );
+    }
+
+    public static MemoryRecords singletonRecords(byte[] value, byte[] key, long timestamp) {
+        return singletonRecords(value, key, Compression.NONE, timestamp, RecordBatch.CURRENT_MAGIC_VALUE);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, long baseOffset) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, long baseOffset, int partitionLeaderEpoch) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, byte magicValue, Compression compression) {
+        return records(records, magicValue, compression, RecordBatch.NO_PRODUCER_ID,
+            RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression compression,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        ByteBuffer buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, compression, TimestampType.CREATE_TIME, baseOffset,
+            System.currentTimeMillis(), producerId, producerEpoch, sequence, false, partitionLeaderEpoch);
+        for (SimpleRecord record : records) {
+            builder.append(record);
+        }
+        return builder.build();
+    }
 }
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index e6fdf09331bfc..209a292928535 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -3010,475 +3010,8 @@ class UnifiedLogTest {
     assertFalse(LogTestUtils.hasOffsetOverflow(log))
   }
 
-  @Test
-  def testDeleteOldSegments(): Unit = {
-    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
-    val log = createLog(logDir, logConfig)
-
-    // append some messages to create some segments
-    for (_ <- 0 until 100)
-      log.appendAsLeader(createRecords, 0)
-
-    log.assignEpochStartOffset(0, 40)
-    log.assignEpochStartOffset(1, 90)
-
-    // segments are not eligible for deletion if no high watermark has been set
-    val numSegments = log.numberOfSegments
-    log.deleteOldSegments()
-    assertEquals(numSegments, log.numberOfSegments)
-    assertEquals(0L, log.logStartOffset)
-
-    // only segments with offset before the current high watermark are eligible for deletion
-    for (hw <- 25 to 30) {
-      log.updateHighWatermark(hw)
-      log.deleteOldSegments()
-      assertTrue(log.logStartOffset <= hw)
-      log.logSegments.forEach { segment =>
-        val segmentFetchInfo = segment.read(segment.baseOffset, Int.MaxValue)
-        val segmentLastOffsetOpt = segmentFetchInfo.records.records.asScala.lastOption.map(_.offset)
-        segmentLastOffsetOpt.foreach { lastOffset =>
-          assertTrue(lastOffset >= hw)
-        }
-      }
-    }
-
-    // expire all segments
-    log.updateHighWatermark(log.logEndOffset)
-    log.deleteOldSegments()
-    assertEquals(1, log.numberOfSegments, "The deleted segments should be gone.")
-    assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should have gone.")
-    assertEquals(new EpochEntry(1, 100), epochCache(log).epochEntries.get(0), "Epoch entry should be the latest epoch and the leo.")
-
-    // append some messages to create some segments
-    for (_ <- 0 until 100)
-      log.appendAsLeader(createRecords, 0)
-
-    log.delete()
-    assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
-    assertEquals(0, log.deleteOldSegments(), "The number of deleted segments should be zero.")
-    assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should have gone.")
-  }
-
-  @Test
-  def testLogDeletionAfterClose(): Unit = {
-    def createRecords =
TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - log.appendAsLeader(createRecords, 0) - - assertEquals(1, log.numberOfSegments, "The deleted segments should be gone.") - assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should have gone.") - - log.close() - log.delete() - assertEquals(0, log.numberOfSegments, "The number of segments should be 0") - assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should have gone.") - } - - @Test - def testLogDeletionAfterDeleteRecords(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5) - val log = createLog(logDir, logConfig) - - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - assertEquals(3, log.numberOfSegments, "should have 3 segments") - assertEquals(log.logStartOffset, 0) - log.updateHighWatermark(log.logEndOffset) - - log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.ClientRecordDeletion) - log.deleteOldSegments() - assertEquals(3, log.numberOfSegments, "should have 3 segments") - assertEquals(log.logStartOffset, 1) - - log.maybeIncrementLogStartOffset(6, LogStartOffsetIncrementReason.ClientRecordDeletion) - log.deleteOldSegments() - assertEquals(2, log.numberOfSegments, "should have 2 segments") - assertEquals(log.logStartOffset, 6) - - log.maybeIncrementLogStartOffset(15, LogStartOffsetIncrementReason.ClientRecordDeletion) - log.deleteOldSegments() - assertEquals(1, log.numberOfSegments, "should have 1 segments") - assertEquals(log.logStartOffset, 15) - } - def epochCache(log: UnifiedLog): LeaderEpochFileCache = log.leaderEpochCache - @Test - def shouldDeleteSizeBasedSegments(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(2,log.numberOfSegments, "should have 2 segments") - } - - @Test - def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(3,log.numberOfSegments, "should have 3 segments") - } - - @Test - def shouldDeleteTimeBasedSegmentsReadyToBeDeleted(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - 
log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(1, log.numberOfSegments, "There should be 1 segment remaining") - } - - @Test - def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000) - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(3, log.numberOfSegments, "There should be 3 segments remaining") - } - - @Test - def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact") - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - // mark the oldest segment as older the retention.ms - log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000) - - val segments = log.numberOfSegments - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(segments, log.numberOfSegments, "There should be 3 segments remaining") - } - - @Test - def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L) - val recordSize = createRecords.sizeInBytes - val logConfig = LogTestUtils.createLogConfig( - segmentBytes = recordSize * 2, - localRetentionBytes = recordSize / 2, - cleanupPolicy = "", - remoteLogStorageEnable = true - ) - val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) - - for (_ <- 0 until 10) - log.appendAsLeader(createRecords, 0) - - val segmentsBefore = log.numberOfSegments - log.updateHighWatermark(log.logEndOffset) - log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1) - val deleteOldSegments = log.deleteOldSegments() - - assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to size retention") - assertTrue(deleteOldSegments > 0, "At least one segment should be deleted") - } - - @Test - def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention(): Unit = { - val oldTimestamp = mockTime.milliseconds - 20000 - def oldRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = oldTimestamp) - val recordSize = oldRecords.sizeInBytes - val logConfig = LogTestUtils.createLogConfig( - segmentBytes = recordSize * 2, - localRetentionMs = 5000, - cleanupPolicy = "", - remoteLogStorageEnable = true - ) - val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) - - for (_ <- 0 until 10) - log.appendAsLeader(oldRecords, 0) - - def newRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = mockTime.milliseconds) - for (_ <- 0 until 5) - log.appendAsLeader(newRecords, 0) - - val segmentsBefore = log.numberOfSegments - - log.updateHighWatermark(log.logEndOffset) - log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1) - val deleteOldSegments = log.deleteOldSegments() - - 
assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to time retention") - assertTrue(deleteOldSegments > 0, "At least one segment should be deleted") - } - - @Test - def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete") - val log = createLog(logDir, logConfig) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(1, log.numberOfSegments, "There should be 1 segment remaining") - } - - @Test - def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L) - val recordsPerSegment = 5 - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, cleanupPolicy = "compact") - val log = createLog(logDir, logConfig, brokerTopicStats) - - // append some messages to create some segments - for (_ <- 0 until 15) - log.appendAsLeader(createRecords, 0) - - // Three segments should be created - assertEquals(3, log.logSegments.asScala.count(_ => true)) - log.updateHighWatermark(log.logEndOffset) - log.maybeIncrementLogStartOffset(recordsPerSegment, LogStartOffsetIncrementReason.ClientRecordDeletion) - - // The first segment, which is entirely before the log start offset, should be deleted - // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset - // greater than the start offset - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - assertEquals(2, log.numberOfSegments, "There should be 2 segments remaining") - assertTrue(log.logSegments.asScala.head.baseOffset <= log.logStartOffset) - assertTrue(log.logSegments.asScala.tail.forall(s => s.baseOffset > log.logStartOffset)) - } - - @Test - def shouldApplyEpochToMessageOnAppendIfLeader(): Unit = { - val records = (0 until 50).toArray.map(id => new SimpleRecord(id.toString.getBytes)) - - //Given this partition is on leader epoch 72 - val epoch = 72 - val log = createLog(logDir, new LogConfig(new Properties)) - log.assignEpochStartOffset(epoch, records.length) - - //When appending messages as a leader (i.e. 
assignOffsets = true) - for (record <- records) - log.appendAsLeader( - MemoryRecords.withRecords(Compression.NONE, record), - epoch - ) - - //Then leader epoch should be set on messages - for (i <- records.indices) { - val read = LogTestUtils.readLog(log, i, 1).records.batches.iterator.next() - assertEquals(72, read.partitionLeaderEpoch, "Should have set leader epoch") - } - } - - @Test - def followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache(): Unit = { - val messageIds = (0 until 50).toArray - val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes)) - - //Given each message has an offset & epoch, as msgs from leader would - def recordsForEpoch(i: Int): MemoryRecords = { - val recs = MemoryRecords.withRecords(messageIds(i), Compression.NONE, records(i)) - recs.batches.forEach{record => - record.setPartitionLeaderEpoch(42) - record.setLastOffset(i) - } - recs - } - - val log = createLog(logDir, new LogConfig(new Properties)) - - //When appending as follower (assignOffsets = false) - for (i <- records.indices) - log.appendAsFollower(recordsForEpoch(i), i) - - assertEquals(Optional.of(42), log.latestEpoch) - } - - @Test - def shouldTruncateLeaderEpochsWhenDeletingSegments(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) - val log = createLog(logDir, logConfig) - val cache = epochCache(log) - - // Given three segments of 5 messages each - for (_ <- 0 until 15) { - log.appendAsLeader(createRecords, 0) - } - - //Given epochs - cache.assign(0, 0) - cache.assign(1, 5) - cache.assign(2, 10) - - //When first segment is removed - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - - //The oldest epoch entry should have been removed - assertEquals(util.List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), cache.epochEntries) - } - - @Test - def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments(): Unit = { - def createRecords = TestUtils.singletonRecords("test".getBytes) - val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10) - val log = createLog(logDir, logConfig) - val cache = epochCache(log) - - // Given three segments of 5 messages each - for (_ <- 0 until 15) { - log.appendAsLeader(createRecords, 0) - } - - //Given epochs - cache.assign(0, 0) - cache.assign(1, 7) - cache.assign(2, 10) - - //When first segment removed (up to offset 5) - log.updateHighWatermark(log.logEndOffset) - log.deleteOldSegments() - - //The first entry should have gone from (0,0) => (0,5) - assertEquals(util.List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries) - } - - @Test - def shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog(): Unit = { - def createRecords(startOffset: Long, epoch: Int): MemoryRecords = { - TestUtils.records(Seq(new SimpleRecord("value".getBytes)), - baseOffset = startOffset, partitionLeaderEpoch = epoch) - } - - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes) - val log = createLog(logDir, logConfig) - val cache = epochCache(log) - - def append(epoch: Int, startOffset: Long, count: Int): Unit = { - for (i <- 0 until count) - log.appendAsFollower(createRecords(startOffset + i, epoch), epoch) - } - - //Given 2 segments, 10 messages per segment - append(epoch = 0, startOffset = 0, count = 10) - 
append(epoch = 1, startOffset = 10, count = 6) - append(epoch = 2, startOffset = 16, count = 4) - - assertEquals(2, log.numberOfSegments) - assertEquals(20, log.logEndOffset) - - //When truncate to LEO (no op) - log.truncateTo(log.logEndOffset) - - //Then no change - assertEquals(3, cache.epochEntries.size) - - //When truncate - log.truncateTo(11) - - //Then no change - assertEquals(2, cache.epochEntries.size) - - //When truncate - log.truncateTo(10) - - //Then - assertEquals(1, cache.epochEntries.size) - - //When truncate all - log.truncateTo(0) - - //Then - assertEquals(0, cache.epochEntries.size) - } - - @Test - def testFirstUnstableOffsetNoTransactionalData(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - val records = MemoryRecords.withRecords(Compression.NONE, - new SimpleRecord("foo".getBytes), - new SimpleRecord("bar".getBytes), - new SimpleRecord("baz".getBytes)) - - log.appendAsLeader(records, 0) - assertEquals(Optional.empty, log.firstUnstableOffset) - } - - @Test - def testFirstUnstableOffsetWithTransactionalData(): Unit = { - val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) - val log = createLog(logDir, logConfig) - - val pid = 137L - val epoch = 5.toShort - var seq = 0 - - // add some transactional records - val records = MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, - new SimpleRecord("foo".getBytes), - new SimpleRecord("bar".getBytes), - new SimpleRecord("baz".getBytes)) - - val firstAppendInfo = log.appendAsLeader(records, 0) - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - - // add more transactional records - seq += 3 - log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq, - new SimpleRecord("blah".getBytes)), 0) - - // LSO should not have changed - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - - // now transaction is committed - val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT, mockTime.milliseconds()) - - // first unstable offset is not updated until the high watermark is advanced - assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset) - log.updateHighWatermark(commitAppendInfo.lastOffset + 1) - - // now there should be no first unstable offset - assertEquals(Optional.empty, log.firstUnstableOffset) - } - @Test def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = { val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5) diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java index 50b666a0fb1d5..f433fbd5bd394 100644 --- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java @@ -16,11 +16,23 @@ */ package org.apache.kafka.storage.internals.log; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.RequestLocal; +import 
org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentMap;
 
 public class LogTestUtils {
     public static LogSegment createSegment(long offset, File logDir, int indexIntervalBytes, Time time) throws IOException {
@@ -33,4 +45,176 @@ public static LogSegment createSegment(long offset, File logDir, int indexInterv
         // Create and return the LogSegment instance
         return new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time);
     }
+
+    public static LogAppendInfo appendEndTxnMarkerAsLeader(UnifiedLog log,
+                                                           long producerId,
+                                                           short producerEpoch,
+                                                           ControlRecordType controlType,
+                                                           long timestamp,
+                                                           int coordinatorEpoch,
+                                                           int leaderEpoch) {
+        MemoryRecords records = endTxnRecords(controlType, producerId, producerEpoch, 0L, coordinatorEpoch, leaderEpoch, timestamp);
+
+        return log.appendAsLeader(records, leaderEpoch, AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL);
+    }
+
+    public static MemoryRecords endTxnRecords(ControlRecordType controlRecordType,
+                                              long producerId,
+                                              short epoch,
+                                              long offset,
+                                              int coordinatorEpoch,
+                                              int partitionLeaderEpoch,
+                                              long timestamp) {
+        EndTransactionMarker marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch);
+        return MemoryRecords.withEndTransactionMarker(offset, timestamp, partitionLeaderEpoch, producerId, epoch, marker);
+    }
+
+    @SuppressWarnings("ParameterNumber")
+    public static UnifiedLog createLog(File dir,
+                                       LogConfig config,
+                                       BrokerTopicStats brokerTopicStats,
+                                       Scheduler scheduler,
+                                       Time time,
+                                       long logStartOffset,
+                                       long recoveryPoint,
+                                       int maxTransactionTimeoutMs,
+                                       ProducerStateManagerConfig producerStateManagerConfig,
+                                       int producerIdExpirationCheckIntervalMs,
+                                       boolean lastShutdownClean,
+                                       Optional<Uuid> topicId,
+                                       ConcurrentMap<String, Integer> numRemainingSegments,
+                                       boolean remoteStorageSystemEnable,
+                                       LogOffsetsListener logOffsetsListener) throws IOException {
+        return UnifiedLog.create(
+            dir,
+            config,
+            logStartOffset,
+            recoveryPoint,
+            scheduler,
+            brokerTopicStats,
+            time,
+            maxTransactionTimeoutMs,
+            producerStateManagerConfig,
+            producerIdExpirationCheckIntervalMs,
+            new LogDirFailureChannel(10),
+            lastShutdownClean,
+            topicId,
+            numRemainingSegments,
+            remoteStorageSystemEnable,
+            logOffsetsListener
+        );
+    }
+
+    public static class LogConfigBuilder {
+        private long segmentMs = LogConfig.DEFAULT_SEGMENT_MS;
+        private int segmentBytes = LogConfig.DEFAULT_SEGMENT_BYTES;
+        private long retentionMs = LogConfig.DEFAULT_RETENTION_MS;
+        private long localRetentionMs = LogConfig.DEFAULT_LOCAL_RETENTION_MS;
+        private long retentionBytes = ServerLogConfigs.LOG_RETENTION_BYTES_DEFAULT;
+        private long localRetentionBytes = LogConfig.DEFAULT_LOCAL_RETENTION_BYTES;
+        private long segmentJitterMs = LogConfig.DEFAULT_SEGMENT_JITTER_MS;
+        private String cleanupPolicy = ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT;
+        private int maxMessageBytes = ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT;
+        private int indexIntervalBytes = ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT;
+        private int segmentIndexBytes = ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT;
+        private long fileDeleteDelayMs = ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT;
+        private boolean remoteLogStorageEnable = LogConfig.DEFAULT_REMOTE_STORAGE_ENABLE;
+        private boolean remoteLogCopyDisable = LogConfig.DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG;
+        private boolean remoteLogDeleteOnDisable = LogConfig.DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG;
+
+        public LogConfigBuilder withSegmentMs(long segmentMs) {
+            this.segmentMs = segmentMs;
+            return this;
+        }
+
+        public LogConfigBuilder withSegmentBytes(int segmentBytes) {
+            this.segmentBytes = segmentBytes;
+            return this;
+        }
+
+        public LogConfigBuilder withRetentionMs(long retentionMs) {
+            this.retentionMs = retentionMs;
+            return this;
+        }
+
+        public LogConfigBuilder withLocalRetentionMs(long localRetentionMs) {
+            this.localRetentionMs = localRetentionMs;
+            return this;
+        }
+
+        public LogConfigBuilder withRetentionBytes(long retentionBytes) {
+            this.retentionBytes = retentionBytes;
+            return this;
+        }
+
+        public LogConfigBuilder withLocalRetentionBytes(long localRetentionBytes) {
+            this.localRetentionBytes = localRetentionBytes;
+            return this;
+        }
+
+        public LogConfigBuilder withSegmentJitterMs(long segmentJitterMs) {
+            this.segmentJitterMs = segmentJitterMs;
+            return this;
+        }
+
+        public LogConfigBuilder withCleanupPolicy(String cleanupPolicy) {
+            this.cleanupPolicy = cleanupPolicy;
+            return this;
+        }
+
+        public LogConfigBuilder withMaxMessageBytes(int maxMessageBytes) {
+            this.maxMessageBytes = maxMessageBytes;
+            return this;
+        }
+
+        public LogConfigBuilder withIndexIntervalBytes(int indexIntervalBytes) {
+            this.indexIntervalBytes = indexIntervalBytes;
+            return this;
+        }
+
+        public LogConfigBuilder withSegmentIndexBytes(int segmentIndexBytes) {
+            this.segmentIndexBytes = segmentIndexBytes;
+            return this;
+        }
+
+        public LogConfigBuilder withFileDeleteDelayMs(long fileDeleteDelayMs) {
+            this.fileDeleteDelayMs = fileDeleteDelayMs;
+            return this;
+        }
+
+        public LogConfigBuilder withRemoteLogStorageEnable(boolean remoteLogStorageEnable) {
+            this.remoteLogStorageEnable = remoteLogStorageEnable;
+            return this;
+        }
+
+        public LogConfigBuilder withRemoteLogCopyDisable(boolean remoteLogCopyDisable) {
+            this.remoteLogCopyDisable = remoteLogCopyDisable;
+            return this;
+        }
+
+        public LogConfigBuilder withRemoteLogDeleteOnDisable(boolean remoteLogDeleteOnDisable) {
+            this.remoteLogDeleteOnDisable = remoteLogDeleteOnDisable;
+            return this;
+        }
+
+        public LogConfig build() {
+            Properties logProps = new Properties();
+            logProps.put(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(segmentMs));
+            logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, String.valueOf(segmentBytes));
+            logProps.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs));
+            logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, String.valueOf(localRetentionMs));
+            logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(retentionBytes));
+            logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, String.valueOf(localRetentionBytes));
+            logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, String.valueOf(segmentJitterMs));
+            logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy);
+            logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(maxMessageBytes));
+            logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, String.valueOf(indexIntervalBytes));
+            logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(segmentIndexBytes));
+            logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, String.valueOf(fileDeleteDelayMs));
+            logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, String.valueOf(remoteLogStorageEnable));
+            logProps.put(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, String.valueOf(remoteLogCopyDisable));
+            logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, String.valueOf(remoteLogDeleteOnDisable));
+            return new LogConfig(logProps);
+        }
+    }
 }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
index ea14932ff203d..e8cbeeba36d66 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java
@@ -16,17 +16,58 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.Scheduler;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class UnifiedLogTest {
     private final File tmpDir = TestUtils.tempDirectory();
+    private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+    private final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(false);
+    private final MockTime mockTime = new MockTime();
+    private final int maxTransactionTimeoutMs = 60 * 60 * 1000;
+    private final ProducerStateManagerConfig producerStateManagerConfig = new ProducerStateManagerConfig(maxTransactionTimeoutMs, false);
+    private final List<UnifiedLog> logsToClose = new ArrayList<>();
+
+    private UnifiedLog log;
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        brokerTopicStats.close();
+        for (UnifiedLog log : logsToClose) {
+            log.close();
+        }
+        Utils.delete(tmpDir);
+    }
 
     @Test
     public void testOffsetFromProducerSnapshotFile() {
@@ -34,4 +75,576 @@ public void testOffsetFromProducerSnapshotFile() {
         File snapshotFile = LogFileUtils.producerSnapshotFile(tmpDir, offset);
         assertEquals(offset, UnifiedLog.offsetFromFile(snapshotFile));
     }
+
+    @Test
+    public void shouldApplyEpochToMessageOnAppendIfLeader() throws IOException {
+        SimpleRecord[] records = java.util.stream.IntStream.range(0, 50)
+            .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+            .toArray(SimpleRecord[]::new);
+
+        // Given this partition is on leader epoch 72
+        int epoch = 72;
+        try (UnifiedLog log = createLog(logDir, new LogConfig(new Properties()))) {
+            log.assignEpochStartOffset(epoch, records.length);
+
+            // When appending messages as a leader (i.e. assignOffsets = true)
+            for (SimpleRecord record : records) {
+                log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, record), epoch);
+            }
+
+            // Then leader epoch should be set on messages
+            for (int i = 0; i < records.length; i++) {
+                FetchDataInfo read = log.read(i, 1, FetchIsolation.LOG_END, true);
+                RecordBatch batch = read.records.batches().iterator().next();
+                assertEquals(epoch, batch.partitionLeaderEpoch(), "Should have set leader epoch");
+            }
+        }
+    }
+
+    @Test
+    public void followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache() throws IOException {
+        int[] messageIds = java.util.stream.IntStream.range(0, 50).toArray();
+        SimpleRecord[] records = Arrays.stream(messageIds)
+            .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+            .toArray(SimpleRecord[]::new);
+
+        // Given each message has an offset & epoch, as msgs from leader would
+        Function<Integer, MemoryRecords> recordsForEpoch = i -> {
+            MemoryRecords recs = MemoryRecords.withRecords(messageIds[i], Compression.NONE, records[i]);
+            recs.batches().forEach(record -> {
+                record.setPartitionLeaderEpoch(42);
+                record.setLastOffset(i);
+            });
+            return recs;
+        };
+
+        try (UnifiedLog log = createLog(logDir, new LogConfig(new Properties()))) {
+            // When appending as follower (assignOffsets = false)
+            for (int i = 0; i < records.length; i++) {
+                log.appendAsFollower(recordsForEpoch.apply(i), i);
+            }
+
+            assertEquals(Optional.of(42), log.latestEpoch());
+        }
+    }
+
+    @Test
+    public void shouldTruncateLeaderEpochsWhenDeletingSegments() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .withRetentionBytes(records.get().sizeInBytes() * 10L)
+            .build();
+
+        log = createLog(logDir, config);
+        LeaderEpochFileCache cache = epochCache(log);
+
+        // Given three segments of 5 messages each
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // Given epochs
+        cache.assign(0, 0);
+        cache.assign(1, 5);
+        cache.assign(2, 10);
+
+        // When first segment is removed
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+
+        // The oldest epoch entry should have been removed
+        assertEquals(List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), cache.epochEntries());
+    }
+
+    @Test
+    public void shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .withRetentionBytes(records.get().sizeInBytes() * 10L)
+            .build();
+
+        log = createLog(logDir, config);
+        LeaderEpochFileCache cache = epochCache(log);
+
+        // Given three segments of 5 messages each
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // Given epochs
+        cache.assign(0, 0);
+        cache.assign(1, 7);
+        cache.assign(2, 10);
+
+        // When first segment removed (up to offset 5)
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+
+        // The first entry should have gone from (0,0) => (0,5)
+        assertEquals(List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries());
+    }
+
+    @Test
+    public void shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.records(List.of(new SimpleRecord("value".getBytes())), 0, 0);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(10 * records.get().sizeInBytes())
+            .build();
+        log = createLog(logDir, config);
+        LeaderEpochFileCache cache = epochCache(log);
+
+        // Given 2 segments, 10 messages per segment
+        append(0, 0, 10);
+        append(1, 10, 6);
+        append(2, 16, 4);
+
+        assertEquals(2, log.numberOfSegments());
+        assertEquals(20, log.logEndOffset());
+
+        // When truncate to LEO (no op)
+        log.truncateTo(log.logEndOffset());
+        // Then no change
+        assertEquals(3, cache.epochEntries().size());
+
+        // When truncate
+        log.truncateTo(11);
+        // Then no change
+        assertEquals(2, cache.epochEntries().size());
+
+        // When truncate
+        log.truncateTo(10);
+        assertEquals(1, cache.epochEntries().size());
+
+        // When truncate all
+        log.truncateTo(0);
+        assertEquals(0, cache.epochEntries().size());
+    }
+
+    @Test
+    public void shouldDeleteSizeBasedSegments() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .withRetentionBytes(records.get().sizeInBytes() * 10L)
+            .build();
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(2, log.numberOfSegments(), "should have 2 segments");
+    }
+
+    @Test
+    public void shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes());
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .withRetentionBytes(records.get().sizeInBytes() * 15L)
+            .build();
+
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(3, log.numberOfSegments(), "should have 3 segments");
+    }
+
+    @Test
+    public void shouldDeleteTimeBasedSegmentsReadyToBeDeleted() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes(), 10L);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 15)
+            .withRetentionMs(10000L)
+            .build();
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(1, log.numberOfSegments(), "There should be 1 segment remaining");
+    }
+
+    @Test
+    public void shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds());
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .withRetentionMs(10000000)
+            .build();
+        log = createLog(logDir, logConfig);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(3, log.numberOfSegments(), "There should be 3 segments remaining");
+    }
+
+    @Test
+    public void shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .withRetentionMs(10000)
+            .withCleanupPolicy("compact")
+            .build();
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // mark the oldest segment as older than retention.ms
+        log.logSegments().iterator().next().setLastModified(mockTime.milliseconds() - 20000);
+
+        int segments = log.numberOfSegments();
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(segments, log.numberOfSegments(), "There should be 3 segments remaining");
+    }
+
+    @Test
+    public void shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .withRetentionBytes(records.get().sizeInBytes() * 10L)
+            .withCleanupPolicy("compact, delete")
+            .build();
+
+        log = createLog(logDir, config);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(1, log.numberOfSegments(), "There should be 1 segment remaining");
+    }
+
+    @Test
+    public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        int recordSize = records.get().sizeInBytes();
+        LogConfig config = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(recordSize * 2)
+            .withRetentionBytes(recordSize / 2)
+            .withCleanupPolicy("")
+            .withRemoteLogStorageEnable(true)
+            .build();
+        log = createLog(logDir, config, true);
+
+        for (int i = 0; i < 10; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        int segmentsBefore = log.numberOfSegments();
+        log.updateHighWatermark(log.logEndOffset());
+        log.updateHighestOffsetInRemoteStorage(log.logEndOffset() - 1);
+        int deletedSegments = log.deleteOldSegments();
+
+        assertTrue(log.numberOfSegments() < segmentsBefore, "Some segments should be deleted due to size retention");
+        assertTrue(deletedSegments > 0, "At least one segment should be deleted");
+    }
+
+    @Test
+    public void shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention() throws IOException {
+        long oldTimestamp = mockTime.milliseconds() - 20000;
+        Supplier<MemoryRecords> oldRecords = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), oldTimestamp);
+        int recordSize = oldRecords.get().sizeInBytes();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(recordSize * 2)
+            .withLocalRetentionMs(5000)
+            .withCleanupPolicy("")
+            .withRemoteLogStorageEnable(true)
+            .build();
+        log = createLog(logDir, logConfig, true);
+
+        for (int i = 0; i < 10; i++) {
+            log.appendAsLeader(oldRecords.get(), 0);
+        }
+
+        Supplier<MemoryRecords> newRecords = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), mockTime.milliseconds());
+        for (int i = 0; i < 5; i++) {
+            log.appendAsLeader(newRecords.get(), 0);
+        }
+
+        int segmentsBefore = log.numberOfSegments();
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.updateHighestOffsetInRemoteStorage(log.logEndOffset() - 1);
+        int deletedSegments = log.deleteOldSegments();
+
+        assertTrue(log.numberOfSegments() < segmentsBefore, "Some segments should be deleted due to time retention");
+        assertTrue(deletedSegments > 0, "At least one segment should be deleted");
+    }
+
+    @Test
+    public void testLogDeletionAfterDeleteRecords() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes());
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .build();
+        log = createLog(logDir, logConfig);
+
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+        assertEquals(3, log.numberOfSegments());
+        assertEquals(0, log.logStartOffset());
+        log.updateHighWatermark(log.logEndOffset());
+
+        log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.ClientRecordDeletion);
+        log.deleteOldSegments();
+        assertEquals(3, log.numberOfSegments());
+        assertEquals(1, log.logStartOffset());
+
+        log.maybeIncrementLogStartOffset(6, LogStartOffsetIncrementReason.ClientRecordDeletion);
+        log.deleteOldSegments();
+        assertEquals(2, log.numberOfSegments());
+        assertEquals(6, log.logStartOffset());
+
+        log.maybeIncrementLogStartOffset(15, LogStartOffsetIncrementReason.ClientRecordDeletion);
+        log.deleteOldSegments();
+        assertEquals(1, log.numberOfSegments());
+        assertEquals(15, log.logStartOffset());
+    }
+
+    @Test
+    public void testLogDeletionAfterClose() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000);
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .withSegmentIndexBytes(1000)
+            .withRetentionMs(999)
+            .build();
+        log = createLog(logDir, logConfig);
+        // remove from logsToClose because this test closes the log itself
+        logsToClose.remove(log);
+
+        // append some messages to create some segments
+        log.appendAsLeader(records.get(), 0);
+
+        assertEquals(1, log.numberOfSegments(), "The deleted segments should be gone.");
+        assertEquals(1, epochCache(log).epochEntries().size(), "Epoch entries should have gone.");
+
+        log.close();
+        log.delete();
+        assertEquals(0, log.numberOfSegments());
+        assertEquals(0, epochCache(log).epochEntries().size(), "Epoch entries should have gone.");
+    }
+
+    @Test
+    public void testDeleteOldSegments() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes(), mockTime.milliseconds() - 1000);
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * 5)
+            .withSegmentIndexBytes(1000)
+            .withRetentionMs(999)
+            .build();
+        log = createLog(logDir, logConfig);
+        // remove from logsToClose because this test deletes the log itself
+        logsToClose.remove(log);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 100; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.assignEpochStartOffset(0, 40);
+        log.assignEpochStartOffset(1, 90);
+
+        // segments are not eligible for deletion if no high watermark has been set
+        int numSegments = log.numberOfSegments();
+        log.deleteOldSegments();
+        assertEquals(numSegments, log.numberOfSegments());
+        assertEquals(0L, log.logStartOffset());
+
+        // only segments with offset before the current high watermark are eligible for deletion
+        for (long hw = 25; hw <= 30; hw++) {
+            log.updateHighWatermark(hw);
+            log.deleteOldSegments();
+            assertTrue(log.logStartOffset() <= hw);
+            long finalHw = hw;
+            log.logSegments().forEach(segment -> {
+                FetchDataInfo segmentFetchInfo;
+                try {
+                    segmentFetchInfo = segment.read(segment.baseOffset(), Integer.MAX_VALUE);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                Optional<RecordBatch> lastBatch = Optional.empty();
+                for (RecordBatch batch : segmentFetchInfo.records.batches()) {
+                    lastBatch = Optional.of(batch);
+                }
+                lastBatch.ifPresent(batch -> assertTrue(batch.lastOffset() >= finalHw));
+            });
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(1, log.numberOfSegments(), "The deleted segments should be gone.");
+        assertEquals(1, epochCache(log).epochEntries().size(), "Epoch entries should have gone.");
+        assertEquals(new EpochEntry(1, 100), epochCache(log).epochEntries().get(0), "Epoch entry should be the latest epoch and the leo.");
+
+        for (int i = 0; i < 100; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        log.delete();
+        assertEquals(0, log.numberOfSegments(), "The number of segments should be 0");
+        assertEquals(0, log.deleteOldSegments(), "The number of deleted segments should be zero.");
+        assertEquals(0, epochCache(log).epochEntries().size(), "Epoch entries should have gone.");
+    }
+
+    @Test
+    public void shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete() throws IOException {
+        Supplier<MemoryRecords> records = () -> TestUtils.singletonRecords("test".getBytes(), "test".getBytes(), 10L);
+        int recordsPerSegment = 5;
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(records.get().sizeInBytes() * recordsPerSegment)
+            .withSegmentIndexBytes(1000)
+            .withCleanupPolicy("compact")
+            .build();
+        log = createLog(logDir, logConfig);
+
+        // append some messages to create some segments
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        assertEquals(3, log.numberOfSegments());
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(recordsPerSegment, LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+        // The first segment, which is entirely before the log start offset, should be deleted.
+        // Of the remaining segments, the first can overlap the log start offset and the rest must have a base offset
+        // greater than the start offset.
+        log.updateHighWatermark(log.logEndOffset());
+        log.deleteOldSegments();
+        assertEquals(2, log.numberOfSegments(), "There should be 2 segments remaining");
+        assertTrue(log.logSegments().iterator().next().baseOffset() <= log.logStartOffset());
+        log.logSegments().forEach(segment -> {
+            if (log.logSegments().iterator().next() != segment) {
+                assertTrue(segment.baseOffset() > log.logStartOffset());
+            }
+        });
+    }
+
+    @Test
+    public void testFirstUnstableOffsetNoTransactionalData() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(1024 * 1024 * 5)
+            .build();
+        log = createLog(logDir, logConfig);
+
+        MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
+            new SimpleRecord("foo".getBytes()),
+            new SimpleRecord("bar".getBytes()),
+            new SimpleRecord("baz".getBytes()));
+
+        log.appendAsLeader(records, 0);
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    @Test
+    public void testFirstUnstableOffsetWithTransactionalData() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+            .withSegmentBytes(1024 * 1024 * 5)
+            .build();
+        log = createLog(logDir, logConfig);
+
+        long pid = 137L;
+        short epoch = 5;
+        int seq = 0;
+
+        // add some transactional records
+        MemoryRecords records = MemoryRecords.withTransactionalRecords(
+            Compression.NONE, pid, epoch, seq,
+            new SimpleRecord("foo".getBytes()),
+            new SimpleRecord("bar".getBytes()),
+            new SimpleRecord("baz".getBytes()));
+
+        LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+
+        // add more transactional records
+        seq += 3;
+        log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
+            new SimpleRecord("blah".getBytes())), 0);
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+
+        // now transaction is committed
+        LogAppendInfo commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
+            ControlRecordType.COMMIT, mockTime.milliseconds(), 0, 0);
+
+        // first unstable offset is not updated until the high watermark is advanced
+        assertEquals(Optional.of(firstAppendInfo.firstOffset()), log.firstUnstableOffset());
+        log.updateHighWatermark(commitAppendInfo.lastOffset() + 1);
+
+        // now there should be no first unstable offset
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+    }
+
+    private void append(int epoch, long startOffset, int count) {
+        Function<Integer, MemoryRecords> records = i ->
+            TestUtils.records(List.of(new SimpleRecord("value".getBytes())), startOffset + i, epoch);
+        for (int i = 0; i < count; i++) {
+            log.appendAsFollower(records.apply(i), epoch);
+        }
+    }
+
+    private LeaderEpochFileCache epochCache(UnifiedLog log) {
+        return log.leaderEpochCache();
+    }
+
+    private UnifiedLog createLog(File dir, LogConfig config) throws IOException {
+        return createLog(dir, config, false);
+    }
+
+    private UnifiedLog createLog(File dir, LogConfig config, boolean remoteStorageSystemEnable) throws IOException {
+        return createLog(dir, config, this.brokerTopicStats, mockTime.scheduler, this.mockTime,
+            this.producerStateManagerConfig, Optional.empty(), remoteStorageSystemEnable);
+    }
+
+    private UnifiedLog createLog(
+        File dir,
+        LogConfig config,
+        BrokerTopicStats brokerTopicStats,
+        Scheduler scheduler,
+        MockTime time,
+        ProducerStateManagerConfig producerStateManagerConfig,
+        Optional<Uuid> topicId,
+        boolean remoteStorageSystemEnable) throws IOException {
+
+        UnifiedLog log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, 0L, 0L,
+            3600000, producerStateManagerConfig,
+            TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, true, topicId,
+            new ConcurrentHashMap<>(), remoteStorageSystemEnable, LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
+
+        this.logsToClose.add(log);
+        return log;
+    }
 }
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
index 4c75272edd4af..a9e49dd04925e 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -35,6 +35,7 @@
  * Helper functions for writing unit tests.
  * <p>

  * Package-private: Not intended for use outside {@code org.apache.kafka.common.test}.
+ * Use {@code org.apache.kafka.test.TestUtils} instead.
  */
 class TestUtils {
     private static final Logger log = LoggerFactory.getLogger(TestUtils.class);
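
Editor's note (not part of the patch): the sketch below shows how the test helpers introduced above are expected to compose — LogTestUtils.LogConfigBuilder builds a LogConfig, LogTestUtils.createLog wires up a UnifiedLog the same way UnifiedLogTest#createLog does, and the new org.apache.kafka.test.TestUtils#records helper produces MemoryRecords to append. The class name and the literal config values are hypothetical and chosen only for illustration.

import java.io.File;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogOffsetsListener;
import org.apache.kafka.storage.internals.log.LogTestUtils;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;

// Hypothetical example class; mirrors the wiring used by the new UnifiedLogTest.
public class NewTestHelpersSketch {
    public static void main(String[] args) throws Exception {
        File logDir = TestUtils.tempDirectory();
        MockTime time = new MockTime();

        // Build a LogConfig with the fluent builder added in LogTestUtils.
        LogConfig config = new LogTestUtils.LogConfigBuilder()
            .withSegmentBytes(1024)
            .withRetentionMs(1000L)
            .build();

        // Create a UnifiedLog with the same argument order UnifiedLogTest#createLog passes.
        UnifiedLog log = LogTestUtils.createLog(logDir, config, new BrokerTopicStats(false),
            time.scheduler, time, 0L, 0L, 60 * 60 * 1000,
            new ProducerStateManagerConfig(60 * 60 * 1000, false),
            TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, true,
            Optional.empty(), new ConcurrentHashMap<>(), false,
            LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
        try {
            // Append a batch built with the new clients-side TestUtils helper.
            MemoryRecords batch = TestUtils.records(List.of(
                new SimpleRecord("key".getBytes(), "value".getBytes())));
            log.appendAsLeader(batch, 0);
        } finally {
            log.close();
        }
    }
}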