@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;

import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.compress.NoCompression;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;


public class ProducerIntegrationTest {

@ClusterTest(serverProperties = {
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
})
public void testInFlightBatchShouldNotBeCorrupted(ClusterInstance cluster) throws InterruptedException,
ExecutionException {
String topic = "test-topic";
cluster.createTopic("test-topic", 1, (short) 1);
try (var producer = expireProducer(cluster)) {
producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes())).get();
}
try (var consumer = cluster.consumer()) {
consumer.subscribe(List.of(topic));
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
}
Comment on lines +57 to +70
⚠️ Potential issue | 🟠 Major

Test does not actually assert that the batch is “not corrupted”

testInFlightBatchShouldNotBeCorrupted only checks that consumer.poll(...).count() == 1. If a bug caused the batch contents to be zeroed or otherwise mangled while still producing a logically valid record, this test would still pass as long as one record shows up.

To make the test match its intent, consider also asserting that the key and value match what was produced, for example:

-        try (var consumer = cluster.consumer()) {
-            consumer.subscribe(List.of(topic));
-            TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
-        }
+        try (var consumer = cluster.consumer()) {
+            consumer.subscribe(List.of(topic));
+            TestUtils.waitForCondition(() -> {
+                var records = consumer.poll(Duration.ofSeconds(1));
+                if (records.count() != 1) {
+                    return false;
+                }
+                var record = records.iterator().next();
+                return Arrays.equals("key".getBytes(), record.key())
+                    && Arrays.equals("value".getBytes(), record.value());
+            }, 5000, "failed to poll correct data");
+        }

This ensures the test will fail if the in-flight buffer gets reused or corrupted but still yields a single record.

🤖 Prompt for AI Agents
In
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
around lines 57 to 70, the test only asserts that one record was polled but
doesn't verify the record's key/value contents; update the test to fetch the
polled records, assert exactly one ConsumerRecord is returned, and assert the
record's key and value equal the produced bytes (or their expected String
representations) so the test fails if the in-flight batch data is zeroed or
corrupted.


}


@SuppressWarnings({"unchecked", "this-escape"})
private Producer<byte[], byte[]> expireProducer(ClusterInstance cluster) {
Map<String, Object> config = Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false,
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 2000,
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1500
);
return new EvilKafkaProducerBuilder().build(config);
}

static class EvilKafkaProducerBuilder {

Serializer<byte[]> serializer = new ByteArraySerializer();
ApiVersions apiVersions = new ApiVersions();
LogContext logContext = new LogContext("[expire Producer test ]");
Metrics metrics = new Metrics(Time.SYSTEM);

String clientId;
String transactionalId;
ProducerConfig config;
ProducerMetadata metadata;
RecordAccumulator accumulator;
Partitioner partitioner;
Sender sender;
ProducerInterceptors<String, String> interceptors;

@SuppressWarnings({"unchecked", "this-escape"})
Producer<byte[], byte[]> build(Map<String, Object> configs) {
this.config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
return new KafkaProducer<>(
config,
logContext,
metrics,
serializer,
serializer,
buildMetadata(),
buildAccumulator(),
null,
buildSender(),
buildInterceptors(),
buildPartition(),
Time.SYSTEM,
ioThread(),
Optional.empty()
);
}
Comment on lines +88 to +125
Brittle Test Implementation Due to Tight Coupling

The EvilKafkaProducerBuilder class manually reconstructs a KafkaProducer by duplicating complex initialization logic. This approach is tightly coupled to the producer's internal implementation, making the test extremely brittle. Any future refactoring of KafkaProducer's constructor or its internal components will likely break this test, increasing maintenance overhead.

    @SuppressWarnings({"unchecked", "this-escape"})
    private Producer<byte[], byte[]> expireProducer(ClusterInstance cluster) {
        Map<String, Object> config = Map.of(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false,
            ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 2000,
            ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1500
        );
        return EvilKafkaProducer.create(config);
    }

    static class EvilKafkaProducer {
        @SuppressWarnings({"unchecked", "this-escape"})
        public static Producer<byte[], byte[]> create(Map<String, Object> configs) {
            ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            LogContext logContext = new LogContext("[expire Producer test ]");
            Time time = Time.SYSTEM;
            Metrics metrics = new Metrics(time);
            Serializer<byte[]> serializer = new ByteArraySerializer();
            ApiVersions apiVersions = new ApiVersions();

            ProducerMetadata metadata = buildMetadata(config, clientId, logContext, metrics, serializer);
            RecordAccumulator accumulator = buildAccumulator(config, clientId, logContext, metrics, time);
            Sender sender = buildSender(config, logContext, metrics, apiVersions, metadata, accumulator);
            ProducerInterceptors<String, String> interceptors = new ProducerInterceptors<>(List.of(), metrics);
            Partitioner partitioner = buildPartitioner(config, clientId, metrics);

            KafkaThread ioThread = new KafkaThread("test_io_thread", sender, true);
            ioThread.start();

            return new KafkaProducer<>(config, logContext, metrics, serializer, serializer, metadata, accumulator, null, sender, interceptors, partitioner, time, ioThread, Optional.empty());
        }

        private static Partitioner buildPartitioner(ProducerConfig config, String clientId, Metrics metrics) {
            Partitioner partitioner = config.getConfiguredInstance(
                ProducerConfig.PARTITIONER_CLASS_CONFIG,
                Partitioner.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            return partitioner;
        }

        private static Sender buildSender(ProducerConfig config, LogContext logContext, Metrics metrics, ApiVersions apiVersions, ProducerMetadata metadata, RecordAccumulator accumulator) {
            int maxInflightRequests = config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
            int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            ProducerMetrics metricsRegistry = new ProducerMetrics(metrics);
            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
            KafkaClient client = ClientUtils.createNetworkClient(config, metrics, "producer", logContext, apiVersions, Time.SYSTEM, maxInflightRequests, metadata, throttleTimeSensor, null);

            short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
            return new Sender(logContext, client, metadata, accumulator, maxInflightRequests == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, config.getInt(ProducerConfig.RETRIES_CONFIG), metricsRegistry.senderMetrics, Time.SYSTEM, requestTimeoutMs, config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), null) {
                @Override
                protected long sendProducerData(long now) {
                    long result = super.sendProducerData(now);
                    try {
                        //  Ensure the batch expires.
                        Thread.sleep(500);
                        return result;
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }

        private static RecordAccumulator buildAccumulator(ProducerConfig config, String clientId, LogContext logContext, Metrics metrics, Time time) {
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
            Plugin<Partitioner> partitionerPlugin = Plugin.wrapInstance(config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class, Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), metrics, ProducerConfig.PARTITIONER_CLASS_CONFIG);
            boolean enableAdaptivePartitioning = partitionerPlugin.get() == null && config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
            return new RecordAccumulator(logContext, batchSize, NoCompression.NONE, (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE), retryBackoffMs, retryBackoffMaxMs, config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), new RecordAccumulator.PartitionerConfig(enableAdaptivePartitioning, config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)), metrics, "producer-metrics", time, null, new EvilBufferPool(config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG), batchSize, metrics, time, "producer-metrics"));
        }

        private static ProducerMetadata buildMetadata(ProducerConfig config, String clientId, LogContext logContext, Metrics metrics, Serializer<byte[]> serializer) {
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(List.of(), reporters, List.of(Plugin.wrapInstance(serializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).get(), Plugin.wrapInstance(serializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).get()));
            ProducerMetadata metadata = new ProducerMetadata(retryBackoffMs, retryBackoffMaxMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG), logContext, clusterResourceListeners, Time.SYSTEM);
            metadata.bootstrap(ClientUtils.parseAndValidateAddresses(config));
            return metadata;
        }
    }
Standards
  • Maintainability-Quality-Coupling
  • Clean-Code-DRY



private ProducerInterceptors buildInterceptors() {
this.interceptors = new ProducerInterceptors<>(List.of(), metrics);
return this.interceptors;
}

private Partitioner buildPartition() {
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
return this.partitioner;
}

private Sender buildSender() {
int maxInflightRequests = config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
KafkaClient client = ClientUtils.createNetworkClient(config,
this.metrics,
"producer",
logContext,
apiVersions,
Time.SYSTEM,
maxInflightRequests,
metadata,
throttleTimeSensor,
null);

short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
this.sender = new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
config.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
Time.SYSTEM,
requestTimeoutMs,
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
null) {
@Override
protected long sendProducerData(long now) {
long result = super.sendProducerData(now);
try {
// Ensure the batch expires.
Thread.sleep(500);
return result;
Comment on lines +175 to +177
Timing-Dependent Test Logic Can Cause Flakiness

The test uses Thread.sleep() to force a batch to expire, which makes the test dependent on timing. This can lead to flakiness, where the test might fail on slower machines or under heavy load. Consider using a controllable Time implementation (e.g., MockTime) to deterministically simulate the passage of time for more reliable testing.

Standards
  • Maintainability-Quality-Testability
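
As a concrete illustration of this suggestion (not part of the PR), here is a minimal sketch of the controllable-clock idea. It assumes org.apache.kafka.common.utils.MockTime from the Kafka clients test artifact is available on the test classpath:

    import org.apache.kafka.common.utils.MockTime;

    public class MockTimeSketch {
        public static void main(String[] args) {
            MockTime time = new MockTime();
            long start = time.milliseconds();
            // Advancing the clock is an explicit call, so the "elapsed" 2500 ms is
            // deterministic regardless of machine speed or CI load.
            time.sleep(2_500);
            System.out.println(time.milliseconds() - start); // prints 2500
        }
    }

Wiring this into the test would mean passing a shared MockTime wherever the builder currently uses Time.SYSTEM and calling time.sleep(...) in the sendProducerData override instead of Thread.sleep(500), so the in-flight batch reaches delivery.timeout.ms without any real waiting.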

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Comment on lines +178 to +180

medium

It's generally better practice to handle InterruptedException by re-interrupting the current thread. While throwing a RuntimeException will cause the test thread to terminate, restoring the interrupted status is a cleaner approach that allows higher-level interrupt handling logic to function as expected.

                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }

}
};
return this.sender;
}

private RecordAccumulator buildAccumulator() {
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
Plugin<Partitioner> partitionerPlugin = Plugin.wrapInstance(
config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
metrics,
ProducerConfig.PARTITIONER_CLASS_CONFIG);
boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
this.accumulator = new RecordAccumulator(logContext,
batchSize,
NoCompression.NONE,
(int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE),
retryBackoffMs,
retryBackoffMaxMs,
config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG),
new RecordAccumulator.PartitionerConfig(
enableAdaptivePartitioning,
config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
),
metrics,
"producer-metrics",
Time.SYSTEM,
null,
new EvilBufferPool(config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG), batchSize, metrics,
Time.SYSTEM, "producer-metrics"));
return accumulator;
}

private ProducerMetadata buildMetadata() {
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
List.of(),
reporters,
List.of(
Plugin.wrapInstance(serializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).get(),
Plugin.wrapInstance(serializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).get()));
this.metadata = new ProducerMetadata(retryBackoffMs,
retryBackoffMaxMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
metadata.bootstrap(ClientUtils.parseAndValidateAddresses(config));
return metadata;
}

private KafkaThread ioThread() {
KafkaThread ioThread = new KafkaThread("test_io_thread", sender, true);
ioThread.start();
return ioThread;
}
}

static class EvilBufferPool extends BufferPool {

public EvilBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
super(memory, poolableSize, metrics, time, metricGrpName);
}

/**
* Override deallocate to intentionally corrupt the ByteBuffer being returned to the pool.
* This is used to simulate a scenario where an in-flight buffer is mistakenly reused
* and its contents are unexpectedly modified, helping expose buffer reuse bugs.
*/
@Override
public void deallocate(ByteBuffer buffer, int size) {
// Ensure atomicity using reentrant behavior
lock.lock();
try {
Arrays.fill(buffer.array(), (byte) 0);
super.deallocate(buffer, size);
} finally {
lock.unlock();
}
}
}
}
@@ -48,7 +48,7 @@ public class BufferPool {

private final long totalMemory;
private final int poolableSize;
private final ReentrantLock lock;
protected final ReentrantLock lock;
private final Deque<ByteBuffer> free;
private final Deque<Condition> waiters;
/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */
@@ -172,43 +172,6 @@ private void maybeRemoveAndDeallocateBatch(ProducerBatch batch) {
this.accumulator.deallocate(batch);
}

/**
* Get the in-flight batches that has reached delivery timeout.
*/
private List<ProducerBatch> getExpiredInflightBatches(long now) {
List<ProducerBatch> expiredBatches = new ArrayList<>();

for (Iterator<Map.Entry<TopicPartition, List<ProducerBatch>>> batchIt = inFlightBatches.entrySet().iterator(); batchIt.hasNext();) {
Map.Entry<TopicPartition, List<ProducerBatch>> entry = batchIt.next();
List<ProducerBatch> partitionInFlightBatches = entry.getValue();
if (partitionInFlightBatches != null) {
Iterator<ProducerBatch> iter = partitionInFlightBatches.iterator();
while (iter.hasNext()) {
ProducerBatch batch = iter.next();
if (batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now)) {
iter.remove();
// expireBatches is called in Sender.sendProducerData, before client.poll.
// The !batch.isDone() invariant should always hold. An IllegalStateException
// exception will be thrown if the invariant is violated.
if (!batch.isDone()) {
expiredBatches.add(batch);
} else {
throw new IllegalStateException(batch.topicPartition + " batch created at " +
batch.createdMs + " gets unexpected final state " + batch.finalState());
}
} else {
accumulator.maybeUpdateNextBatchExpiryTime(batch);
break;
}
}
if (partitionInFlightBatches.isEmpty()) {
batchIt.remove();
}
}
}
return expiredBatches;
}

private void addToInflightBatches(List<ProducerBatch> batches) {
for (ProducerBatch batch : batches) {
List<ProducerBatch> inflightBatchList = inFlightBatches.computeIfAbsent(batch.topicPartition,
@@ -355,7 +318,8 @@ private boolean shouldHandleAuthorizationError(RuntimeException exception) {
return false;
}

private long sendProducerData(long now) {
// Visible for testing
protected long sendProducerData(long now) {
MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now);
@@ -404,9 +368,7 @@ private long sendProducerData(long now) {
}

accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why