
Commit 201c6eb

fix(log): Prevent potential offset overflow in ElasticLogSegment (#2720)
* fix(log): Prevent potential offset overflow in ElasticLogSegment

  This commit addresses an issue where a log segment could accommodate more than Integer.MAX_VALUE records, leading to a potential integer overflow when calculating relative offsets. The root cause was that the check `offset - baseOffset <= Integer.MAX_VALUE` allowed a relative offset to be exactly `Integer.MAX_VALUE`. Since offsets are 0-based, this allows for `Integer.MAX_VALUE + 1` records, which cannot be represented by a standard Integer.

  This fix implements the following changes:

  1. In `ElasticLogSegment`, the offset validation is changed from `<=` to `< Integer.MAX_VALUE` to ensure the relative offset strictly fits within an Integer's bounds.
  2. In `LogCleaner`, a new segment grouping method `groupSegmentsBySizeV2` is introduced for `ElasticUnifiedLog`. This method uses the same stricter offset check to prevent incorrectly grouping segments that would exceed the offset limit.
  3. The corresponding unit tests in `LogCleanerTest` have been updated to reflect these new boundaries and validate the fix.

  Fixes: #2718

* fix(logCleaner): unify segment grouping logic
* fix(logCleaner): extract offset range check for segment grouping to prevent overflow in ElasticLogSegment
* style(logCleaner): fix indentation in segment grouping condition for readability
* style(logCleaner): fix line break in offset range check for readability
* chore: add AutoMQ inject
* style(logCleaner): remove unnecessary blank line after segment grouping
* fix(stream): validate record batch count to prevent negative values in append
1 parent 014d2d2 commit 201c6eb
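
To make the off-by-one described in the commit message concrete, here is a minimal stand-alone Java sketch of the boundary arithmetic; the class name and literal values are illustrative and not part of the commit.

// Illustration only: why the old `<=` bound admits one record too many.
public class RelativeOffsetBoundary {
    public static void main(String[] args) {
        long baseOffset = 0L;
        long largestOffset = Integer.MAX_VALUE;            // accepted by the old `<=` check
        long relativeOffset = largestOffset - baseOffset;  // 2147483647

        // Offsets are 0-based, so offsets 0..Integer.MAX_VALUE cover Integer.MAX_VALUE + 1 records.
        long recordCount = relativeOffset + 1;             // 2147483648, not representable as an int
        System.out.println((int) recordCount);             // prints -2147483648: the overflow the fix prevents

        // The new check requires relativeOffset < Integer.MAX_VALUE, so a segment holds at most
        // Integer.MAX_VALUE records (relative offsets 0..Integer.MAX_VALUE - 1).
    }
}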

File tree: 4 files changed, +80 -15 lines changed

core/src/main/scala/kafka/log/LogCleaner.scala

Lines changed: 23 additions & 9 deletions
@@ -17,31 +17,31 @@
 
 package kafka.log
 
-import java.io.{File, IOException}
-import java.nio._
-import java.util.Date
-import java.util.concurrent.TimeUnit
 import kafka.common._
-import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
+import kafka.log.LogCleaner._
 import kafka.log.streamaspect.ElasticLogSegment
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils.{Logging, Pool}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, TransactionIndex}
+import org.apache.kafka.storage.internals.log._
 import org.apache.kafka.storage.internals.utils.Throttler
 
-import scala.jdk.CollectionConverters._
+import java.io.{File, IOException}
+import java.nio._
+import java.util.Date
+import java.util.concurrent.TimeUnit
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Iterable, Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
 import scala.util.control.ControlThrowable
 
 /**
@@ -977,6 +977,18 @@ private[log] class Cleaner(val id: Int,
   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int, firstUncleanableOffset: Long): List[Seq[LogSegment]] = {
     var grouped = List[List[LogSegment]]()
     var segs = segments.toList
+
+    // AutoMQ inject start
+    def isOffsetRangeValid(group: List[LogSegment]) = {
+      val offsetRange = lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset
+      // For ElasticLogSegment, use a stricter offset range check (`< Int.MaxValue`) to prevent a potential overflow
+      // issue as described in https://github.com/AutoMQ/automq/issues/2717.
+      // For other segment types, the original less-strict check (`<= Int.MaxValue`) is retained.
+      if (group.last.isInstanceOf[ElasticLogSegment]) offsetRange < Int.MaxValue
+      else offsetRange <= Int.MaxValue
+    }
+    // AutoMQ inject end
+
     while (segs.nonEmpty) {
       var group = List(segs.head)
       var logSize = segs.head.size.toLong
@@ -990,7 +1002,9 @@
         //if first segment size is 0, we don't need to do the index offset range check.
         //this will avoid empty log left every 2^31 message.
        (segs.head.size == 0 ||
-          lastOffsetForFirstSegment(segs, firstUncleanableOffset) - group.last.baseOffset <= Int.MaxValue)) {
+          // AutoMQ inject start
+          isOffsetRangeValid(group))) {
+          // AutoMQ inject end
        group = segs.head :: group
        logSize += segs.head.size
        indexSize += offsetIndexSize(segs.head)
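
For readers skimming the Scala above, the grouping guard can be paraphrased as a small Java predicate. GroupingGuard, offsetRangeValid, and the parameter names below are hypothetical stand-ins for lastOffsetForFirstSegment(segs, firstUncleanableOffset), group.last.baseOffset, and the ElasticLogSegment type test; this is a sketch of the idea, not the project's API.

// Hypothetical paraphrase of isOffsetRangeValid from the diff above.
final class GroupingGuard {
    static boolean offsetRangeValid(long lastOffsetOfFirstSegment, long groupBaseOffset, boolean elasticSegment) {
        long offsetRange = lastOffsetOfFirstSegment - groupBaseOffset;
        // Elastic segments need the strict bound so the 0-based relative offset still fits in an int;
        // classic file-backed segments keep the original inclusive bound.
        return elasticSegment ? offsetRange < Integer.MAX_VALUE
                              : offsetRange <= Integer.MAX_VALUE;
    }
}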

core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.java

Lines changed: 21 additions & 1 deletion
@@ -39,6 +39,7 @@
 import org.apache.kafka.storage.internals.log.LogFileUtils;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.LogSegment;
+import org.apache.kafka.storage.internals.log.LogSegmentOffsetOverflowException;
 import org.apache.kafka.storage.internals.log.OffsetIndex;
 import org.apache.kafka.storage.internals.log.OffsetPosition;
 import org.apache.kafka.storage.internals.log.ProducerAppendInfo;
@@ -194,9 +195,26 @@ public int size() {
         return log.sizeInBytes();
     }
 
+    /**
+     * Checks that the argument offset can be represented as an integer offset relative to the baseOffset.
+     * This method is similar in purpose to {@see org.apache.kafka.storage.internals.log.LogSegment#canConvertToRelativeOffset}.
+     * <p>
+     * The implementation is inspired by {@see org.apache.kafka.storage.internals.log.AbstractIndex#canAppendOffset},
+     * but uses {@code < Integer.MAX_VALUE} instead of {@code <= Integer.MAX_VALUE} to address an offset overflow issue.
+     *
+     * @param offset The offset to check.
+     * @return true if the offset can be converted, false otherwise.
+     * @see <a href="https://github.com/AutoMQ/automq/issues/2718">Issue #2718</a>
+     */
     private boolean canConvertToRelativeOffset(long offset) {
         long relativeOffset = offset - baseOffset;
-        return relativeOffset >= 0 && relativeOffset <= Integer.MAX_VALUE;
+        // Note: The check is `relativeOffset < Integer.MAX_VALUE` instead of `<=` to avoid overflow.
+        // See https://github.com/AutoMQ/automq/issues/2718 for details.
+        return relativeOffset >= 0 && relativeOffset < Integer.MAX_VALUE;
+    }
+    private void ensureOffsetInRange(long offset) throws IOException {
+        if (!canConvertToRelativeOffset(offset))
+            throw new LogSegmentOffsetOverflowException(this, offset);
     }
 
     @Override
@@ -216,6 +234,8 @@ public void append(
             meta.firstBatchTimestamp(largestTimestampMs);
         }
 
+        ensureOffsetInRange(largestOffset);
+
         // append the messages
         long appendedBytes = log.append(records, largestOffset + 1);
         if (LOGGER.isTraceEnabled()) {
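
A quick way to see what the stricter check changes at the boundary is a throwaway Java demo; BoundaryCheckDemo and canConvert are hypothetical names, and only the predicate body mirrors the new canConvertToRelativeOffset shown in the diff.

public class BoundaryCheckDemo {
    // Same bound as the fixed check: relative offset must be non-negative and strictly below Integer.MAX_VALUE.
    static boolean canConvert(long baseOffset, long offset) {
        long relative = offset - baseOffset;
        return relative >= 0 && relative < Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long base = 0L;
        System.out.println(canConvert(base, Integer.MAX_VALUE - 1L)); // true  -> still appendable
        System.out.println(canConvert(base, Integer.MAX_VALUE));      // false -> rejected now, accepted before the fix
        System.out.println(canConvert(base, -1L));                    // false -> negative relative offset
    }
}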

core/src/test/scala/unit/kafka/log/LogCleanerTest.scala

Lines changed: 33 additions & 5 deletions
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log._
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
@@ -1437,8 +1437,20 @@ class LogCleanerTest extends Logging {
     //create 3 segments
     for (i <- 0 until 3) {
       log.appendAsLeader(TestUtils.singletonRecords(value = v, key = k), leaderEpoch = 0)
-      //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment
-      val records = messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
+
+      // AutoMQ inject start
+      val records = if (log.isInstanceOf[ElasticUnifiedLog]) {
+        // Create a sparse segment by appending a record with a large offset.
+        // A segment can contain up to Int.MaxValue messages (see https://github.com/AutoMQ/automq/issues/2717).
+        // The offset `(i + 1L) * Int.MaxValue - 1` ensures that this is the last message of the i-th segment,
+        // creating a large gap to the next segment's base offset. This helps test segment grouping with sparse offsets.
+        messageWithOffset(k, v, (i + 1L) * Int.MaxValue - 1)
+      } else {
+        //0 to Int.MaxValue is Int.MaxValue+1 message, -1 will be the last message of i-th segment
+        messageWithOffset(k, v, (i + 1L) * (Int.MaxValue + 1L) -1 )
+      }
+
+      // AutoMQ inject end
       log.appendAsFollower(records)
       assertEquals(i + 1, log.numberOfSegments)
     }
@@ -1469,6 +1481,13 @@
     //trigger a clean and 2 empty segments should cleaned to 1
     cleaner.clean(LogToClean(log.topicPartition, log, 0, firstUncleanableOffset))
     assertEquals(totalSegments - 1, log.numberOfSegments)
+
+    // AutoMQ inject start
+
+    // after clean, the 2nd and 3rd segment should be none empty
+    assertEquals(2, log.logSegments.asScala.takeRight(2).count(_.size > 0))
+
+    // AutoMQ inject end
   }
 
   /**
@@ -1492,12 +1511,21 @@
     log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
 
     // forward offset and append message to next segment at offset Int.MaxValue
-    val records = messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue - 1)
+    // AutoMQ inject start
+    val records = if (log.isInstanceOf[ElasticUnifiedLog]) {
+      messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue - 2)
+    } else {
+      messageWithOffset("hello".getBytes, "hello".getBytes, Int.MaxValue - 1)
+    }
+    // AutoMQ inject end
+
     log.appendAsFollower(records)
     log.appendAsLeader(TestUtils.singletonRecords(value = "hello".getBytes, key = "hello".getBytes), leaderEpoch = 0)
 
     // AutoMQ inject start
-    if (!log.isInstanceOf[ElasticUnifiedLog]) {
+    if (log.isInstanceOf[ElasticUnifiedLog]) {
+      assertEquals(Int.MaxValue - 1, log.activeSegment.readNextOffset() - 1)
+    } else {
       assertEquals(Int.MaxValue, log.activeSegment.offsetIndex.lastOffset)
     }
     // AutoMQ inject end
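
The elastic branch of the first test picks its offsets so that each segment spans exactly Int.MaxValue offsets, the most the stricter check still allows. A small Java sketch of that arithmetic follows; the class name is hypothetical and it assumes segment i's base offset is i * Int.MaxValue, as the test's comments suggest.

public class TestOffsetLayout {
    public static void main(String[] args) {
        for (long i = 0; i < 3; i++) {
            long lastOffset = (i + 1L) * Integer.MAX_VALUE - 1L; // last message appended to segment i
            long nextBase = lastOffset + 1L;                     // base offset of segment i + 1
            long span = nextBase - i * (long) Integer.MAX_VALUE; // offsets covered by segment i
            // span prints 2147483647 for every i: exactly Int.MaxValue records per segment.
            System.out.printf("segment %d: last offset %d, next base %d, span %d%n", i, lastOffset, nextBase, span);
        }
    }
}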

s3stream/src/main/java/com/automq/stream/s3/S3Stream.java

Lines changed: 3 additions & 0 deletions
@@ -183,6 +183,9 @@ public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch
         if (snapshotRead()) {
             return FutureUtil.failedFuture(new IllegalStateException("Append operation is not support for readonly stream"));
         }
+        if (recordBatch.count() < 0) {
+            return FutureUtil.failedFuture(new IllegalArgumentException("record batch count is negative"));
+        }
         long startTimeNanos = System.nanoTime();
         readLock.lock();
         try {
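
The S3Stream change is a fail-fast guard on the batch count before any offset math runs. A self-contained Java sketch of the same pattern follows; AppendValidationDemo and RecordBatchLike are illustrative stand-ins, not the real S3Stream or RecordBatch types.

import java.util.concurrent.CompletableFuture;

public class AppendValidationDemo {
    // Stand-in for the real RecordBatch interface; only the count() accessor matters here.
    interface RecordBatchLike {
        int count();
    }

    static CompletableFuture<Void> append(RecordBatchLike batch) {
        if (batch.count() < 0) {
            // Reject the batch up front instead of letting a negative count corrupt later offset accounting.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("record batch count is negative"));
            return failed;
        }
        return CompletableFuture.completedFuture(null); // real code would append to the stream here
    }

    public static void main(String[] args) {
        append(() -> -1).whenComplete((r, e) -> System.out.println("rejected: " + e.getMessage()));
        append(() -> 10).thenRun(() -> System.out.println("accepted"));
    }
}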
