@nodece
Member

@nodece nodece commented Aug 18, 2025

Motivation

When a consumer sends a subscribe command and then immediately sends a close command while the subscription is still in progress, the broker logs messages like:

2025-08-15T14:42:52,889+0800 [pulsar-io-4-40] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.184.74.6:44058] Closing consumer: consumerId=13296
2025-08-15T14:42:52,889+0800 [pulsar-io-4-40] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.184.74.6:44058] Closed consumer before its creation was completed. consumerId=13296
2025-08-15T14:42:53,258+0800 [pulsar-io-4-40] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.184.74.6:44058][persistent://public/10000/__change_events-partition-0][multiTopicsReader-f3f7fd855d] A failed consumer with id is already present on the connection, consumerId=13296

This happens because the consumer remains in the connection's cache even after it is closed before the subscription is fully established. As a result, any new subscription attempt with the same consumer ID fails with A failed consumer with id is already present on the connection.

This regression was introduced by #22283.

Modifications

  • In handleCloseConsumer, remove the consumer from the connection's cache if it is closed before its creation has completed.
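
To make the failure mode and the proposed modification concrete, here is a minimal sketch of a per-connection consumer cache. It is a simplified stand-in, not the real ServerCnx code: the class name ConsumerCacheSketch, the removeOnEarlyClose flag, and the second and third error strings are assumptions made for illustration. Only the "A failed consumer with id is already present on the connection" message comes from the log above.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified, hypothetical model of the connection's consumer cache (not ServerCnx itself).
final class ConsumerCacheSketch {
    private final ConcurrentMap<Long, CompletableFuture<String>> consumers = new ConcurrentHashMap<>();
    // true models the change proposed in this PR; false models the behavior described in Motivation.
    private final boolean removeOnEarlyClose;

    ConsumerCacheSketch(boolean removeOnEarlyClose) {
        this.removeOnEarlyClose = removeOnEarlyClose;
    }

    /** Registers a pending subscribe; returns an error message if the id is already cached. */
    Optional<String> handleSubscribe(long consumerId) {
        CompletableFuture<String> existing = consumers.putIfAbsent(consumerId, new CompletableFuture<>());
        if (existing == null) {
            return Optional.empty(); // accepted: creation is now in progress
        }
        if (existing.isCompletedExceptionally()) {
            return Optional.of("A failed consumer with id is already present on the connection");
        }
        // Illustrative wording for the remaining cases.
        return Optional.of(existing.isDone() ? "Consumer is already present" : "Consumer is being created");
    }

    /** Handles a close command, possibly arriving before creation has finished. */
    void handleCloseConsumer(long consumerId) {
        CompletableFuture<String> future = consumers.get(consumerId);
        if (future == null) {
            return;
        }
        // completeExceptionally returns true only if the future was still pending,
        // i.e. the close arrived before the consumer was created.
        boolean closedEarly = future.completeExceptionally(
                new IllegalStateException("Closed consumer before its creation was completed"));
        if (!closedEarly || removeOnEarlyClose) {
            consumers.remove(consumerId, future);
        }
    }
}
```

With removeOnEarlyClose set to false, a subscribe, close, subscribe sequence on the same consumer ID ends with the "failed consumer" error; with it set to true, the second subscribe is accepted.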

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) Aug 18, 2025
response = getResponse();
assertEquals(response.getClass(), CommandError.class);
assertEquals(((CommandError) response).getRequestId(), 3);
// We should not receive response for 1st consumer, since it was cancelled by the close
Member Author

It's better to respond to the client even if the subscribe request was cancelled by a close command, otherwise the client will be left hanging.

This is a TODO.

Member

@lhotari lhotari left a comment

This happens because the consumer remains in the connection's cache even after it is closed before the subscription is fully established. As a result, any new subscription attempt with the same consumer ID fails with A failed consumer with id is already present on the connection.

I think this PR makes sense, but I'm wondering why a client would attempt a new subscription with the same consumer ID if it has already closed the consumer. Explaining the scenario in the PR description would be useful context.

@nodece
Member Author

nodece commented Aug 18, 2025

This happens because the consumer remains in the connection's cache even after it is closed before the subscription is fully established. As a result, any new subscription attempt with the same consumer ID fails with A failed consumer with id is already present on the connection.

I think this PR makes sense, but I'm wondering why a client would attempt a new subscription with the same consumer ID if it has already closed the consumer. Explaining the scenario in the PR description would be useful context.

@lhotari Yes, that's correct. I guess that the issue occurs when:

  1. The client sends a subscribe request, but the broker is still processing it.
  2. The client times out waiting for the response and sends a close request for the pending subscription.
  3. If the client then attempts to send a new subscribe request, the broker may still have a "failed" consumer cached from the previous request, resulting in the error: A failed consumer with id is already present on the connection.

The producer path already has the same bugfix: https://github.com/apache/pulsar/blob/v3.0.12/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L2073

@dao-jun
Member

dao-jun commented Aug 19, 2025

I personally think we don't need to remove the consumer future unless it is done.
Maybe we should introduce a subscription creation timeout for this case, so that subscription creation cannot take too long.
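
As a rough illustration of the timeout idea, the sketch below bounds a pending creation future with the JDK's CompletableFuture.orTimeout (Java 9+). The class SubscribeTimeoutSketch and both helper methods are hypothetical names chosen for this example; Pulsar has no such API that this thread confirms, and where the broker would apply the timeout is left open.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Pulsar.
final class SubscribeTimeoutSketch {
    /**
     * Fails the given creation future with a TimeoutException if it is still
     * pending after timeoutMillis. orTimeout acts on the future itself and returns it.
     */
    static <T> CompletableFuture<T> withCreationTimeout(CompletableFuture<T> creation, long timeoutMillis) {
        return creation.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Blocks until the future finishes and reports whether it failed because of the timeout. */
    static boolean timedOut(CompletableFuture<?> future) {
        try {
            future.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }
}
```

A timeout of this kind would make a slow subscription fail on the broker side, but by itself it does not decide whether the failed future stays in the connection's map afterwards, which is the question this PR is about.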

@nodece
Member Author

nodece commented Aug 19, 2025

I personally think we don't need to remove the consumer future unless it is done. Maybe we should introduce a subscription creation timeout for this case, so that subscription creation cannot take too long.

That is an async (improvement) issue and is not related to this PR.

When the consumer sends a close request, the broker completes the future. We must remove the future at this point; otherwise, the broker cache retains the failed consumer, and subsequent subscribe requests will hit the error:
A failed consumer with id is already present on the connection.

Please see #24638 (comment).

@lhotari
Member

lhotari commented Aug 19, 2025

  1. If the client then attempts to send a new subscribe request, the broker may still have a "failed" consumer cached from the previous request, resulting in the error: A failed consumer with id is already present on the connection.

@nodece I think we need @poorbarcode to explain why #22283 was needed, since it seems to have intentionally changed the behavior so that the failed consumer future remains in the map.

@lhotari
Member

lhotari commented Aug 19, 2025

@nodece @poorbarcode
Based on this comment #22283 (comment), my understanding is that race conditions should be prevented while subscribing to the topic, and that is why the solution was added.

Instead of leaving the failed future in the map to prevent race conditions, perhaps org.apache.pulsar.common.util.FutureUtil.Sequencer could be used to execute subscription requests sequentially so that race conditions could be avoided?
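
To show what sequential execution of subscription requests could look like, here is a minimal stand-in written for this example. It is not Pulsar's FutureUtil.Sequencer and makes no claim about that class's exact behavior; SequencerSketch and its chaining strategy are assumptions. Each submitted task starts only after all earlier ones have finished, whether they succeeded or failed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for illustration; not org.apache.pulsar.common.util.FutureUtil.Sequencer.
final class SequencerSketch<T> {
    // Future of the most recently submitted task; starts out already completed.
    private CompletableFuture<T> tail = CompletableFuture.completedFuture(null);

    /** Runs the task only after every previously submitted task has completed. */
    synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> task) {
        CompletableFuture<T> next = tail
                // Swallow the previous outcome so one failed request does not fail the chain.
                .<Void>handle((ignoredResult, ignoredError) -> null)
                // Start this task once the previous one is finished.
                .thenCompose(ignored -> task.get());
        tail = next;
        return next;
    }
}
```

With one such sequencer per consumer ID, a retried subscribe would wait for the earlier subscribe (and its close) to finish instead of racing with it, so no failed future would need to stay in the map as a guard.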

@nodece
Member Author

nodece commented Aug 19, 2025

@nodece @poorbarcode Based on this comment #22283 (comment), my understanding is that race conditions should be prevented while subscribing to the topic, and that is why the solution was added.

Once a subscription (including close) has completed, the consumer can reuse the same consumer ID (final value) to send a new subscription request.

The main goal here seems to be preventing a consumer from sending multiple concurrent subscription requests. However, the old code also blocks reuse of the same consumer ID even after the subscription has completed, which looks more like a bug, since at that point the consumer should be allowed to resume with the same ID.

I also noticed that we already handle similar concurrency cases in the ledger module, so we could apply the same approach here for newNonDurableCursor. This would keep concurrency control in the lower layer, and reduce unnecessary work in the upper layer:
https://github.com/apache/pulsar/blob/v3.0.12/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L978-L1005
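
One generic way to keep this kind of concurrency control in a lower layer is to let concurrent callers for the same name share a single in-flight creation. The sketch below illustrates that idea only; it is written without reference to the linked ManagedLedgerImpl lines and does not reproduce them. InFlightCreationSketch and getOrCreate are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical illustration; not the ManagedLedgerImpl implementation.
final class InFlightCreationSketch<V> {
    private final ConcurrentMap<String, CompletableFuture<V>> pending = new ConcurrentHashMap<>();

    /** Returns the shared future for the name, invoking the creator only if none is cached. */
    CompletableFuture<V> getOrCreate(String name, Function<String, CompletableFuture<V>> creator) {
        CompletableFuture<V> future = pending.computeIfAbsent(name, creator);
        // Drop failed entries so a later caller can retry instead of seeing a stale failure.
        future.whenComplete((value, error) -> {
            if (error != null) {
                pending.remove(name, future);
            }
        });
        return future;
    }
}
```

Concurrent requests for the same name then collapse into one creation, and a failed attempt does not block later ones, so the upper layer needs less bookkeeping of its own.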

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@nodece nodece force-pushed the fix-close-consumer branch from c726ab7 to df530aa Compare August 19, 2025 10:37
@nodece
Member Author

nodece commented Aug 19, 2025

A non-durable cursor bugfix: #24643

@nodece
Member Author

nodece commented Aug 19, 2025

/pulsarbot rerun-failure-checks

@codecov-commenter

Codecov Report

❌ Patch coverage is 91.66667% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 74.28%. Comparing base (34f8657) to head (df530aa).

Files with missing lines                                 Patch %   Lines
...va/org/apache/pulsar/broker/service/ServerCnx.java    90.00%    1 Missing ⚠️
Additional details and impacted files

@@             Coverage Diff              @@
##             master   #24638      +/-   ##
============================================
- Coverage     74.28%   74.28%   -0.01%     
+ Complexity    33164    32784     -380     
============================================
  Files          1882     1882              
  Lines        146855   146858       +3     
  Branches      16867    16867              
============================================
- Hits         109094   109089       -5     
+ Misses        29088    29087       -1     
- Partials       8673     8682       +9     
Flag         Coverage Δ
inttests     26.64% <33.33%> (-0.10%) ⬇️
systests     23.36% <33.33%> (-0.07%) ⬇️
unittests    73.76% <91.66%> (-0.01%) ⬇️

Files with missing lines                                  Coverage Δ
.../pulsar/broker/service/BrokerServiceException.java     86.55% <100.00%> (+0.22%) ⬆️
...va/org/apache/pulsar/broker/service/ServerCnx.java     73.00% <90.00%> (+0.35%) ⬆️

... and 96 files with indirect coverage changes

@poorbarcode
Contributor

@nodece

Not a bug. Once the consumer detects that the request has timed out, it sends a close request for the subscription to the broker.

Could you write a test for this case?

@nodece
Member Author

nodece commented Aug 20, 2025

@nodece

Not a bug. Once the consumer detects that the request has timed out, it sends a close request for the subscription to the broker.

Could you write a test for this case?

@poorbarcode We only need to verify the logic in handleSubscribe and handleCloseConsumer in ServerCnx.java.

The existing test testSubscribeBookieTimeout already covers this scenario and validates the changes introduced in this PR.

@nodece nodece requested a review from poorbarcode August 20, 2025 10:49
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@nodece nodece requested a review from BewareMyPower August 20, 2025 11:40
@nodece
Member Author

nodece commented Aug 20, 2025

/pulsarbot rerun-failure-checks

@nodece
Member Author

nodece commented Aug 21, 2025

Thanks for the review and discussion. After re-evaluating the logic, I realized that the current behavior is correct: the consumer should only be removed from the cache once the subscription completes successfully.

Removing it earlier could lead to inconsistent states. I'll close this PR.

@nodece nodece closed this Aug 21, 2025