
Conversation


@arvi18 arvi18 commented Apr 21, 2025

Issue: see https://issues.apache.org/jira/browse/KAFKA-17862

🔍 Problem Summary

When an expired batch is still part of an in-flight request, we
prematurely release the ByteBuffer back to the BufferPool. This leads to
two critical issues:

  1. Expiration does not prevent the in-flight request from being sent.
  2. The expired batch’s ByteBuffer is deallocated back to the pool too
    early. It may be re-allocated for another producer batch while still
    being referenced by the in-flight request, potentially causing data
    corruption.

We can tolerate Issue 1, but Issue 2 is critical — we cannot allow it to
happen.

Therefore, we remove the expiration handling of ProducerBatch before
send, and instead defer the ByteBuffer deallocation to the response
handling logic.
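The hazard described above can be reproduced in miniature. The toy pool below is purely illustrative (a hypothetical free list, far simpler than Kafka's BufferPool), but it shows why Issue 2 is critical: once a buffer is returned to the pool while an in-flight request still references it, the next allocation hands out the same backing memory and the in-flight payload is silently overwritten.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

public class PrematureDeallocateDemo {
    // Toy free list standing in for a buffer pool.
    static final Deque<ByteBuffer> free = new ArrayDeque<>();

    static ByteBuffer allocate(int size) {
        ByteBuffer pooled = free.poll();
        return pooled != null ? pooled : ByteBuffer.allocate(size);
    }

    static void deallocate(ByteBuffer buffer) {
        buffer.clear();
        free.push(buffer);
    }

    public static void main(String[] args) {
        ByteBuffer inFlight = allocate(16);
        inFlight.put("batch-1".getBytes(StandardCharsets.UTF_8));

        // The bug being described: the expired batch's buffer is returned
        // to the pool while an in-flight request still references it.
        deallocate(inFlight);

        // The next producer batch is handed the same backing memory...
        ByteBuffer reused = allocate(16);
        reused.put("batch-2".getBytes(StandardCharsets.UTF_8));

        // ...so the in-flight request now reads the new batch's bytes.
        byte[] seen = new byte[7];
        inFlight.position(0);
        inFlight.get(seen);
        System.out.println(new String(seen, StandardCharsets.UTF_8)); // prints "batch-2"
    }
}
```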

Summary by CodeRabbit

  • Tests

    • Added a new integration test to verify Kafka producer batch integrity under buffer corruption and batch expiration scenarios.
  • Refactor

    • Adjusted internal method visibility to support enhanced testing and maintainability.
    • Removed redundant logic for handling expired in-flight batches to streamline batch expiration processing.


coderabbitai bot commented Apr 21, 2025

Walkthrough

A new integration test class, ProducerIntegrationTest, has been added to the Kafka clients integration tests. This class introduces a test to ensure that in-flight batches in the Kafka producer are not corrupted during expiration, using custom producer and buffer pool implementations to simulate and detect buffer corruption scenarios. Supporting changes include making the lock field in BufferPool protected and modifying the Sender class by removing the getExpiredInflightBatches method and making the sendProducerData method protected and visible for testing.

Changes

File(s) Change Summary
clients/clients-integration-tests/.../ProducerIntegrationTest.java Added a new public integration test class with custom producer and buffer pool to simulate and verify batch corruption scenarios.
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java Changed visibility of the lock field from private to protected.
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java Removed the private getExpiredInflightBatches method; made sendProducerData protected and visible for testing; updated logic.

Sequence Diagram(s)

sequenceDiagram
    participant Test as ProducerIntegrationTest
    participant Producer as EvilKafkaProducer
    participant Sender as CustomSender
    participant BufferPool as EvilBufferPool
    participant Kafka as KafkaCluster
    participant Consumer as KafkaConsumer

    Test->>Producer: Build producer with custom Sender and BufferPool
    Producer->>Sender: Send record
    Sender->>BufferPool: Allocate buffer
    Sender->>Sender: Sleep to force batch expiration
    BufferPool->>Sender: Deallocate (corrupt buffer)
    Sender->>Kafka: Send batch
    Kafka->>Consumer: Record available
    Test->>Consumer: Consume record
    Consumer-->>Test: Return record (verify integrity)

Poem

In the warren of code where the buffers hop,
A sneaky test checks if corruption will stop.
With evil pools and batches in flight,
It pokes and it prods in the Kafka night.
But the records arrive, unbroken, intact—
The rabbit approves, for the system’s on track!
🐇✨



visz11 commented Apr 21, 2025

@coderabbitai full review


coderabbitai bot commented Apr 21, 2025

✅ Actions performed

Full review triggered.


visz11 commented Apr 22, 2025

@coderabbitai full review


coderabbitai bot commented Apr 22, 2025

✅ Actions performed

Full review triggered.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (1)

51-53: Visibility relaxation of lock widens surface for concurrency misuse

Changing lock from private to protected gives every subclass full control over the internal locking mechanism.
While this enables the new EvilBufferPool test helper, it also:

  • Exposes a core synchronisation primitive to arbitrary external subclasses (the class is public), risking accidental dead‑locks or missed unlocks that violate the pool’s invariants.
  • Couples the subclass implementation to the exact locking strategy—​future refactors of BufferPool would become breaking changes.

Consider keeping lock private and offering a narrower protected utility instead, e.g.:

// inside BufferPool
protected <T> T withLock(Supplier<T> action) {
    lock.lock();
    try {
        return action.get();
    } finally {
        lock.unlock();
    }
}

Subclasses then wrap their critical section without needing direct access.

This keeps the contract stable while still unblocking tests.
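The suggested `withLock` pattern can be sketched end to end with a toy pool. Everything below is illustrative (`GuardedPool` and `reserve` are hypothetical names, not Kafka's actual BufferPool API); the point is that subclasses get a narrow hook to run a critical section without ever touching the lock itself.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class GuardedPool {
    private final ReentrantLock lock = new ReentrantLock();
    private long available;

    public GuardedPool(long totalMemory) {
        this.available = totalMemory;
    }

    // The only extension point subclasses get: run an action under the pool's lock.
    protected <T> T withLock(Supplier<T> action) {
        lock.lock();
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    // Example pool operation built on the helper; returns the memory left.
    public long reserve(long size) {
        return withLock(() -> {
            if (size > available) {
                throw new IllegalStateException("insufficient memory");
            }
            available -= size;
            return available;
        });
    }
}
```

Because `lock` stays private, a later refactor (say, to a StampedLock) cannot break subclasses: only `withLock`'s contract must be preserved.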

clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (1)

321-323: Method made protected; please annotate and seal extension points

sendProducerData switched from private to protected to allow test overrides.
Two follow‑ups help avoid unintended use in production code:

  1. Annotate the method with @org.apache.kafka.common.annotation.InterfaceStability.Unstable or at least add a Javadoc note: “For test purposes only – not a public API”.
  2. Alternatively mark Sender itself final and expose a package‑private subclass used only in tests to avoid external extension of a critical component.

This limits the blast‑radius while still supporting the new integration tests.

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java (2)

171-178: Avoid hard‑coded Thread.sleep in timing‑sensitive tests

Injecting a real 500 ms sleep increases test runtime and introduces flakiness on congested CI hosts.

Replace with the controllable Time abstraction already used elsewhere:

-Thread.sleep(500);
+Time.SYSTEM.sleep(500); // or better: pass in a MockTime and advance it

Even better, pass a Time instance into the producer builder and advance a MockTime to force expiry deterministically.
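The MockTime idea can be shown with a hand-rolled clock (Kafka's actual MockTime lives in its test utilities; the classes below are illustrative stand-ins). Advancing the clock programmatically makes expiry deterministic and removes the wall-clock wait entirely.

```java
public class MockClockExpiryDemo {
    // Hand-rolled stand-in for Kafka's MockTime test utility.
    static class MockClock {
        private long nowMs;
        long milliseconds() { return nowMs; }
        void sleep(long ms) { nowMs += ms; } // advances instantly, no real wait
    }

    // Minimal stand-in for a producer batch with a delivery timeout.
    static class Batch {
        final long createdMs;
        final long deliveryTimeoutMs;

        Batch(long createdMs, long deliveryTimeoutMs) {
            this.createdMs = createdMs;
            this.deliveryTimeoutMs = deliveryTimeoutMs;
        }

        boolean hasExpired(long nowMs) {
            return nowMs - createdMs >= deliveryTimeoutMs;
        }
    }

    public static void main(String[] args) {
        MockClock time = new MockClock();
        Batch batch = new Batch(time.milliseconds(), 500);
        System.out.println(batch.hasExpired(time.milliseconds())); // prints false
        time.sleep(500); // deterministic: the test never blocks for 500 ms
        System.out.println(batch.hasExpired(time.milliseconds())); // prints true
    }
}
```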


259-267: Double locking & potential UnsupportedOperationException in EvilBufferPool.deallocate

  1. lock.lock() / unlock() surrounds a call to super.deallocate, which also locks.
    Re‑entrancy works, but the extra acquire adds overhead and obscures the locking hierarchy.
  2. buffer.array() throws UnsupportedOperationException for direct or read‑only buffers.
    Today the pool only allocates heap buffers, but guarding makes the helper future‑proof.

Suggested simplification:

-lock.lock();
-try {
-    Arrays.fill(buffer.array(), (byte) 0);
-    super.deallocate(buffer, size);
-} finally {
-    lock.unlock();
-}
+if (buffer.hasArray() && !buffer.isReadOnly()) {
+    Arrays.fill(buffer.array(), (byte) 0);
+}
+super.deallocate(buffer, size);
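As a quick sanity check of the guard suggested above (this is standard java.nio behavior, sketched with a hypothetical helper): heap buffers expose a backing array, direct buffers do not, and calling array() on the latter would throw UnsupportedOperationException.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class BufferGuardDemo {
    // Hypothetical helper mirroring the suggested guard in deallocate().
    static void zeroIfPossible(ByteBuffer buffer) {
        if (buffer.hasArray() && !buffer.isReadOnly()) {
            Arrays.fill(buffer.array(), (byte) 0);
        }
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(8);
        heap.put((byte) 42);
        zeroIfPossible(heap);                  // safe: heap buffer is zeroed
        System.out.println(heap.get(0));       // prints 0

        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        zeroIfPossible(direct);                // no-op instead of UnsupportedOperationException
        System.out.println(direct.hasArray()); // prints false
    }
}
```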
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and eb32f14.

📒 Files selected for processing (3)
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java (1 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (1 hunks)
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (1 hunks)


visz11 commented Apr 24, 2025

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

The pull request addresses a critical issue of potential data corruption in the Kafka producer when expired batches are prematurely released back to the buffer pool. The changes include removing expiration handling before send and deferring deallocation to the response handling logic, along with adding an integration test to verify batch integrity. Overall, the changes seem well-reasoned and the added test is a good way to prevent regressions. However, there are a few areas that could be improved for clarity and maintainability.

Summary of Findings

  • Visibility of sendProducerData: The change in visibility of the sendProducerData method from private to protected seems reasonable for testing purposes. However, it's important to ensure that this change doesn't inadvertently expose internal implementation details to subclasses or other parts of the codebase. Consider adding a comment explaining why the visibility was changed and how it should be used.
  • Redundant Batch Expiry Handling: The removal of getExpiredInflightBatches and the consolidation of expiry handling within accumulator.expiredBatches simplifies the logic and reduces the risk of inconsistencies. This is a positive change that improves the overall clarity of the code.
  • EvilBufferPool Corruption: The EvilBufferPool class intentionally corrupts the ByteBuffer during deallocation to simulate buffer reuse bugs. While this is a clever approach for testing, it's important to ensure that this class is only used in testing environments and doesn't accidentally make its way into production code.

Merge Readiness

The pull request addresses a critical issue and includes a new integration test, which is a significant improvement. However, before merging, it's important to address the high severity comment regarding the visibility of sendProducerData and ensure that the EvilBufferPool is strictly limited to testing environments. I am unable to approve this pull request, and recommend that other reviewers also examine this code before merging. At a minimum, the high severity issue should be addressed before merging.

Comment on lines +321 to +322
// Visible for testing
protected long sendProducerData(long now) {


high

Making this method protected increases its visibility. It's good that it's visible for testing, but it's important to document why this change was made and how it should be used. This is particularly important since this method is part of the core sending logic.

Suggested change
-// Visible for testing
-protected long sendProducerData(long now) {
+/**
+ * Send data in the accumulator to the brokers.
+ * Visible for testing.
+ */
+@VisibleForTesting
+protected long sendProducerData(long now) {

 private final long totalMemory;
 private final int poolableSize;
-private final ReentrantLock lock;
+protected final ReentrantLock lock;


medium

Changing the lock to protected seems reasonable to allow subclasses to access it, but consider if there are other ways to achieve the same goal without exposing the lock directly. If this is the best approach, please add a comment explaining why this change was necessary and how subclasses should use the lock safely.

Suggested change
-protected final ReentrantLock lock;
+/**
+ * The lock protecting all mutable state in the pool.
+ * It is protected to allow subclasses in testing to access it, but should be used with caution.
+ */
+protected final ReentrantLock lock;

Comment on lines 371 to 372
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);


medium

Removing expiredInflightBatches simplifies the logic, which is good. Can you add a comment explaining why this was removed, and how the expiry is now handled?

Suggested change
-List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
-expiredBatches.addAll(expiredInflightBatches);
+// Remove expiredInflightBatches since expiry is now handled in the accumulator directly.
+List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);


visz11 commented Jul 1, 2025

/do-refacto


refacto-test bot commented Jul 1, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.


visz11 commented Sep 16, 2025

/refacto-test-arvi


refacto-visz bot commented Sep 16, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.


refacto-visz bot commented Sep 16, 2025

Code Review: Buffer Corruption Fix

👍 Well Done
Comprehensive Test Case

Created detailed integration test that effectively reproduces and validates the buffer corruption issue.

Thread Safety Improvement

Proper locking in BufferPool prevents buffer corruption during reuse.

📌 Files Processed
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
📝 Additional Comments
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java (5)
Forced Thread Sleep

The test intentionally introduces a 500ms sleep in the sendProducerData method to force batch expiration. This blocking sleep causes thread starvation and artificially delays processing, creating a performance bottleneck that would be catastrophic in production code.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Optimization-Pattern-Asynchronous-Processing
  • Algorithmic-Complexity-Time-Efficiency
Resource Lifecycle Management

The test correctly simulates the race condition by introducing a delay in sendProducerData, ensuring batches expire while in flight. This demonstrates how removing the getExpiredInflightBatches method creates a logical flaw where buffers can be deallocated while still in use by in-flight requests, causing data corruption.

Standards:

  • Algorithm-Correctness-Resource-Management
  • Logic-Verification-Concurrency
  • Business-Rule-Data-Integrity
Buffer Zeroing Protection

The test demonstrates a vulnerability by zeroing buffer contents during deallocation, which would corrupt data if the buffer is still in use. This simulates the security issue where in-flight buffers could be prematurely returned to the pool and reused while still being processed.

Standards:

  • CWE-672
  • OWASP-A06
  • NIST-SSDF-PW.1
Test Class Organization

Multiple inner test classes within ProducerIntegrationTest increase complexity and reduce readability. Consider extracting these test helper classes to separate files or a dedicated test utilities package to improve organization and enable reuse across test classes.

Standards:

  • Clean-Code-Class-Organization
  • Maintainability-Quality-Test-Structure
  • SOLID-SRP
Test Builder Complexity

EvilKafkaProducerBuilder directly instantiates and configures internal components of KafkaProducer, creating tight coupling to implementation details. This test will be brittle against internal refactoring of KafkaProducer, requiring maintenance whenever the producer's internal structure changes.

Standards:

  • Clean-Code-Test-Design
  • Maintainability-Quality-Test-Isolation
  • Design-Pattern-Test-Doubles
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java (1)
Removed Expired Batches Method

The removal of getExpiredInflightBatches method and related code simplifies batch expiration handling. This change improves performance by eliminating redundant expiration checks and consolidating batch expiration logic in the accumulator, reducing processing overhead.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Resource-Utilization
  • Optimization-Pattern-Code-Simplification
  • Algorithmic-Complexity-Time-Efficiency
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (1)
Improve Thread Safety

Changing the ReentrantLock visibility from private to protected allows subclasses to directly access the lock. While this enables the test case, it could lead to incorrect lock usage in subclasses, potentially causing deadlocks or inconsistent state.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • ISO-IEC-25010-Functional-Correctness-Appropriateness

Comment on lines 371 to 372
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);


Buffer Corruption Vulnerability

The code was incorrectly handling expired batches by combining those from accumulator with in-flight batches. This could lead to buffer corruption as in-flight batches might still be in use when their buffers are returned to the pool and reused.

Standards
  • CWE-672
  • OWASP-A06
  • NIST-SSDF-PW.1

 private final long totalMemory;
 private final int poolableSize;
-private final ReentrantLock lock;
+protected final ReentrantLock lock;

Lock Visibility Change

Changing lock visibility from private to protected exposes internal synchronization mechanism. This creates maintenance risk as subclasses can now directly access and manipulate the lock, potentially breaking thread-safety guarantees of the BufferPool class.

Standards
  • Clean-Code-Encapsulation
  • Maintainability-Quality-Thread-Safety
  • Design-Pattern-Information-Hiding

Comment on lines +321 to +322
// Visible for testing
protected long sendProducerData(long now) {

Sender Method Visibility

Changing method visibility from private to protected increases coupling by exposing internal implementation details. While the change enables testing, it creates a maintenance burden as this method now becomes part of the protected API that must be maintained for compatibility.

Standards
  • Clean-Code-Encapsulation
  • Maintainability-Quality-API-Design
  • Design-Pattern-Information-Hiding


visz11 commented Sep 16, 2025

/refacto-test-arvi


refacto-visz bot commented Sep 16, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.


refacto-visz bot commented Sep 16, 2025

Code Review: Buffer Pool Corruption Fix

👍 Well Done
Comprehensive Test Case

Created thorough integration test that effectively reproduces and validates the buffer corruption issue.

Root Cause Identification

Correctly identified buffer reuse vulnerability in expired batch handling.

📌 Files Processed
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
📝 Additional Comments
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java (1)
Thread Sleep Impact

The test introduces a 500ms sleep in the sendProducerData method to force batch expiration. While appropriate for testing, this pattern would cause significant performance degradation in production code by blocking the sender thread. The test correctly isolates this behavior to the test environment.

Standards:

  • ISO-IEC-25010-Performance-Efficiency-Time-Behavior
  • Optimization-Pattern-Async-Processing
  • Testing-Pattern-Performance-Simulation
clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java (1)
Lock Field Exposure

The lock field visibility was changed from private to protected to support testing. This exposes internal synchronization mechanisms and could lead to incorrect lock usage in subclasses. Consider alternative testing approaches or document this field is for testing only.

Standards:

  • ISO-IEC-25010-Maintainability-Modifiability
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • DbC-Encapsulation

Comment on lines +258 to +267
@Override
public void deallocate(ByteBuffer buffer, int size) {
    // Ensure atomicity using reentrant behavior
    lock.lock();
    try {
        Arrays.fill(buffer.array(), (byte) 0);
        super.deallocate(buffer, size);
    } finally {
        lock.unlock();
    }

Buffer Reuse Vulnerability

The test demonstrates a critical buffer reuse vulnerability where deallocated buffers are zeroed out before returning to the pool, but in-flight batches may still reference these buffers. This can lead to data corruption when buffers are reused while still referenced by in-flight requests, potentially causing message integrity issues or information disclosure.

Standards
  • CWE-672
  • OWASP-A04
  • NIST-SSDF-PW.1

Comment on lines +172 to +181
protected long sendProducerData(long now) {
    long result = super.sendProducerData(now);
    try {
        // Ensure the batch expires.
        Thread.sleep(500);
        return result;
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Thread Sleep Injection

The test overrides sendProducerData to introduce a 500ms delay, ensuring batches expire while still in flight. This deliberate timing manipulation creates the conditions necessary to reproduce the race condition where buffers are deallocated while still being used by the network layer.

Standards
  • Algorithm-Correctness-Concurrency
  • Logic-Verification-Race-Condition
  • Business-Rule-Timing

Comment on lines +321 to 323
// Visible for testing
protected long sendProducerData(long now) {
    MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();

Protected Method Visibility

Changing method visibility from private to protected solely for testing purposes exposes internal implementation details. This creates tighter coupling between test and implementation, making future refactoring more difficult as tests now depend on implementation details.

Standards
  • Clean-Code-Encapsulation
  • SOLID-OCP
  • Maintainability-Quality-Coupling

 private final long totalMemory;
 private final int poolableSize;
-private final ReentrantLock lock;
+protected final ReentrantLock lock;

Protected Field Access

The lock field visibility is changed from private to protected to enable the test class to properly synchronize buffer zeroing operations. This change ensures that the test can accurately simulate and detect the buffer corruption issue while maintaining thread safety.

Standards
  • Algorithm-Correctness-Testability
  • Logic-Verification-Access-Control
  • Business-Rule-Thread-Safety

Comment on lines +88 to +93
static class EvilKafkaProducerBuilder {

    Serializer<byte[]> serializer = new ByteArraySerializer();
    ApiVersions apiVersions = new ApiVersions();
    LogContext logContext = new LogContext("[expire Producer test ]");
    Metrics metrics = new Metrics(Time.SYSTEM);

Test Class Complexity

The EvilKafkaProducerBuilder test class directly constructs complex KafkaProducer internals, creating tight coupling to implementation details. This approach makes tests brittle to internal changes and increases maintenance burden when producer implementation evolves.

Standards
  • Clean-Code-Test-Design
  • SOLID-SRP
  • Maintainability-Quality-Coupling

Comment on lines +88 to +247
static class EvilKafkaProducerBuilder {

    Serializer<byte[]> serializer = new ByteArraySerializer();
    ApiVersions apiVersions = new ApiVersions();
    LogContext logContext = new LogContext("[expire Producer test ]");
    Metrics metrics = new Metrics(Time.SYSTEM);

    String clientId;
    String transactionalId;
    ProducerConfig config;
    ProducerMetadata metadata;
    RecordAccumulator accumulator;
    Partitioner partitioner;
    Sender sender;
    ProducerInterceptors<String, String> interceptors;

    @SuppressWarnings({"unchecked", "this-escape"})
    Producer<byte[], byte[]> build(Map<String, Object> configs) {
        this.config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, null, null));
        transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
        clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        return new KafkaProducer<>(
            config,
            logContext,
            metrics,
            serializer,
            serializer,
            buildMetadata(),
            buildAccumulator(),
            null,
            buildSender(),
            buildInterceptors(),
            buildPartition(),
            Time.SYSTEM,
            ioThread(),
            Optional.empty()
        );
    }

    private ProducerInterceptors buildInterceptors() {
        this.interceptors = new ProducerInterceptors<>(List.of(), metrics);
        return this.interceptors;
    }

    private Partitioner buildPartition() {
        this.partitioner = config.getConfiguredInstance(
            ProducerConfig.PARTITIONER_CLASS_CONFIG,
            Partitioner.class,
            Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        return this.partitioner;
    }

    private Sender buildSender() {
        int maxInflightRequests = config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
        int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
        KafkaClient client = ClientUtils.createNetworkClient(config,
            this.metrics,
            "producer",
            logContext,
            apiVersions,
            Time.SYSTEM,
            maxInflightRequests,
            metadata,
            throttleTimeSensor,
            null);

        short acks = Short.parseShort(config.getString(ProducerConfig.ACKS_CONFIG));
        this.sender = new Sender(logContext,
            client,
            metadata,
            this.accumulator,
            maxInflightRequests == 1,
            config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks,
            config.getInt(ProducerConfig.RETRIES_CONFIG),
            metricsRegistry.senderMetrics,
            Time.SYSTEM,
            requestTimeoutMs,
            config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            null) {
            @Override
            protected long sendProducerData(long now) {
                long result = super.sendProducerData(now);
                try {
                    // Ensure the batch expires.
                    Thread.sleep(500);
                    return result;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        return this.sender;
    }

    private RecordAccumulator buildAccumulator() {
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
        int batchSize = Math.max(1, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
        Plugin<Partitioner> partitionerPlugin = Plugin.wrapInstance(
            config.getConfiguredInstance(
                ProducerConfig.PARTITIONER_CLASS_CONFIG,
                Partitioner.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
            metrics,
            ProducerConfig.PARTITIONER_CLASS_CONFIG);
        boolean enableAdaptivePartitioning = partitionerPlugin.get() == null &&
            config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
        this.accumulator = new RecordAccumulator(logContext,
            batchSize,
            NoCompression.NONE,
            (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE),
            retryBackoffMs,
            retryBackoffMaxMs,
            config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG),
            new RecordAccumulator.PartitionerConfig(
                enableAdaptivePartitioning,
                config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG)
            ),
            metrics,
            "producer-metrics",
            Time.SYSTEM,
            null,
            new EvilBufferPool(config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG), batchSize, metrics,
                Time.SYSTEM, "producer-metrics"));
        return accumulator;
    }

    private ProducerMetadata buildMetadata() {
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
        List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(clientId, config);
        ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
            List.of(),
            reporters,
            List.of(
                Plugin.wrapInstance(serializer, metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).get(),
                Plugin.wrapInstance(serializer, metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).get()));
        this.metadata = new ProducerMetadata(retryBackoffMs,
            retryBackoffMaxMs,
            config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
            config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
            logContext,
            clusterResourceListeners,
            Time.SYSTEM);
        metadata.bootstrap(ClientUtils.parseAndValidateAddresses(config));
        return metadata;
    }

    private KafkaThread ioThread() {
        KafkaThread ioThread = new KafkaThread("test_io_thread", sender, true);
        ioThread.start();
        return ioThread;
    }
}

static class EvilBufferPool extends BufferPool {

Evil Class Naming

The 'Evil' prefix doesn't clearly communicate the test class's purpose or behavior. A more descriptive name such as 'CorruptingBufferPool' or 'DelayingSenderProducerBuilder' would better convey the component's intended behavior and failure mode.

Standards
  • Clean-Code-Naming
  • Maintainability-Quality-Clarity

@visz11

visz11 commented Sep 16, 2025

/refacto-test-arvi

@refacto-visz

refacto-visz bot commented Sep 16, 2025

Refacto is reviewing this PR. Please wait for the review comments to be posted.

@refacto-visz

refacto-visz bot commented Sep 16, 2025

Code Review: Buffer Pool Implementation

👍 Well Done
Comprehensive Test Case

Added test case effectively reproduces and validates the buffer corruption issue.

Visibility Enhancement

Protected method visibility change enables proper testing of critical functionality.

📌 Files Processed
  • clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
  • clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
📝 Additional Comments
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIntegrationTest.java (2)
Explicit Test Assertion

The test verifies the message count but doesn't explicitly verify message content integrity. Since this test specifically targets buffer corruption, asserting on the actual message content would strengthen the test's validation.

Standards:

  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Testing
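A minimal, standalone sketch of such a content check (the class, helper name, and payloads are illustrative, not from the PR): a corrupted buffer can still yield the expected record count, so the test should also compare the bytes produced against the bytes consumed.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ContentIntegrityCheck {
    // Illustrative helper: a zeroed buffer produces the right record count
    // but the wrong bytes, so a byte-for-byte comparison catches it.
    public static boolean payloadIntact(byte[] sent, byte[] received) {
        return Arrays.equals(sent, received);
    }

    public static void main(String[] args) {
        byte[] sent = "value-0".getBytes(StandardCharsets.UTF_8);
        byte[] zeroed = new byte[sent.length]; // what a zeroed-out buffer would deliver
        System.out.println(payloadIntact(sent, sent.clone())); // intact round-trip
        System.out.println(payloadIntact(sent, zeroed));       // corruption detected
    }
}
```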
Sender Sleep Reliability

Using Thread.sleep() in timing-dependent tests can make them flaky across environments. Consider a more deterministic approach, or at least capture and restore the thread's interrupted status rather than wrapping the exception in a RuntimeException.

Standards:

  • ISO-IEC-25010-Reliability-Maturity
  • SRE-Testing
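As a sketch of the deterministic alternative suggested here (the class and field names are hypothetical), a CountDownLatch lets the test wait for an explicit expiry signal instead of sleeping a fixed 500 ms:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DeterministicExpiryWait {
    // Counted down by whatever code path expires the batch, instead of
    // guessing how long expiry takes with Thread.sleep().
    static final CountDownLatch batchExpired = new CountDownLatch(1);

    public static boolean awaitExpiry(long timeoutMs) throws InterruptedException {
        return batchExpired.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        batchExpired.countDown();             // simulate the expiry signal firing
        System.out.println(awaitExpiry(100)); // returns immediately once signalled
    }
}
```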

Comment on lines +263 to +264
Arrays.fill(buffer.array(), (byte) 0);
super.deallocate(buffer, size);

Buffer Corruption Risk

The EvilBufferPool implementation zeroes out the buffer's contents while holding the pool lock, but a thread that still references the buffer (such as an in-flight request) never takes that lock. The zeroing therefore races with the reader: a buffer can be corrupted while still in use by another thread, causing data corruption.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • ISO-IEC-25010-Functional-Correctness-Appropriateness
  • SRE-Error-Handling
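A standalone sketch of the hazard (purely illustrative; the lock and buffer here stand in for the pool's internals): holding the pool lock while zeroing does not protect a reference held outside the pool, because the in-flight reader never acquires that lock.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class LockScopeHazard {
    // Returns the first byte seen through the "in-flight" reference after
    // the pool zeroes the buffer under its own lock.
    public static byte corruptUnderLock() {
        ReentrantLock poolLock = new ReentrantLock();
        ByteBuffer buffer = ByteBuffer.wrap("payload".getBytes());

        // An "in-flight request" keeps a live reference to the same array.
        byte[] inFlightView = buffer.array();

        poolLock.lock();
        try {
            Arrays.fill(buffer.array(), (byte) 0); // deallocate-time zeroing
        } finally {
            poolLock.unlock();
        }

        // The lock serialized nothing for the reader: it sees zeroed bytes.
        return inFlightView[0];
    }

    public static void main(String[] args) {
        System.out.println(LockScopeHazard.corruptUnderLock()); // 0, not 'p'
    }
}
```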

Comment on lines +178 to +179
} catch (InterruptedException e) {
throw new RuntimeException(e);

Interrupted Thread Handling

The thread's interruption status is lost when InterruptedException is caught and wrapped in a RuntimeException. This violates the thread interruption contract and can leave threads that cannot be terminated properly, potentially causing resource leaks.

Standards
  • ISO-IEC-25010-Reliability-Fault-Tolerance
  • SRE-Error-Handling
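A small sketch of the fix this comment implies (the class and method names are illustrative): re-set the interrupt flag before wrapping the exception, so callers can still observe that the thread was interrupted.

```java
public class InterruptPreservingSleep {
    // Restores the interrupt flag instead of silently discarding it.
    public static void sleepPreservingInterrupt(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // re-set the flag for callers
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] sawInterrupt = new boolean[1];
        Thread t = new Thread(() -> {
            try {
                sleepPreservingInterrupt(10_000);
            } catch (RuntimeException e) {
                // The flag survived the wrapping, so shutdown logic can see it.
                sawInterrupt[0] = Thread.currentThread().isInterrupted();
            }
        });
        t.start();
        t.interrupt();
        t.join();
        System.out.println(sawInterrupt[0]); // true
    }
}
```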

Comment on lines +247 to +268
static class EvilBufferPool extends BufferPool {

public EvilBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
super(memory, poolableSize, metrics, time, metricGrpName);
}

/**
* Override deallocate to intentionally corrupt the ByteBuffer being returned to the pool.
* This is used to simulate a scenario where an in-flight buffer is mistakenly reused
* and its contents are unexpectedly modified, helping expose buffer reuse bugs.
*/
@Override
public void deallocate(ByteBuffer buffer, int size) {
// Ensure atomicity using reentrant behavior
lock.lock();
try {
Arrays.fill(buffer.array(), (byte) 0);
super.deallocate(buffer, size);
} finally {
lock.unlock();
}
}

Improve Test Documentation

While the test class has good documentation about simulating buffer corruption, it should clarify that this implementation intentionally introduces a race condition to test the system's resilience against buffer corruption scenarios.

Standards
  • ISO-IEC-25010-Maintainability
  • SRE-Observability

