Skip to content

Commit 80b9377

Browse files
authored
fix(logCleaner): optimize write buffer management and clear buffer before use (#2704)
1 parent 32c51bd commit 80b9377

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

core/src/main/scala/kafka/log/LogCleaner.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,9 @@ private[log] class Cleaner(val id: Int,
12241224
}
12251225
for (batch <- fetchDataInfo.records.batches().asScala) {
12261226
checkDone(topicPartition)
1227+
1228+
writeBuffer.clear();
1229+
12271230
val records = MemoryRecords.readableRecords(batch.asInstanceOf[DefaultRecordBatch].buffer())
12281231
throttler.maybeThrottle(records.sizeInBytes)
12291232
val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize, decompressionBufferSupplier)
@@ -1241,7 +1244,13 @@ private[log] class Cleaner(val id: Int,
12411244
dest.append(result.maxOffset, result.maxTimestamp, result.shallowOffsetOfMaxTimestamp(), retained)
12421245
throttler.maybeThrottle(outputBuffer.limit())
12431246
}
1247+
1248+
// Grow the write buffer if needed, avoid always allocate a new buffer.
1249+
if (result.outputBuffer.capacity() > this.writeBuffer.capacity()) {
1250+
this.writeBuffer = ByteBuffer.allocate(result.outputBuffer.capacity())
1251+
}
12441252
}
1253+
restoreBuffers()
12451254
}
12461255

12471256
private def buildOffsetMapForSegmentV2(topicPartition: TopicPartition,

0 commit comments

Comments
 (0)