Skip to content

Commit 24d03a1

Browse files
authored
KAFKA-19517: Include control records in LoadSummary#numRecords (#20206)
## Summary jira: https://issues.apache.org/jira/browse/KAFKA-19517 Ensure `LoadSummary#numRecords` counts all records, including control batches, to maintain consistency with numBytes. ## Test `testLoading` now verifies `numRecords`. Reviewers: Chia-Ping Tsai <[email protected]>, TengYao Chi <[email protected]>
1 parent 5059819 commit 24d03a1

File tree

2 files changed

+6
-1
lines changed

2 files changed

+6
-1
lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ private ReplayResult processMemoryRecords(
214214
for (MutableRecordBatch batch : memoryRecords.batches()) {
215215
if (batch.isControlBatch()) {
216216
for (Record record : batch) {
217+
loadStats.numRecords++;
218+
217219
ControlRecordType controlRecord = ControlRecordType.parse(record.key());
218220
if (controlRecord == ControlRecordType.COMMIT) {
219221
if (LOG.isTraceEnabled()) {

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorLoaderImplTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,10 @@ void testLoading() throws Exception {
183183
when(log.read(8L, 1000, FetchIsolation.LOG_END, true))
184184
.thenReturn(readResult5);
185185

186-
assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS));
186+
CoordinatorLoader.LoadSummary summary = loader.load(tp, coordinator).get(10, TimeUnit.SECONDS);
187+
assertNotNull(summary);
188+
// Includes 7 normal + 2 control (COMMIT, ABORT)
189+
assertEquals(9, summary.numRecords());
187190

188191
verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k1", "v1"));
189192
verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Tuple<>("k2", "v2"));

0 commit comments

Comments
 (0)