Commit 1df2ac5

KAFKA-19012 Fix rare producer message corruption, don't reuse buffers on the client in certain error cases (#21065)
Client versions 2.8.0 and later are affected by a [change](30bc21c) that exposes a latent bug in how BufferPool is used. BufferPool is a client-side class that allocates memory as ByteBuffers; for performance it reuses them, with callers doing manual memory management by calling free when they are done with the memory. The bug is that a pooled ByteBuffer can be freed while it is still in use by the network sending thread; this early freeing can happen when batches expire or brokers disconnect from clients.

The bug has existed for more than a decade (since Kafka 0.x, it seems) but never manifested, because prior to 2.8.0 the pooled ByteBuffer (which contained the record data, i.e. your publishes) was copied into a freshly allocated ByteBuffer before any potential reuse, and that fresh ByteBuffer was what got written over the network to the broker. With a change included in 2.8.0, the pooled ByteBuffer remains as-is inside a MemoryRecords instance, and that pooled ByteBuffer (which in some cases can be reused and overwritten with other data) is what gets written over the network.

Two factors make the impact worse. First, the checksum for Kafka records covers only the key/value/headers and not the topic, so it offers no protection here. Second, also new in the commit that exposed the bug, the produce request header (which includes the topic and partition for a group of message batches) is serialized into a buffer separate from the messages themselves, and only the latter goes into the pooled ByteBuffer. As a result, messages can be misrouted to a random recently used topic rather than merely duplicated on their intended topic.

The key change is in Sender.sendProducerData: the pooled ByteBuffer of an expired in-flight batch must not be reused until the request completes. For these batches we avoid deallocating the buffer in the normal failBatch call, deferring it until completeBatch (or a different path of failBatch) is called.

Automated tests cover this, and manual testing reproduced the issue from KAFKA-19012 and verified that the change is sufficient to stop it.

Reviewers: Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
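To illustrate the hazard in isolation, here is a minimal, hypothetical sketch (SimplePool, allocate, and release are invented names, not Kafka's BufferPool API): a pooled ByteBuffer is released while another code path still holds a reference to it, a later allocation reuses and overwrites the same backing memory, and the original "send" then writes the wrong bytes. Deferring the release until the in-flight request completes, as this commit does, avoids exactly this pattern.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

// Toy stand-in for a pooling allocator; NOT Kafka's BufferPool API.
final class SimplePool {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    synchronized ByteBuffer allocate(int size) {
        ByteBuffer cached = free.pollFirst();
        // Reuse a previously released buffer when one of sufficient capacity exists.
        return (cached != null && cached.capacity() >= size) ? cached : ByteBuffer.allocate(size);
    }

    synchronized void release(ByteBuffer buffer) {
        buffer.clear();
        free.addFirst(buffer); // the same backing memory can now be handed to another caller
    }
}

public class PooledBufferReuseHazard {
    public static void main(String[] args) {
        SimplePool pool = new SimplePool();

        // Batch A's records are serialized into a pooled buffer that the
        // "network send path" still references.
        ByteBuffer batchA = pool.allocate(16);
        batchA.put("records-for-A..!".getBytes(StandardCharsets.UTF_8)).flip();

        // Bug pattern: batch A expires (or the connection drops) and its buffer
        // is released to the pool even though the send has not completed.
        pool.release(batchA);

        // Batch B reuses the same backing buffer and overwrites its contents.
        ByteBuffer batchB = pool.allocate(16);
        batchB.put("records-for-B..!".getBytes(StandardCharsets.UTF_8)).flip();

        // The send path now writes batch B's bytes under batch A's request,
        // i.e. corrupted or misrouted records.
        byte[] wire = new byte[batchA.remaining()];
        batchA.get(wire);
        System.out.println("Bytes written for batch A: " + new String(wire, StandardCharsets.UTF_8));
    }
}
```

Running the sketch prints batch B's payload for batch A's "send", which is the same class of corruption described above: because the produce request header is serialized separately, those overwritten bytes can end up attributed to a different topic.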
1 parent 5bb44d6 commit 1df2ac5

File tree: 5 files changed, +166 -38 lines changed

clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java

Lines changed: 19 additions & 0 deletions
@@ -72,6 +72,9 @@ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
     private final AtomicInteger attempts = new AtomicInteger(0);
     private final boolean isSplitBatch;
     private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
+    private boolean bufferDeallocated = false;
+    // Tracks if the batch has been sent to the NetworkClient
+    private boolean inflight = false;
 
     int recordCount;
     int maxRecordSize;

@@ -581,6 +584,22 @@ public boolean sequenceHasBeenReset() {
         return reopened;
     }
 
+    public boolean isBufferDeallocated() {
+        return bufferDeallocated;
+    }
+
+    public void markBufferDeallocated() {
+        bufferDeallocated = true;
+    }
+
+    public boolean isInflight() {
+        return inflight;
+    }
+
+    public void setInflight(boolean inflight) {
+        this.inflight = inflight;
+    }
+
     // VisibleForTesting
     OptionalInt currentLeaderEpoch() {
         return currentLeaderEpoch;

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

Lines changed: 38 additions & 6 deletions
@@ -1027,14 +1027,39 @@ BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
     }
 
     /**
-     * Deallocate the record batch
+     * Complete and deallocate the record batch
+     */
+    public void completeAndDeallocateBatch(ProducerBatch batch) {
+        completeBatch(batch);
+        deallocate(batch);
+    }
+
+    /**
+     * Only perform deallocation (and not removal from the incomplete set)
      */
     public void deallocate(ProducerBatch batch) {
-        incomplete.remove(batch);
         // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
         // buffer pool.
-        if (!batch.isSplitBatch())
-            free.deallocate(batch.buffer(), batch.initialCapacity());
+        if (!batch.isSplitBatch()) {
+            if (batch.isBufferDeallocated()) {
+                log.warn("Skipping deallocating a batch that has already been deallocated. Batch is {}, created time is {}", batch, batch.createdMs);
+            } else {
+                batch.markBufferDeallocated();
+                if (batch.isInflight()) {
+                    // Create a fresh ByteBuffer to give to BufferPool to reuse since we can't safely call deallocate with the ProduceBatch's buffer
+                    free.deallocate(ByteBuffer.allocate(batch.initialCapacity()));
+                    throw new IllegalStateException("Attempting to deallocate a batch that is inflight. Batch is " + batch);
+                }
+                free.deallocate(batch.buffer(), batch.initialCapacity());
+            }
+        }
+    }
+
+    /**
+     * Remove from the incomplete list but do not free memory yet
+     */
+    public void completeBatch(ProducerBatch batch) {
+        incomplete.remove(batch);
     }
 
     /**

@@ -1132,7 +1157,14 @@ void abortBatches(final RuntimeException reason) {
                     dq.remove(batch);
                 }
                 batch.abort(reason);
-                deallocate(batch);
+                if (batch.isInflight()) {
+                    // KAFKA-19012: if the batch has been sent it might still be in use by the network client so we cannot allow it to be reused yet.
+                    // We skip deallocating it now. When the request in network client completes with a response, either Sender.completeBatch() or
+                    // Sender.failBatch() will be called with deallocateBatch=true. The buffer associated with the batch will be deallocated then.
+                    completeBatch(batch);
+                } else {
+                    completeAndDeallocateBatch(batch);
+                }
             }
         }
 
@@ -1152,7 +1184,7 @@ void abortUndrainedBatches(RuntimeException reason) {
             }
             if (aborted) {
                 batch.abort(reason);
-                deallocate(batch);
+                completeAndDeallocateBatch(batch);
             }
         }
     }

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

Lines changed: 57 additions & 25 deletions
@@ -171,7 +171,12 @@ private void maybeRemoveFromInflightBatches(ProducerBatch batch) {
 
     private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
         maybeRemoveFromInflightBatches(batch);
-        this.accumulator.deallocate(batch);
+        this.accumulator.completeAndDeallocateBatch(batch);
+    }
+
+    private void maybeRemoveAndDeallocateBatchLater(ProducerBatch batch) {
+        maybeRemoveFromInflightBatches(batch);
+        this.accumulator.completeBatch(batch);
     }
 
     /**

@@ -354,6 +359,24 @@ private boolean shouldHandleAuthorizationError(RuntimeException exception) {
         return false;
     }
 
+    private void failExpiredBatches(List<ProducerBatch> expiredBatches, long now, boolean deallocateBuffer) {
+        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
+        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
+        // we need to reset the producer id here.
+        if (!expiredBatches.isEmpty())
+            log.trace("Expired {} batches in accumulator", expiredBatches.size());
+        for (ProducerBatch expiredBatch : expiredBatches) {
+            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation. "
+                + "The request has not been sent, or no server response has been received yet.";
+            failBatch(expiredBatch, new TimeoutException(errorMessage), false, deallocateBuffer);
+            if (transactionManager != null && expiredBatch.inRetry()) {
+                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
+                transactionManager.markSequenceUnresolved(expiredBatch);
+            }
+        }
+    }
+
     private long sendProducerData(long now) {
         MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
         // get the list of partitions with data ready to send

@@ -405,23 +428,10 @@ private long sendProducerData(long now) {
         accumulator.resetNextBatchExpiryTime();
         List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
         List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
-        expiredBatches.addAll(expiredInflightBatches);
 
-        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
-        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
-        // we need to reset the producer id here.
-        if (!expiredBatches.isEmpty())
-            log.trace("Expired {} batches in accumulator", expiredBatches.size());
-        for (ProducerBatch expiredBatch : expiredBatches) {
-            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
-                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation. "
-                + "The request has not been sent, or no server response has been received yet.";
-            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
-            if (transactionManager != null && expiredBatch.inRetry()) {
-                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
-                transactionManager.markSequenceUnresolved(expiredBatch);
-            }
-        }
+        failExpiredBatches(expiredBatches, now, true);
+        failExpiredBatches(expiredInflightBatches, now, false);
+
         sensors.updateProduceRequestMetrics(batches);
 
         // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately

@@ -524,6 +534,7 @@ private void maybeAbortBatches(RuntimeException exception) {
         if (accumulator.hasIncomplete()) {
             log.error("Aborting producer batches due to fatal error", exception);
             accumulator.abortBatches(exception);
+            inFlightBatches.clear();
         }
     }
 
@@ -659,6 +670,7 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
      */
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now, Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo) {
+        batch.setInflight(false);
         Errors error = response.error;
 
         if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&

@@ -696,7 +708,7 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
                 // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
                 // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
                 // thus it is not safe to reassign the sequence.
-                failBatch(batch, response, batch.attempts() < this.retries);
+                failBatch(batch, response, batch.attempts() < this.retries, true);
             }
             if (error.exception() instanceof InvalidMetadataException) {
                 if (error.exception() instanceof UnknownTopicOrPartitionException) {

@@ -749,12 +761,16 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
 
         if (batch.complete(response.baseOffset, response.logAppendTime)) {
             maybeRemoveAndDeallocateBatch(batch);
+        } else {
+            // Always safe to call deallocate because the batch keeps track of whether or not it was deallocated yet
+            this.accumulator.deallocate(batch);
         }
     }
 
     private void failBatch(ProducerBatch batch,
                            ProduceResponse.PartitionResponse response,
-                           boolean adjustSequenceNumbers) {
+                           boolean adjustSequenceNumbers,
+                           boolean deallocateBatch) {
         final RuntimeException topLevelException;
         if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED)
             topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));

@@ -764,7 +780,7 @@ else if (response.error == Errors.CLUSTER_AUTHORIZATION_FAILED)
             topLevelException = response.error.exception(response.errorMessage);
 
         if (response.recordErrors == null || response.recordErrors.isEmpty()) {
-            failBatch(batch, topLevelException, adjustSequenceNumbers);
+            failBatch(batch, topLevelException, adjustSequenceNumbers, deallocateBatch);
         } else {
             Map<Integer, RuntimeException> recordErrorMap = new HashMap<>(response.recordErrors.size());
             for (ProduceResponse.RecordError recordError : response.recordErrors) {

@@ -803,23 +819,25 @@ else if (response.error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                 }
             };
 
-            failBatch(batch, topLevelException, recordExceptions, adjustSequenceNumbers);
+            failBatch(batch, topLevelException, recordExceptions, adjustSequenceNumbers, deallocateBatch);
         }
     }
 
     private void failBatch(
         ProducerBatch batch,
         RuntimeException topLevelException,
-        boolean adjustSequenceNumbers
+        boolean adjustSequenceNumbers,
+        boolean deallocateBatch
     ) {
-        failBatch(batch, topLevelException, batchIndex -> topLevelException, adjustSequenceNumbers);
+        failBatch(batch, topLevelException, batchIndex -> topLevelException, adjustSequenceNumbers, deallocateBatch);
     }
 
     private void failBatch(
         ProducerBatch batch,
         RuntimeException topLevelException,
         Function<Integer, RuntimeException> recordExceptions,
-        boolean adjustSequenceNumbers
+        boolean adjustSequenceNumbers,
+        boolean deallocateBatch
    ) {
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
 
@@ -833,7 +851,20 @@ private void failBatch(
                     log.debug("Encountered error when transaction manager was handling a failed batch", e);
                 }
             }
-            maybeRemoveAndDeallocateBatch(batch);
+            if (deallocateBatch) {
+                maybeRemoveAndDeallocateBatch(batch);
+            } else {
+                // Fix for KAFKA-19012
+                // The pooled ByteBuffer associated with this batch might still be in use by the network client so we
+                // cannot allow it to be reused yet. We skip deallocating it now. When the request in the network client
+                // completes with a response, either completeBatch() or failBatch() will be called with deallocateBatch=true.
+                // The buffer associated with the batch will be deallocated then.
+                maybeRemoveAndDeallocateBatchLater(batch);
+            }
+        } else {
+            if (deallocateBatch) {
+                this.accumulator.deallocate(batch);
+            }
         }
     }
 
@@ -886,6 +917,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
                     .setIndex(tp.partition())
                     .setRecords(records));
             recordsByPartition.put(tp, batch);
+            batch.setInflight(true);
         }
 
         String transactionalId = null;

clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java

Lines changed: 4 additions & 4 deletions
@@ -405,7 +405,7 @@ public void testStressfulSituation() throws Exception {
                 for (ProducerBatch batch : batches) {
                     for (@SuppressWarnings("UnusedLocalVariable") Record ignored : batch.records().records())
                         read++;
-                    accum.deallocate(batch);
+                    accum.completeAndDeallocateBatch(batch);
                 }
             }
         }

@@ -669,7 +669,7 @@ public void testFlush() throws Exception {
 
         for (List<ProducerBatch> batches: results.values())
             for (ProducerBatch batch: batches)
-                accum.deallocate(batch);
+                accum.completeAndDeallocateBatch(batch);
 
         // should be complete with no unsent records.
         accum.awaitFlushCompletion();

@@ -1575,7 +1575,7 @@ private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSi
         assertEquals(1, batches.values().iterator().next().size());
         ProducerBatch batch = batches.values().iterator().next().get(0);
         int numSplitBatches = accum.splitAndReenqueue(batch);
-        accum.deallocate(batch);
+        accum.completeAndDeallocateBatch(batch);
 
         return numSplitBatches;
     }

@@ -1599,7 +1599,7 @@ private BatchDrainedResult completeOrSplitBatches(RecordAccumulator accum, int b
                 } else {
                     batch.complete(0L, 0L);
                 }
-                accum.deallocate(batch);
+                accum.completeAndDeallocateBatch(batch);
             }
         }
     } while (batchDrained);
