Skip to content

Commit 41611b4

Browse files
authored
MINOR: Followup KAFKA-19112 document updated (#20492)
Some sections are not very clear, and we need to update the documentation. Reviewers: TengYao Chi <[email protected]>, Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent fb0518c commit 41611b4

File tree

4 files changed

+56
-6
lines changed

4 files changed

+56
-6
lines changed

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1658,7 +1658,7 @@ class Partition(val topicPartition: TopicPartition,
16581658
def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult = inReadLock(leaderIsrUpdateLock) {
16591659
leaderLogIfLocal match {
16601660
case Some(leaderLog) =>
1661-
if (!leaderLog.config.delete)
1661+
if (!leaderLog.config.delete && leaderLog.config.compact)
16621662
throw new PolicyViolationException(s"Records of partition $topicPartition can not be deleted due to the configured policy")
16631663

16641664
val convertedOffset = if (offset == DeleteRecordsRequest.HIGH_WATERMARK)

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import com.yammer.metrics.core.Metric
2121
import kafka.log.LogManager
2222
import kafka.server._
2323
import kafka.utils._
24-
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
24+
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, UnknownLeaderEpochException}
2525
import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
2626
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
2727
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -61,7 +61,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec
6161
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
6262
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
6363
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
64-
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
64+
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
6565
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
6666
import org.junit.jupiter.params.ParameterizedTest
6767
import org.junit.jupiter.params.provider.ValueSource
@@ -4030,4 +4030,46 @@ class PartitionTest extends AbstractPartitionTest {
40304030
alterPartitionManager)
40314031
partition.tryCompleteDelayedRequests()
40324032
}
4033+
4034+
@Test
4035+
def testDeleteRecordsOnLeaderWithEmptyPolicy(): Unit = {
4036+
val leaderEpoch = 5
4037+
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
4038+
4039+
val emptyPolicyConfig = new LogConfig(util.Map.of(
4040+
TopicConfig.CLEANUP_POLICY_CONFIG, ""
4041+
))
4042+
4043+
val mockLog = mock(classOf[UnifiedLog])
4044+
when(mockLog.config).thenReturn(emptyPolicyConfig)
4045+
when(mockLog.logEndOffset).thenReturn(2L)
4046+
when(mockLog.logStartOffset).thenReturn(0L)
4047+
when(mockLog.highWatermark).thenReturn(2L)
4048+
when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)
4049+
4050+
partition.setLog(mockLog, false)
4051+
4052+
val result = partition.deleteRecordsOnLeader(1L)
4053+
assertEquals(1L, result.requestedOffset)
4054+
}
4055+
4056+
@Test
4057+
def testDeleteRecordsOnLeaderWithCompactPolicy(): Unit = {
4058+
val leaderEpoch = 5
4059+
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
4060+
4061+
val emptyPolicyConfig = new LogConfig(util.Map.of(
4062+
TopicConfig.CLEANUP_POLICY_CONFIG, "compact"
4063+
))
4064+
4065+
val mockLog = mock(classOf[UnifiedLog])
4066+
when(mockLog.config).thenReturn(emptyPolicyConfig)
4067+
when(mockLog.logEndOffset).thenReturn(2L)
4068+
when(mockLog.logStartOffset).thenReturn(0L)
4069+
when(mockLog.highWatermark).thenReturn(2L)
4070+
when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)
4071+
4072+
partition.setLog(mockLog, false)
4073+
assertThrows(classOf[PolicyViolationException], () => partition.deleteRecordsOnLeader(1L))
4074+
}
40334075
}

docs/upgrade.html

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,17 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
136136
settings.
137137
</li>
138138
<li>
139-
The <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
139+
<code>cleanup.policy</code> now supports empty values, which means infinite retention.
140+
This is equivalent to setting <code>retention.ms=-1</code> and <code>retention.bytes=-1</code>
141+
<br>
142+
If <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
140143
local log segments will be cleaned based on the values of <code>log.local.retention.bytes</code> and
141144
<code>log.local.retention.ms</code>.
145+
<br>
146+
If <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to false,
147+
local log segments will not be deleted automatically. However, records can still be deleted
148+
explicitly through <code>deleteRecords</code> API calls, which will advance the log start offset
149+
and remove the corresponding log segments.
142150
</li>
143151
</ul>
144152
</li>

storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1934,8 +1934,8 @@ public int deleteOldSegments() throws IOException {
19341934
deleteRetentionSizeBreachedSegments() +
19351935
deleteRetentionMsBreachedSegments();
19361936
} else {
1937-
// If cleanup.policy is empty and remote storage is disabled, we should not delete any local
1938-
// log segments
1937+
// If cleanup.policy is empty and remote storage is disabled, we should not delete any local log segments
1938+
// unless the log start offset advances through deleteRecords
19391939
return deleteLogStartOffsetBreachedSegments();
19401940
}
19411941
}

0 commit comments

Comments
 (0)