
Conversation

@visz11
Collaborator

@visz11 visz11 commented Nov 21, 2025

User description

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.


CodeAnt-AI Description

Ensure producer handles expired in-flight batches without corruption

What Changed

  • Added an integration test that delays the sender so an in-flight batch expires and then verifies a consumer still receives the record
  • Built a custom producer setup that corrupts returned buffers and exposes sender timing to stress-test buffer reuse during batch expiration; a rough sketch of the corruption hook follows below
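
As a rough, simplified sketch of that corruption hook (the real test also guards the fill with the pool's lock, and the zeroing assumes heap-backed buffers):

    static class EvilBufferPool extends BufferPool {
        EvilBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
            super(memory, poolableSize, metrics, time, metricGrpName);
        }

        @Override
        public void deallocate(ByteBuffer buffer, int size) {
            // Zero the buffer as it is returned to the pool: if an in-flight batch were ever
            // freed early, the bytes the broker reads would be clobbered and the consumer-side
            // check in the test would fail, exposing the buffer-reuse bug.
            Arrays.fill(buffer.array(), (byte) 0);
            super.deallocate(buffer, size);
        }
    }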

Impact

✅ Prevents corrupted records when delivery timeout expires
✅ Confirms consumers see data despite batch expiry
✅ Validates buffer reuse safety during forced expiration

💡 Usage Guide

Checking Your Pull Request

Every time you make a pull request, our system automatically looks through it. We check for security issues, mistakes in how you're setting up your infrastructure, and common code problems. We do this to make sure your changes are solid and won't cause any trouble later.

Talking to CodeAnt AI

Got a question or need a hand with something in your pull request? You can easily get in touch with CodeAnt AI right here. Just type the following in a comment on your pull request, and replace "Your question here" with whatever you want to ask:

@codeant-ai ask: Your question here

This lets you have a chat with CodeAnt AI about your pull request, making it easier to understand and improve your code.

Example

@codeant-ai ask: Can you suggest a safer alternative to storing this secret?

Preserve Org Learnings with CodeAnt

You can record team preferences so CodeAnt AI applies them in future reviews. Reply directly to the specific CodeAnt AI suggestion (in the same thread) and replace "Your feedback here" with your input:

@codeant-ai: Your feedback here

This helps CodeAnt AI learn and adapt to your team's coding style and standards.

Example

@codeant-ai: Do not flag unused imports.

Retrigger review

Ask CodeAnt AI to review the PR again by typing:

@codeant-ai: review

Check Your Repository Health

To analyze the health of your code repository, visit our dashboard at https://app.codeant.ai. This tool helps you identify potential issues and areas for improvement in your codebase, ensuring your repository maintains high standards of code health.

Summary by CodeRabbit

  • Tests

    • Added comprehensive integration test to validate producer resilience against buffer corruption scenarios during in-flight batch processing.
  • Bug Fixes

    • Improved producer's handling of expired in-flight batches to prevent data loss and ensure batch integrity during high-throughput scenarios.

✏️ Tip: You can customize this high-level summary in your review settings.

@codeant-ai

codeant-ai bot commented Nov 21, 2025

CodeAnt AI is reviewing your PR.


Thanks for using CodeAnt! 🎉

We're free for open-source projects. If you're enjoying it, help us grow by sharing.


@visz11
Collaborator Author

visz11 commented Nov 21, 2025

@refacto-visz

@gemini-code-assist

Summary of Changes

Hello @visz11, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a new integration test for the Kafka Producer aimed at preventing data corruption issues that could arise from the reuse of ByteBuffer instances for in-flight batches. The test suite includes custom BufferPool and Sender implementations that simulate conditions where deallocated buffers are corrupted and batches are forced to expire. This setup ensures that the producer's internal mechanisms correctly handle such edge cases, preventing data integrity problems. Additionally, the PR refactors the batch expiration logic in the Sender by removing a dedicated method for in-flight batch expiration, streamlining the process.

Highlights

  • New Integration Test: Introduced ProducerIntegrationTest to validate the Kafka Producer's resilience against data corruption during batch expiration and buffer reuse scenarios.
  • Simulated Buffer Corruption: Added EvilBufferPool which intentionally corrupts ByteBuffer contents upon deallocation. This simulates a scenario where an in-flight buffer is mistakenly reused and its contents are unexpectedly modified, helping expose buffer reuse bugs.
  • Forced Batch Expiration: Modified the Sender in EvilKafkaProducerBuilder to introduce a Thread.sleep(500) delay within sendProducerData. This ensures that in-flight batches expire, triggering the specific test scenario.
  • Refactored Batch Expiration Logic: Removed the getExpiredInflightBatches method from Sender.java and its usage within sendProducerData, indicating a potential centralization or simplification of how expired in-flight batches are handled, likely within the RecordAccumulator.
  • Increased Visibility for Testing: Changed the lock field in BufferPool.java and the sendProducerData method in Sender.java to protected. This allows custom testing implementations, like EvilBufferPool and the custom Sender, to extend and override behavior for specific test cases.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai

coderabbitai bot commented Nov 21, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

A new integration test is added to verify in-flight batch integrity during producer operations. Concurrently, producer internals are modified to improve testability: the BufferPool's lock is exposed for subclass access, the Sender's sendProducerData method is made protected for testing, and in-flight batch expiry detection logic is consolidated to use the accumulator's existing expiration handling.

Changes

  • Integration Test (clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java): New integration test class with a single test method, testInFlightBatchShouldNotBeCorrupted, that validates producer behavior under buffer corruption scenarios. Includes internal helper classes EvilKafkaProducerBuilder and EvilBufferPool for constructing a customized producer and simulating buffer reuse conditions.
  • Producer Internals (clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java, clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java): BufferPool's lock field visibility changed from private to protected to permit subclass access. Sender's sendProducerData method changed from private to protected for testability. Sender's in-flight batch expiry detection logic removed and replaced with the existing accumulator.expiredBatches(now) call, consolidating expiration handling.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

  • The new integration test file requires understanding of the custom builder pattern, buffer pool override mechanics, and how the test validates producer behavior under corruption scenarios
  • Review the Sender logic change to ensure the expiration consolidation maintains equivalent behavior and doesn't introduce race conditions
  • Verify that BufferPool's protected lock and Sender's protected method don't break encapsulation or create unintended coupling for subclasses

Poem

🐰 A test is born to catch the sneaky bit,
When buffers dance, we'll know which ones don't fit.
Lock and method, now exposed with care,
Expiration simplified, no burden to bear!

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings, 1 inconclusive)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 18.75%, which is insufficient; the required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
  • Description check: ⚠️ Warning. The pull request description is entirely composed of the template placeholder text and contains no actual description of changes, objectives, or testing strategy. Replace the template placeholder with a detailed description of the changes made, explain the rationale for modifying BufferPool.lock visibility and Sender.sendProducerData, describe the new integration test, and document the testing strategy.
  • Title check: ❓ Inconclusive. The title 'Clone expire batch' is vague and does not clearly convey the main purpose of the changes, which involve handling expired in-flight batches with a new integration test. Use a more descriptive title such as 'Add integration test for expired in-flight batch handling' or 'Test producer handling of expired batches'.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch clone-expire_batch

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

@codeant-ai codeant-ai bot added the size:L This PR changes 100-499 lines, ignoring generated files label Nov 21, 2025
@refacto-visz

refacto-visz bot commented Nov 21, 2025

fix: Prevent premature expiration of in-flight producer batches

TL;DR: Removes logic that prematurely expired in-flight producer batches, preventing a race condition that could corrupt data by deallocating and reusing a buffer before the broker finished reading it.


Refacto PR Summary

This change addresses a critical race condition in the Kafka producer Sender thread. Previously, the Sender could expire a batch based on delivery.timeout.ms even after it had been sent to the network layer (i.e., was "in-flight"). This allowed the batch's underlying ByteBuffer to be deallocated and returned to the BufferPool for reuse while the original network request was potentially still being processed by the broker, leading to data corruption.

The fix removes the getExpiredInflightBatches method from the Sender entirely. Now, only batches waiting in the RecordAccumulator are expired by the Sender's main loop. In-flight requests are correctly handled by the network client's own request timeout mechanism (request.timeout.ms), which ensures buffers are only deallocated after the request is fully completed. A new integration test, testInFlightBatchShouldNotBeCorrupted, has been added to explicitly trigger and verify the fix for this race condition.
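
Roughly, the Sender change amounts to the following (an approximation reconstructed from this description, not the literal diff):

-        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
-        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
-        expiredBatches.addAll(expiredInflightBatches);
+        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);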

Change Highlights

Click to expand
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: Removed the getExpiredInflightBatches method and its invocation. This is the core fix, preventing the Sender from expiring batches that have already been sent to the network client.
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java: Added a new integration test that reliably reproduces the data corruption bug by using a custom BufferPool that zeroes out deallocated buffers, proving the fix is effective.
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java: Changed the visibility of the internal lock from private to protected to allow the new integration test to extend BufferPool and simulate the bug.

Sequence Diagram

sequenceDiagram
    participant App as Producer App
    participant Sender as Sender Thread
    participant Accumulator as RecordAccumulator
    participant BufferPool as BufferPool
    participant Broker as Kafka Broker

    Note over App, Broker: Buggy Flow (Before Fix)
    App->>Accumulator: send(record)
    Accumulator->>BufferPool: allocate(buffer)
    BufferPool-->>Accumulator: ByteBuffer
    Accumulator->>Accumulator: Fills buffer with data

    Sender->>Accumulator: drain()
    Accumulator-->>Sender: ProducerBatch (in-flight)
    Sender->>Broker: Sends batch data
    
    loop Sender Run Loop
        Note over Sender: delivery.timeout.ms expires
        Sender->>Sender: getExpiredInflightBatches()
        Sender->>Accumulator: deallocate(batch)
        Accumulator->>BufferPool: deallocate(buffer)
        Note over BufferPool: Buffer is now free for reuse
    end

    App->>Accumulator: send(new_record)
    Accumulator->>BufferPool: allocate(buffer)
    BufferPool-->>Accumulator: Reuses same ByteBuffer
    Accumulator->>Accumulator: Overwrites buffer with new data

    Note over Broker, Accumulator: RACE CONDITION: Broker reads buffer as it's being overwritten, causing data corruption.

Testing Guide

Click to expand
  1. Run the new integration test: Execute the testInFlightBatchShouldNotBeCorrupted test in ProducerIntegrationTest.java.
  2. Verify Test Pass: The test should pass. This confirms that a message sent with a producer configured to expire batches quickly is not corrupted, even when the underlying buffer pool is instrumented to zero-out deallocated buffers.
  3. Conceptual Verification (Old Behavior): Before this change, the test would have failed. The consumer would have received a corrupted record (e.g., filled with zeros) because the Sender would expire the in-flight batch, causing its buffer to be deallocated and overwritten by the EvilBufferPool before the broker could successfully read the data.

@refacto-visz

refacto-visz bot commented Nov 21, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@codeant-ai

codeant-ai bot commented Nov 21, 2025

CodeAnt AI finished reviewing your PR.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request addresses a potential data corruption issue by removing the logic that expires in-flight batches from the Sender. This change correctly prevents a race condition where a batch's buffer could be deallocated and reused while the original request is still being processed by the broker. The fix is validated by a new integration test, ProducerIntegrationTest, which is well-designed to simulate this scenario using an EvilBufferPool that corrupts buffers on deallocation. The related changes in Sender.java and BufferPool.java are minimal and appropriate for implementing the fix and enabling the test. Overall, the changes are sound and improve the producer's reliability.

Comment on lines +178 to +180
} catch (InterruptedException e) {
throw new RuntimeException(e);
}


medium

It's generally better practice to handle InterruptedException by re-interrupting the current thread. While throwing a RuntimeException will cause the test thread to terminate, restoring the interrupted status is a cleaner approach that allows higher-level interrupt handling logic to function as expected.

                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }

@refacto-visz

refacto-visz bot commented Nov 21, 2025

Code Review: Kafka Producer Internals & Integration Tests

👍 Well Done
Excellent Integration Test for Race Condition

The new integration test effectively simulates and validates the fix for a critical race condition involving buffer reuse and expiring in-flight batches, preventing future regressions.

Simplified and Corrected Expiration Logic

Removing getExpiredInflightBatches and centralizing timeout handling correctly simplifies the logic, eliminating a source of data corruption bugs and improving overall maintainability.

📁 Selected files for review (3)
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
🎯 Custom Instructions
✅ Applied Instructions
Organization Guidelines
  • Use feature flags for new functionality and include a clear rollback plan.
  • Very critical to consider applying pagination to queries
  • Follow the company security checklist:
    • No hard-coded secrets or credentials.
    • Validate all external inputs.
    • Use parameterized queries for DB access.

Scope: All files

📝 Additional Comments
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (1)
Weakened Encapsulation For Testing

The visibility of BufferPool.lock was changed from private to protected to enable critical testing of internal concurrency logic. While this is a pragmatic and acceptable trade-off for this specific test, it weakens encapsulation by exposing internal state, which can make the class harder to reason about and refactor in the future.

Standards:

  • Clean-Code-Encapsulation
  • Maintainability-Quality-Modifiability
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (1)
Weakened Method Encapsulation for Testing

Widening method visibility from private to protected enables test overrides but slightly breaks encapsulation. While a common and acceptable pattern for enabling critical tests, future changes could explore alternatives like dependency injection or package-private access to keep production code APIs tighter.

Standards:

  • ISO-IEC-25010-Maintainability-Modifiability

Comment on lines +88 to +125
static class EvilKafkaProducerBuilder {

Serializer<byte[]> serializer = new ByteArraySerializer();
ApiVersions apiVersions = new ApiVersions();
LogContext logContext = new LogContext("[expire Producer test ]");
Metrics metrics = new Metrics(Time.SYSTEM);

String clientId;
String transactionalId;
ProducerConfig config;
ProducerMetadata metadata;
RecordAccumulator accumulator;
Partitioner partitioner;
Sender sender;
ProducerInterceptors<String, String> interceptors;

@SuppressWarnings({"unchecked", "this-escape"})
Producer<byte[], byte[]> build(Map<String, Object> configs) {
this.config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
return new KafkaProducer<>(
config,
logContext,
metrics,
serializer,
serializer,
buildMetadata(),
buildAccumulator(),
null,
buildSender(),
buildInterceptors(),
buildPartition(),
Time.SYSTEM,
ioThread(),
Optional.empty()
);
}


Brittle Test Implementation Due to Tight Coupling

The EvilKafkaProducerBuilder class manually reconstructs a KafkaProducer by duplicating complex initialization logic. This approach is tightly coupled to the producer's internal implementation, making the test extremely brittle. Any future refactoring of KafkaProducer's constructor or its internal components will likely break this test, increasing maintenance overhead.

    @SuppressWarnings({"unchecked", "this-escape"})
    private Producer<byte[], byte[]> expireProducer(ClusterInstance cluster) {
        Map<String, Object> config = Map.of(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName(),
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(),
            ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false,
            ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 2000,
            ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1500
        );
        return EvilKafkaProducer.create(config);
    }

    static class EvilKafkaProducer {
        @SuppressWarnings({"unchecked", "this-escape"})
        public static Producer<byte[], byte[]> create(Map<String, Object> configs) {
            ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            LogContext logContext = new LogContext("[expire Producer test ]");
            Time time = Time.SYSTEM;
            Metrics metrics = new Metrics(time);
            Serializer<byte[]> serializer = new ByteArraySerializer();
            ApiVersions apiVersions = new ApiVersions();

            ProducerMetadata metadata = buildMetadata(config, clientId, logContext, metrics, serializer);
            RecordAccumulator accumulator = buildAccumulator(config, clientId, logContext, metrics, time);
            Sender sender = buildSender(config, logContext, metrics, apiVersions, metadata, accumulator);
            ProducerInterceptors<String, String> interceptors = new ProducerInterceptors<>(List.of(), metrics);
            Partitioner partitioner = buildPartitioner(config, clientId, metrics);

            KafkaThread ioThread = new KafkaThread("test_io_thread", sender, true);
            ioThread.start();

            return new KafkaProducer<>(config, logContext, metrics, serializer, serializer, metadata, accumulator, null, sender, interceptors, partitioner, time, ioThread, Optional.empty());
        }

        private static Partitioner buildPartitioner(ProducerConfig config, String clientId, Metrics metrics) {
            Partitioner partitioner = config.getConfiguredInstance(
                ProducerConfig.PARTITIONER_CLASS_CONFIG,
                Partitioner.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
            return partitioner;
        }

        private static Sender buildSender(ProducerConfig config, LogContext logContext, Metrics metrics, ApiVersions apiVersions, ProducerMetadata metadata, RecordAccumulator accumulator) {
            int maxInflightRequests = config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
            int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            ProducerMetrics metricsRegistry = new ProducerMetrics(metrics);
            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
            KafkaClient client = ClientUtils.createNetworkClient(config, metrics, "producer", logContext, apiVersions, Time.SYSTEM, maxInflightRequests, metadata, throttleTimeSensor, null);

            short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
            return new Sender(logContext, client, metadata, accumulator, maxInflightRequests == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, config.getInt(ProducerConfig.RETRIES_CONFIG), metricsRegistry.senderMetrics, Time.SYSTEM, requestTimeoutMs, config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), null) {
                @Override
                protected long sendProducerData(long now) {
                    long result = super.sendProducerData(now);
                    try {
                        //  Ensure the batch expires.
                        Thread.sleep(500);
                        return result;
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }

        private static RecordAccumulator buildAccumulator(ProducerConfig config, String clientId, LogContext logContext, Metrics metrics, Time time) {
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
            Plugin<Partitioner> partitionerPlugin = Plugin.wrapInstance(config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class, Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), metrics, ProducerConfig.PARTITIONER_CLASS_CONFIG);
            boolean enableAdaptivePartitioning = partitionerPlugin.get() == null && config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
            return new RecordAccumulator(logContext, batchSize, NoCompression.NONE, (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE), retryBackoffMs, retryBackoffMaxMs, config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), new RecordAccumulator.PartitionerConfig(enableAdaptivePartitioning, config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)), metrics, "producer-metrics", time, null, new EvilBufferPool(config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG), batchSize, metrics, time, "producer-metrics"));
        }

        private static ProducerMetadata buildMetadata(ProducerConfig config, String clientId, LogContext logContext, Metrics metrics, Serializer<byte[]> serializer) {
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(List.of(), reporters, List.of(Plugin.wrapInstance(serializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).get(), Plugin.wrapInstance(serializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).get()));
            ProducerMetadata metadata = new ProducerMetadata(retryBackoffMs, retryBackoffMaxMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG), logContext, clusterResourceListeners, Time.SYSTEM);
            metadata.bootstrap(ClientUtils.parseAndValidateAddresses(config));
            return metadata;
        }
    }
Standards
  • Maintainability-Quality-Coupling
  • Clean-Code-DRY

Comment on lines +175 to +177
// Ensure the batch expires.
Thread.sleep(500);
return result;


Timing-Dependent Test Logic Can Cause Flakiness

The test uses Thread.sleep() to force a batch to expire, which makes the test dependent on timing. This can lead to flakiness, where the test might fail on slower machines or under heavy load. Consider using a controllable Time implementation (e.g., MockTime) to deterministically simulate the passage of time for more reliable testing.
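
A minimal sketch of that approach, assuming the fixture were rewired to accept an injectable Time (the deliveryTimeoutMs variable is illustrative):

    MockTime time = new MockTime();
    // Build the accumulator and Sender with `time` instead of Time.SYSTEM, then in the test:
    time.sleep(deliveryTimeoutMs + 1); // advances the mock clock instantly, expiring the batch deterministically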

Standards
  • Maintainability-Quality-Testability


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (6)
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (1)

49-59: Clarify why lock is now protected to discourage non-test usage

Changing lock from private to protected is reasonable to support the test subclass, but it also exposes the core synchronization primitive to any future subclasses, which can make concurrency assumptions harder to maintain.

Consider adding a brief comment (e.g. “// protected for testing (EvilBufferPool)”) next to the field to signal that subclasses should not build new synchronization schemes around this lock in production code.
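
For instance (the wording is only a suggestion):

    // Protected solely so test subclasses (e.g. EvilBufferPool) can extend the pool;
    // production subclasses should not build new synchronization schemes around this lock.
    protected final ReentrantLock lock;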

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java (5)

75-86: Builder wiring looks correct, but keep config and serializer setup minimal and intentional

The custom expireProducer/EvilKafkaProducerBuilder path mimics the core KafkaProducer wiring and appears consistent (config is wrapped via ProducerConfig.appendSerializerToConfig, serializers passed explicitly, metadata/accumulator/sender constructed in the expected order).

Given this is test-only scaffolding, the extra indirection is acceptable, but you might consider:

  • Documenting in a brief comment that this is intentionally mirroring the production KafkaProducer constructor signature.
  • Keeping the initial config map limited to only the knobs necessary to exercise the expiry behavior (which is already mostly true).

Not a correctness issue, just about keeping the test fixture easy to reason about.


128-139: Tighten ProducerInterceptors generics for clarity

ProducerInterceptors is used as a raw type in buildInterceptors, while the field is declared as ProducerInterceptors<String, String> and the producer itself is parameterized as <byte[], byte[]>. This works because the interceptor list is empty, but it introduces unnecessary raw-type/unchecked warnings.

You could make this more explicit and type-safe:

-    ProducerInterceptors<String, String> interceptors;
+    ProducerInterceptors<byte[], byte[]> interceptors;
...
-    private ProducerInterceptors buildInterceptors() {
-        this.interceptors = new ProducerInterceptors<>(List.of(), metrics);
+    private ProducerInterceptors<byte[], byte[]> buildInterceptors() {
+        this.interceptors = new ProducerInterceptors<>(List.<ProducerInterceptor<byte[], byte[]>>of(), metrics);
         return this.interceptors;
     }

This keeps the fixture aligned with the actual key/value types of the producer.


141-183: sendProducerData override: consider preserving interrupt status instead of throwing

Overriding sendProducerData to inject a Thread.sleep(500) is a reasonable way to exercise expiry behavior in tests, and calling super.sendProducerData(now) first preserves base semantics.

One small robustness concern: if the I/O thread is ever interrupted during shutdown, the overridden method converts InterruptedException into a RuntimeException, which will be logged as an “uncaught error” before the loop exits. For a test-only Sender subclass this is not fatal, but you could avoid the spurious error and keep interrupt semantics clearer by preserving the interrupt flag instead of throwing:

-                protected long sendProducerData(long now) {
-                    long result = super.sendProducerData(now);
-                    try {
-                        //  Ensure the batch expires.
-                        Thread.sleep(500);
-                        return result;
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
+                protected long sendProducerData(long now) {
+                    long result = super.sendProducerData(now);
+                    try {
+                        // Ensure enough time passes to exercise delivery-timeout / expiry.
+                        Thread.sleep(500);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
+                    return result;
+                }

This keeps the test behavior while avoiding unnecessary error logs on normal shutdown.


186-217: Assumption in adaptive partitioning flag logic

In buildAccumulator, enableAdaptivePartitioning is set only when partitionerPlugin.get() == null:

boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
    config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);

Given this builder always constructs a concrete partitioner instance, partitionerPlugin.get() will typically be non-null, so adaptive partitioning will never be enabled regardless of config. That’s fine if this test does not care about adaptive partitioning at all (which seems to be the case), but the condition may surprise future readers who expect the flag to reflect the config.

If the intent is “test fixture, adaptive partitioning off”, consider simplifying to a hard false or a comment explaining that this flag is deliberately disabled here.
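
If the latter reading is correct, a hard-coded value with a comment would make the intent explicit (hypothetical simplification of the fixture):

    // This fixture always supplies a concrete partitioner, so adaptive partitioning is deliberately disabled.
    boolean enableAdaptivePartitioning = false;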


247-269: EvilBufferPool deallocate: redundant locking and reliance on array-backed buffers

The corruption hook is clever, but there are two details worth tightening:

  1. Redundant locking:
    EvilBufferPool.deallocate acquires lock and then calls super.deallocate, which itself acquires the same ReentrantLock. This is safe but slightly surprising and makes the test more tightly coupled to the current ReentrantLock-based implementation.

    Since super.deallocate already synchronizes on lock, you can safely do the corruption before calling it and drop the explicit locking here:

-    @Override
-    public void deallocate(ByteBuffer buffer, int size) {
-        //  Ensure atomicity using reentrant behavior
-        lock.lock();
-        try {
-            Arrays.fill(buffer.array(), (byte) 0);
-            super.deallocate(buffer, size);
-        } finally {
-            lock.unlock();
-        }
-    }
+    @Override
+    public void deallocate(ByteBuffer buffer, int size) {
+        // Intentionally corrupt the buffer before it is returned to the pool.
+        Arrays.fill(buffer.array(), (byte) 0);
+        super.deallocate(buffer, size);
+    }
    
    
    This still guarantees other threads only ever see the fully-corrupted buffer once it is put back into the pool.
    
    
  2. Array-backed buffer assumption:
    Arrays.fill(buffer.array(), ...) assumes that all buffers passed to deallocate are heap-backed. That holds today because BufferPool.allocateByteBuffer uses ByteBuffer.allocate, but if BufferPool ever switches to allocateDirect (or a caller passes in a non-array-backed buffer), this test will start failing with UnsupportedOperationException.

    A brief comment noting that this relies on BufferPool's current heap-buffer behavior (and is intended as a white-box test) would make the assumption explicit; a defensive variant is sketched just below.
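
Should the test ever need to tolerate non-heap buffers, the corruption step could look like this (sketch only; the array-based fill is fine for today's heap-backed pool):

    if (buffer.hasArray()) {
        Arrays.fill(buffer.array(), buffer.arrayOffset(), buffer.arrayOffset() + buffer.capacity(), (byte) 0);
    } else {
        for (int i = 0; i < buffer.capacity(); i++)
            buffer.put(i, (byte) 0); // absolute put also works for direct buffers
    }
    super.deallocate(buffer, size);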

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and eb32f14.

📒 Files selected for processing (3)
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java (1 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (1 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java (3)
clients/src/main/java/org/apache/kafka/clients/ClientUtils.java (1)
  • ClientUtils (50-278)
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (1)
  • BufferPool (45-356)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
  • Plugin (40-132)
🔇 Additional comments (1)
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (1)

321-323: sendProducerData visibility relaxation looks appropriate for the test hook

Making sendProducerData(long now) protected with a “Visible for testing” comment cleanly enables the test subclass override without changing runtime behavior of existing callers. This is a narrow, controlled visibility increase and looks fine as-is.

Comment on lines +57 to +70
@ClusterTest(serverProperties = {
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
})
public void testInFlightBatchShouldNotBeCorrupted(ClusterInstance cluster) throws InterruptedException,
ExecutionException {
String topic = "test-topic";
cluster.createTopic("test-topic", 1, (short) 1);
try (var producer = expireProducer(cluster)) {
producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes())).get();
}
try (var consumer = cluster.consumer()) {
consumer.subscribe(List.of(topic));
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
}


⚠️ Potential issue | 🟠 Major

Test does not actually assert that the batch is “not corrupted”

testInFlightBatchShouldNotBeCorrupted only checks that consumer.poll(...).count() == 1. If a bug caused the batch contents to be zeroed or otherwise mangled while still producing a logically valid record, this test would still pass as long as one record shows up.

To make the test match its intent, consider also asserting that the key and value match what was produced, for example:

-        try (var consumer = cluster.consumer()) {
-            consumer.subscribe(List.of(topic));
-            TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1, 5000, "failed to poll data");
-        }
+        try (var consumer = cluster.consumer()) {
+            consumer.subscribe(List.of(topic));
+            TestUtils.waitForCondition(() -> {
+                var records = consumer.poll(Duration.ofSeconds(1));
+                if (records.count() != 1) {
+                    return false;
+                }
+                var record = records.iterator().next();
+                return Arrays.equals("key".getBytes(), record.key())
+                    && Arrays.equals("value".getBytes(), record.value());
+            }, 5000, "failed to poll correct data");
+        }

This ensures the test will fail if the in-flight buffer gets reused or corrupted but still yields a single record.

🤖 Prompt for AI Agents
In
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
around lines 57 to 70, the test only asserts that one record was polled but
doesn't verify the record's key/value contents; update the test to fetch the
polled records, assert exactly one ConsumerRecord is returned, and assert the
record's key and value equal the produced bytes (or their expected String
representations) so the test fails if the in-flight batch data is zeroed or
corrupted.


Labels

clients producer size:L This PR changes 100-499 lines, ignoring generated files
