Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
remoteAddress, subscriptionName));
consumers.remove(consumerId, existingConsumerFuture);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are three clients in your example, the pulsar client maintains its own connection pool, which means the client1 will not use the same connection as client2 or client3.

The code you changed is the code of ServerCnx(this object is not shared across multi clients), so it will not affect other consumers of other clients

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional background:

If you have two pulsar clients, the consumers registered will be like this:

  • pulsar-client-1
    • connection-1 (one-to-one relationship with ServerCnx)
      • consumer-1
      • consumer-2
    • connection-2
      • consumer-3
      • consumer-4
  • pulsar-client-2
    • connection-3
      • consumer-5
      • consumer-6
    • connection-4
      • consumer-7
      • consumer-8

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So only consumer-1 and consumer-2 can be in conflict since their consumer-id is not the same, so all things are OK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is one client request three times. Because in our server log, "Subscribing on topic" occur three times in a short time from the same remoteAddress host:ip. Therefore they are in the same connection ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is one client request three times. Because in our server log, "Subscribing on topic" occur three times in a short time from the same remoteAddress host:ip. Therefore they are in the same connection ?

could you provide the logs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the log, we can not confirm if these three consumers are the same, because we can not know their consumer id. This log was been improved in the PR https://github.com/apache/pulsar/pull/20568/files#diff-1e0e8195fb5ec5a6d79acbc7d859c025a9b711f94e6ab37c94439e99b3202e84R1168

Copy link
Contributor

@poorbarcode poorbarcode Jun 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time req-1 req-2 close connection
1 put a new consumerFuture
2 close connection
3 mark the consumerFuture as failed
4 close the connection
5 got consumerFuture(failed)
6 remove the failed future
7 add the consumer into the list and set(list.size = 1 and set.size = 1)
8 put the second consumerFuture
9 add the second consumer into the list and set(list.size = 2 and set.size = 1)
10 remove the consumer from the list and set(list.size = 1 and set.size = 0)

Do you want to say the three requests executed as above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, there is a PR trying to fix the concurrent call subscribe in the same client

I still think the current PR is trying to solve this issue is meaningful, and it would be nice to have a test that can reproduce it.

Copy link
Contributor

@poorbarcode poorbarcode Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TakaHiR07

Could you take a look at this PR #22283 , #22270 and modify the current one? If yes, I will close #22283, #22270 ❤️

commandSender.sendErrorResponse(requestId, error,
"Consumer that failed is already present on the connection");
} else {
Expand Down