Skip to content

[fix][client] Prevent epoch race in MultiTopicsConsumer batch receive#25210

Open
ChimdumebiNebolisa wants to merge 9 commits intoapache:masterfrom
ChimdumebiNebolisa:fix-25204-consumer-epoch
Open

[fix][client] Prevent epoch race in MultiTopicsConsumer batch receive#25210
ChimdumebiNebolisa wants to merge 9 commits intoapache:masterfrom
ChimdumebiNebolisa:fix-25204-consumer-epoch

Conversation

@ChimdumebiNebolisa
Copy link

@ChimdumebiNebolisa ChimdumebiNebolisa commented Feb 4, 2026

Fixes #25204

Motivation

MultiTopicsConsumer processes a received batch by validating the consumer epoch per message. If redeliverUnacknowledgedMessages() runs concurrently, it can increment consumerEpoch while the batch loop is still iterating, which can produce a mixed outcome where part of the batch is accepted and the rest is filtered. This matches the behavior reported in #25204.

Modifications

  • Serialized batch processing with redeliver by holding incomingQueueLock across the entire batch loop and the incomingMessages.size() read, so consumerEpoch cannot change mid-batch.
  • Updated MultiTopicsConsumerEpochRaceTest to assert the post-fix invariant:
    • redeliver is blocked while the batch holds the lock
    • the batch observes a stable epoch and accepts both messages (acceptedByEpochCount == 2)
    • redeliver completes shortly after the batch releases the lock

Verifying this change

  • This change added tests and can be verified as follows:
    • Run the regression test (it is in flaky, so it must not be excluded):
      • mvn --% -pl pulsar-client "-Dtest=org.apache.pulsar.client.impl.MultiTopicsConsumerEpochRaceTest" "-DexcludedGroups=quarantine" test
    • Run module tests:
      • mvn -pl pulsar-client test

Does this pull request potentially affect one of the following parts

  • The threading model

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: ChimdumebiNebolisa#2

@github-actions
Copy link

github-actions bot commented Feb 4, 2026

@ChimdumebiNebolisa Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@ChimdumebiNebolisa ChimdumebiNebolisa changed the title Fix 25204 consumer epoch [test][client] Add deterministic regression for consumer epoch race Feb 4, 2026
@ChimdumebiNebolisa ChimdumebiNebolisa changed the title [test][client] Add deterministic regression for consumer epoch race [fix][client] Prevent epoch race in MultiTopicsConsumer batch receive Feb 5, 2026
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Feb 5, 2026
@@ -0,0 +1,185 @@
# Investigation: GitHub issue #25204 – MultiTopicsConsumer receives message with older ConsumerEpoch after redeliverUnacknowledgedMessages()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not commit the investigation document to the code base. You can post it to Gist and share the link in the PR description

@BewareMyPower BewareMyPower added type/bug The PR fixed a bug or issue reported a bug area/client labels Feb 6, 2026
* validations, so the epoch changes mid-batch. Without the fix, exactly one message is delivered
* (bug). The test asserts totalDelivered != 1.
*/
@Test(groups = "flaky")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it flaky? If the bug is fixed, this test should not be flaky

Comment on lines 136 to 139
Field consumersField = MultiTopicsConsumerImpl.class.getDeclaredField("consumers");
consumersField.setAccessible(true);
java.util.Map<String, ConsumerImpl<byte[]>> consumersMap =
(java.util.Map<String, ConsumerImpl<byte[]>>) consumersField.get(multiConsumer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use reflection. consumers is a protected field, you can access consumers directly

In addition, AI tends to use full qualified type name (java.util.Map) rather than Map with import java.util.Map. You can customize the AGENTS.md or manually review the AI generated code.

@ChimdumebiNebolisa
Copy link
Author

ChimdumebiNebolisa commented Feb 7, 2026

Thanks for the review.

I’ve updated the test to address the feedback:

  • Removed reflection for consumers and access it directly (it’s protected).
  • Removed the flaky grouping and eliminated Thread.sleep by coordinating with latches.
  • Replaced assertTrue(isEmpty()) with assertEquals(size, 0) and removed the redundant ConnectionHandler import.

The remaining reflection is only for private members with no stable test seam (startReceivingMessages, resumeReceivingFromPausedConsumersIfNeeded, and MessageImpl.consumerEpoch). The test passes deterministically.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/client doc-not-needed Your PR changes do not impact docs type/bug The PR fixed a bug or issue reported a bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] MultiTopicsConsumer still received message with older ConsumerEpoch after redeliverUnacknowledgeMessages()

2 participants