KAFKA-17862: [buffer pool] corruption during buffer reuse from the pool #9

base: trunk
@@ -0,0 +1,270 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.producer;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.compress.NoCompression;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

public class ProducerIntegrationTest {

    @ClusterTest(serverProperties = {
        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
    })
    public void testInFlightBatchShouldNotBeCorrupted(ClusterInstance cluster) throws InterruptedException,
            ExecutionException {
        String topic = "test-topic";
        cluster.createTopic("test-topic", 1, (short) 1);
        try (var producer = expireProducer(cluster)) {
            producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes())).get();
        }
        try (var consumer = cluster.consumer()) {
            consumer.subscribe(List.of(topic));
            TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
        }
    }

    @SuppressWarnings({"unchecked", "this-escape"})
    private Producer<byte[], byte[]> expireProducer(ClusterInstance cluster) {
        Map<String, Object> config = Map.of(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
                ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false,
                ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 2000,
                ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1500
        );
        return new EvilKafkaProducerBuilder().build(config);
    }

    static class EvilKafkaProducerBuilder {

        Serializer<byte[]> serializer = new ByteArraySerializer();
        ApiVersions apiVersions = new ApiVersions();
        LogContext logContext = new LogContext("[expire Producer test ]");
        Metrics metrics = new Metrics(Time.SYSTEM);

        String clientId;
        String transactionalId;
        ProducerConfig config;
        ProducerMetadata metadata;
        RecordAccumulator accumulator;
        Partitioner partitioner;
        Sender sender;
        ProducerInterceptors<String, String> interceptors;

        @SuppressWarnings({"unchecked", "this-escape"})
        Producer<byte[], byte[]> build(Map<String, Object> configs) {
            this.config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
            transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            return new KafkaProducer<>(
                    config,
                    logContext,
                    metrics,
                    serializer,
                    serializer,
                    buildMetadata(),
                    buildAccumulator(),
                    null,
                    buildSender(),
                    buildInterceptors(),
                    buildPartition(),
                    Time.SYSTEM,
                    ioThread(),
                    Optional.empty()
            );
        }

        private ProducerInterceptors buildInterceptors() {
            this.interceptors = new ProducerInterceptors<>(List.of(), metrics);
            return this.interceptors;
        }

        private Partitioner buildPartition() {
            this.partitioner = config.getConfiguredInstance(
                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    Partitioner.class,
                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            return this.partitioner;
        }

        private Sender buildSender() {
            int maxInflightRequests = config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
            int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
            KafkaClient client = ClientUtils.createNetworkClient(config,
                    this.metrics,
                    "producer",
                    logContext,
                    apiVersions,
                    Time.SYSTEM,
                    maxInflightRequests,
                    metadata,
                    throttleTimeSensor,
                    null);

            short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
            this.sender = new Sender(logContext,
                    client,
                    metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    metricsRegistry.senderMetrics,
                    Time.SYSTEM,
                    requestTimeoutMs,
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    null) {
                @Override
                protected long sendProducerData(long now) {
                    long result = super.sendProducerData(now);
                    try {
                        // Ensure the batch expires.
                        Thread.sleep(500);
                        return result;
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
Review comment on lines +178 to +179 (Interrupted Thread Handling):
The thread's interruption status is lost when InterruptedException is caught and wrapped in a RuntimeException. This violates the thread-interruption contract and can leave threads that cannot be terminated properly, potentially causing resource leaks.
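A minimal sketch of the conventional remedy (a hypothetical helper class, not part of the PR): restore the interrupt flag before wrapping, so the thread's owner can still observe the interruption.

public class InterruptSafeSleep {
    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag before wrapping
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        sleepQuietly(100);
        System.out.println("interrupted? " + Thread.currentThread().isInterrupted());
    }
}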
                    }
                }
Review comment on lines +172 to +181 (Thread Sleep Injection):
The test overrides sendProducerData to introduce a 500ms delay, ensuring batches expire while still in flight. This deliberate timing manipulation creates the conditions necessary to reproduce the race condition in which buffers are deallocated while still being used by the network layer.
            };
            return this.sender;
        }

        private RecordAccumulator buildAccumulator() {
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
            Plugin<Partitioner> partitionerPlugin = Plugin.wrapInstance(
                    config.getConfiguredInstance(
                            ProducerConfig.PARTITIONER_CLASS_CONFIG,
                            Partitioner.class,
                            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
                    metrics,
                    ProducerConfig.PARTITIONER_CLASS_CONFIG);
            boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
                    config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
            this.accumulator = new RecordAccumulator(logContext,
                    batchSize,
                    NoCompression.NONE,
                    (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE),
                    retryBackoffMs,
                    retryBackoffMaxMs,
                    config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG),
                    new RecordAccumulator.PartitionerConfig(
                            enableAdaptivePartitioning,
                            config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
                    ),
                    metrics,
                    "producer-metrics",
                    Time.SYSTEM,
                    null,
                    new EvilBufferPool(config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG), batchSize, metrics,
                            Time.SYSTEM, "producer-metrics"));
            return accumulator;
        }

        private ProducerMetadata buildMetadata() {
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
                    List.of(),
                    reporters,
                    List.of(
                            Plugin.wrapInstance(serializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).get(),
                            Plugin.wrapInstance(serializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).get()));
            this.metadata = new ProducerMetadata(retryBackoffMs,
                    retryBackoffMaxMs,
                    config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                    logContext,
                    clusterResourceListeners,
                    Time.SYSTEM);
            metadata.bootstrap(ClientUtils.parseAndValidateAddresses(config));
            return metadata;
        }

        private KafkaThread ioThread() {
            KafkaThread ioThread = new KafkaThread("test_io_thread", sender, true);
            ioThread.start();
            return ioThread;
        }
    }

    static class EvilBufferPool extends BufferPool {
Review comment on lines +88 to +247 (Evil Class Naming):
The 'Evil' prefix does not clearly communicate the test class's purpose or behavior. More descriptive names such as 'CorruptingBufferPool' or 'DelayingSenderProducerBuilder' would better convey each test component's intended behavior and failure mode.
        public EvilBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
            super(memory, poolableSize, metrics, time, metricGrpName);
        }

        /**
         * Override deallocate to intentionally corrupt the ByteBuffer being returned to the pool.
         * This is used to simulate a scenario where an in-flight buffer is mistakenly reused
         * and its contents are unexpectedly modified, helping expose buffer reuse bugs.
         */
        @Override
        public void deallocate(ByteBuffer buffer, int size) {
            // Ensure atomicity using reentrant behavior
            lock.lock();
            try {
                Arrays.fill(buffer.array(), (byte) 0);
                super.deallocate(buffer, size);
Review comment on lines +263 to +264 (Buffer Corruption Risk):
The EvilBufferPool implementation zeroes out the buffer's contents before returning it to the pool, but after acquiring the lock. This creates a race condition in which a buffer can be corrupted while still in use by another thread, causing data corruption.
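A self-contained illustration of the hazard this comment describes (a hypothetical demo class, not part of the PR): one thread zeroes a buffer's backing array while another thread still reads it, so the reader may observe zeros instead of the payload.

import java.nio.ByteBuffer;
import java.util.Arrays;

public class BufferReuseRaceDemo {
    public static void main(String[] args) throws InterruptedException {
        ByteBuffer inFlight = ByteBuffer.allocate(16);
        inFlight.put("payload".getBytes());
        inFlight.flip();

        // "Network" thread: still reading the in-flight buffer.
        Thread reader = new Thread(() -> {
            byte[] seen = new byte[7];
            inFlight.duplicate().get(seen);
            System.out.println("reader saw: " + new String(seen));
        });

        // "Pool" thread: deallocates by zeroing the backing array.
        Thread pool = new Thread(() -> Arrays.fill(inFlight.array(), (byte) 0));

        pool.start();   // if this wins the race, the reader observes zeros, not "payload"
        reader.start();
        pool.join();
        reader.join();
    }
}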
            } finally {
                lock.unlock();
            }
Review comment on lines +258 to +267 (Buffer Reuse Vulnerability):
The test demonstrates a critical buffer-reuse vulnerability: deallocated buffers are zeroed out before returning to the pool, yet in-flight batches may still reference them. Reusing a buffer while it is still referenced by an in-flight request can corrupt data, compromising message integrity or disclosing information.
        }
Review comment on lines +247 to +268 (Improve Test Documentation):
The test class documents the buffer-corruption simulation well, but it should also state explicitly that the implementation intentionally introduces a race condition to test the system's resilience against buffer-corruption scenarios.
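One possible wording for the class-level Javadoc this comment asks for (illustrative only, not the PR author's text):

/**
 * A BufferPool whose deallocate() intentionally zeroes each buffer before
 * returning it to the pool. This deliberately races with any in-flight
 * request that still references the buffer: if the Sender ever deallocates
 * an in-flight batch, the broker observes corrupted bytes and the test
 * fails. The race exists only to expose buffer-reuse bugs; production code
 * must never modify a buffer that may still be in flight.
 */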
    }
}
@@ -48,7 +48,7 @@ public class BufferPool {

     private final long totalMemory;
     private final int poolableSize;
-    private final ReentrantLock lock;
+    protected final ReentrantLock lock;
Review comment:
Changing the lock to protected seems reasonable to allow subclasses to access it, but consider whether there are other ways to achieve the same goal without exposing the lock directly. If this is the best approach, please add a comment explaining why the change was necessary and how subclasses should use the lock safely.

Review comment (Lock Visibility Change):
Changing the lock's visibility from private to protected exposes the class's internal synchronization mechanism. This creates a maintenance risk: subclasses can now directly acquire and manipulate the lock, potentially breaking BufferPool's thread-safety guarantees.

Review comment (Protected Field Access):
The lock field's visibility is changed from private to protected so that the test class can properly synchronize its buffer-zeroing operations. This lets the test accurately simulate and detect the buffer-corruption issue while maintaining thread safety.
     private final Deque<ByteBuffer> free;
     private final Deque<Condition> waiters;
     /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */
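A sketch of the alternative the first comment gestures at: keep the lock private and expose a protected hook that runs while the lock is held. The names here (HookedBufferPool, onDeallocate) are hypothetical, and the pool internals are simplified.

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

abstract class HookedBufferPool {
    private final ReentrantLock lock = new ReentrantLock(); // stays private
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            onDeallocate(buffer, size); // subclass hook, runs under the lock
            buffer.clear();
            free.addLast(buffer);
        } finally {
            lock.unlock();
        }
    }

    /** Test subclasses override this instead of touching the lock directly. */
    protected void onDeallocate(ByteBuffer buffer, int size) {
    }
}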
@@ -172,43 +172,6 @@ private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
         this.accumulator.deallocate(batch);
     }
-    /**
-     * Get the in-flight batches that has reached delivery timeout.
-     */
-    private List<ProducerBatch> getExpiredInflightBatches(long now) {
-        List<ProducerBatch> expiredBatches = new ArrayList<>();
-
-        for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) {
-            Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next();
-            List<ProducerBatch> partitionInFlightBatches = entry.getValue();
-            if (partitionInFlightBatches != null) {
-                Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
-                while (iter.hasNext()) {
-                    ProducerBatch batch = iter.next();
-                    if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
-                        iter.remove();
-                        // expireBatches is called in Sender.sendProducerData, before client.poll.
-                        // The !batch.isDone() invariant should always hold. An IllegalStateException
-                        // exception will be thrown if the invariant is violated.
-                        if (!batch.isDone()) {
-                            expiredBatches.add(batch);
-                        } else {
-                            throw new IllegalStateException(batch.topicPartition + " batch created at " +
-                                batch.createdMs + " gets unexpected final state " + batch.finalState());
-                        }
-                    } else {
-                        accumulator.maybeUpdateNextBatchExpiryTime(batch);
-                        break;
-                    }
-                }
-                if (partitionInFlightBatches.isEmpty()) {
-                    batchIt.remove();
-                }
-            }
-        }
-        return expiredBatches;
-    }
     private void addToInflightBatches(List<ProducerBatch> batches) {
         for (ProducerBatch batch : batches) {
             List<ProducerBatch> inflightBatchList = inFlightBatches.computeIfAbsent(batch.topicPartition,

@@ -355,7 +318,8 @@ private boolean shouldHandleAuthorizationError(RuntimeException exception) {
         return false;
     }

-    private long sendProducerData(long now) {
+    // Visible for testing
+    protected long sendProducerData(long now) {
Review comment on lines +321 to +322 (Sender Method Visibility):
Changing the method's visibility from private to protected increases coupling by exposing internal implementation details. While the change enables testing, it creates a maintenance burden: the method now becomes part of the protected API that must be maintained for compatibility.
         MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
Review comment on lines +321 to 323 (Protected Method Visibility):
Changing the method's visibility from private to protected solely for testing exposes internal implementation details. This tightens the coupling between test and implementation, making future refactoring harder because the tests now depend on those details.
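A compact model of the seam these two comments debate (class names here are hypothetical): widening a method to protected lets a test subclass wrap it, at the cost of making the method part of the contract that every subclass can depend on.

class SenderLike implements Runnable {
    public void run() {
        sendProducerData(System.currentTimeMillis());
    }

    protected long sendProducerData(long now) { // was private before the change
        return now;
    }
}

class DelayingSender extends SenderLike {
    @Override
    protected long sendProducerData(long now) {
        long result = super.sendProducerData(now);
        try {
            Thread.sleep(500); // widen the window in which in-flight batches expire
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return result;
    }
}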
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now);

@@ -404,9 +368,7 @@ private long sendProducerData(long now) {
     }

         accumulator.resetNextBatchExpiryTime();
-        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
         List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
-        expiredBatches.addAll(expiredInflightBatches);
Review comment on lines 371 to 372 (Buffer Corruption Vulnerability):
The code incorrectly handled expired batches by combining those from the accumulator with expired in-flight batches. This could lead to buffer corruption, since in-flight batches might still be in use when their buffers are returned to the pool and reused.
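The invariant the removal restores, that a buffer may return to the pool only once nothing still references it, can be illustrated with a small reference-counting wrapper. This is a hypothetical model, not Kafka's code; the PR's change shown above instead stops expiring and deallocating batches that are still in flight.

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

class RefCountedBuffer {
    final ByteBuffer buffer;
    private final AtomicInteger refs = new AtomicInteger(1); // owner's reference
    private final Deque<ByteBuffer> pool;

    RefCountedBuffer(ByteBuffer buffer, Deque<ByteBuffer> pool) {
        this.buffer = buffer;
        this.pool = pool;
    }

    void retain() {             // e.g. when the batch is handed to the network layer
        refs.incrementAndGet();
    }

    void release() {            // e.g. on expiry, or when the response arrives
        if (refs.decrementAndGet() == 0) {
            buffer.clear();
            pool.addLast(buffer); // safe: no readers remain
        }
    }

    public static void main(String[] args) {
        Deque<ByteBuffer> pool = new ArrayDeque<>();
        RefCountedBuffer b = new RefCountedBuffer(ByteBuffer.allocate(16), pool);
        b.retain();   // batch goes in flight
        b.release();  // delivery timeout fires: buffer must NOT be pooled yet
        System.out.println("pooled after expiry: " + pool.size());   // 0
        b.release();  // response finally returns
        System.out.println("pooled after response: " + pool.size()); // 1
    }
}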
         // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
         // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
Review comment (Test Class Complexity):
The EvilKafkaProducerBuilder test class constructs KafkaProducer's internals directly, coupling the test tightly to implementation details. This makes the test brittle to internal changes and increases the maintenance burden as the producer implementation evolves.