Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,6 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
remoteAddress, subscriptionName));
consumers.remove(consumerId, existingConsumerFuture);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are three clients in your example, the pulsar client maintains its own connection pool, which means the client1 will not use the same connection as client2 or client3.

The code you changed is the code of ServerCnx(this object is not shared across multi clients), so it will not affect other consumers of other clients

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional background:

If you have two pulsar clients, the consumers registered will be like this:

  • pulsar-client-1
    • connection-1 (one-to-one relationship with ServerCnx)
      • consumer-1
      • consumer-2
    • connection-2
      • consumer-3
      • consumer-4
  • pulsar-client-2
    • connection-3
      • consumer-5
      • consumer-6
    • connection-4
      • consumer-7
      • consumer-8

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So only consumer-1 and consumer-2 can be in conflict since their consumer-id is not the same, so all things are OK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is one client request three times. Because in our server log, "Subscribing on topic" occur three times in a short time from the same remoteAddress host:ip. Therefore they are in the same connection ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is one client request three times. Because in our server log, "Subscribing on topic" occur three times in a short time from the same remoteAddress host:ip. Therefore they are in the same connection ?

could you provide the logs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the log, we can not confirm if these three consumers are the same, because we can not know their consumer id. This log was been improved in the PR https://github.com/apache/pulsar/pull/20568/files#diff-1e0e8195fb5ec5a6d79acbc7d859c025a9b711f94e6ab37c94439e99b3202e84R1168

Copy link
Contributor

@poorbarcode poorbarcode Jun 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time req-1 req-2 close connection
1 put a new consumerFuture
2 close connection
3 mark the consumerFuture as failed
4 close the connection
5 got consumerFuture(failed)
6 remove the failed future
7 add the consumer into the list and set(list.size = 1 and set.size = 1)
8 put the second consumerFuture
9 add the second consumer into the list and set(list.size = 2 and set.size = 1)
10 remove the consumer from the list and set(list.size = 1 and set.size = 0)

Do you want to say the three requests executed as above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, there is a PR trying to fix the concurrent call subscribe in the same client

I still think the current PR is trying to solve this issue is meaningful, and it would be nice to have a test that can reproduce it.

Copy link
Contributor

@poorbarcode poorbarcode Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TakaHiR07

Could you take a look at this PR #22283 , #22270 and modify the current one? If yes, I will close #22283, #22270 ❤️

commandSender.sendErrorResponse(requestId, error,
"Consumer that failed is already present on the connection");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -38,7 +43,10 @@
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -293,6 +301,182 @@ private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception
mockBrokerService.resetHandleCloseConsumer();
}

@Test
public void testConcurrentCreatedConsumerAndRetryAfterFix() throws Exception {
concurrentConsumerCreatedAndRetry("persistent://prop/use/ns/t1", true);
}

@Test
public void testConcurrentCreatedConsumerAndRetryBeforeFix() throws Exception {
concurrentConsumerCreatedAndRetry("persistent://prop/use/ns/t1", false);
}

private void concurrentConsumerCreatedAndRetry(String topic, boolean isFix) throws Exception {

@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
.operationTimeout(3, TimeUnit.SECONDS).build();
final AtomicInteger subscribeCounter = new AtomicInteger(0);
final AtomicInteger closeConsumerCounter = new AtomicInteger(0);
final AtomicInteger subscribeFinishCounter = new AtomicInteger(0);

AtomicBoolean canExecute = new AtomicBoolean(false);

CopyOnWriteArrayList<org.apache.pulsar.broker.service.Consumer> consumerList = new CopyOnWriteArrayList<>();
ObjectSet<org.apache.pulsar.broker.service.Consumer> consumerSet = new ObjectHashSet<>();
ConcurrentLongHashMap<CompletableFuture<org.apache.pulsar.broker.service.Consumer>> consumers =
ConcurrentLongHashMap.<CompletableFuture<org.apache.pulsar.broker.service.Consumer>>newBuilder()
.expectedItems(8)
.concurrencyLevel(1)
.build();
org.apache.pulsar.broker.service.Consumer mockConsumer =
Mockito.mock(org.apache.pulsar.broker.service.Consumer.class);

AtomicReference<Long> id = new AtomicReference<>();
AtomicReference<SocketAddress> address = new AtomicReference<>();

// subscribe 1: Success. Then broker restart, trigger client reconnect
// subscribe 2: Broker not fully success, but client timeout then trigger handleClose and reconnect again
// subscribe 3: Remove the failed existingConsumerFuture, then client reconnect again
// subscribe 4: Execute concurrently with subscribe 2
mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();
SocketAddress remoteAddress = ctx.channel().remoteAddress();
int subscribeCount = subscribeCounter.incrementAndGet();

// ensure consumerId and clientAddress is the same during 4 handleSubscribe requests
// so that add consumer is always the same consumer
if (id.get() == null && address.get() == null) {
id.set(consumerId);
address.set(remoteAddress);
} else {
Assert.assertEquals(id.get(), consumerId);
Assert.assertEquals(remoteAddress, address.get());
}

CompletableFuture<org.apache.pulsar.broker.service.Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<org.apache.pulsar.broker.service.Consumer> existingConsumerFuture =
consumers.putIfAbsent(consumerId, consumerFuture);

if (existingConsumerFuture != null) {
if (!existingConsumerFuture.isDone()) {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.ServiceNotReady,
"Consumer is already present on the connection"));
} else if (existingConsumerFuture.isCompletedExceptionally()){
ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
"Consumer that failed is already present on the connection"));
// the fix of pr-20583
if (!isFix) {
consumers.remove(consumerId, existingConsumerFuture);
}
// subscribe 3 finish
} else {
ctx.writeAndFlush(Commands.newSuccess(requestId));
}
subscribeFinishCounter.incrementAndGet();
return;
}

// simulate add consumer
// must use asyncFuture to simulate, otherwise requests can not be concurrent
CompletableFuture.supplyAsync(() -> {
org.apache.pulsar.broker.service.Consumer consumer = mockConsumer;
consumerSet.add(consumer);
consumerList.add(consumer);
return consumer;
}).thenAccept(consumer -> {
// block second subscribe, and ensure client timeout
// make the subscribe 2 and 4 execute concurrently in broker
if (subscribeCount == 2) {
while (!canExecute.get()) {
// wait until subscribe 4 finish add consumer
}
}

if (subscribeCount == 4) {
canExecute.set(true);
}

if (consumerFuture.complete(consumer)) {
ctx.writeAndFlush(Commands.newSuccess(requestId));

if (subscribeCount == 1) {
// simulate broker restart and trigger client reconnect
if (consumerSet.removeAll(consumer) == 1) {
consumerList.remove(consumer);
}
consumers.remove(consumerId, consumerFuture);
ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1));
// subscribe 1 finish
} else {
// subscribe 4 finish
}
} else {
// cleared consumer created after timeout on client side
// simulate close consumer
if (consumerSet.removeAll(consumer) == 1) {
consumerList.remove(consumer);
}
consumers.remove(consumerId, consumerFuture);
// subscribe 2 finish
}
});
subscribeFinishCounter.incrementAndGet();
});

mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
long requestId = closeConsumer.getRequestId();
long consumerId = closeConsumer.getConsumerId();
closeConsumerCounter.incrementAndGet();

CompletableFuture<org.apache.pulsar.broker.service.Consumer> consumerFuture = consumers.get(consumerId);
if (consumerFuture == null) {
ctx.writeAndFlush(Commands.newSuccess(requestId));
return;
}

if (!consumerFuture.isDone() && consumerFuture
.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
ctx.writeAndFlush(Commands.newSuccess(requestId));
return;
}

if (consumerFuture.isCompletedExceptionally()) {
ctx.writeAndFlush(Commands.newSuccess(requestId));
return;
}

Assert.fail("should not go here");
// simulate close consumer
org.apache.pulsar.broker.service.Consumer consumer = consumerFuture.getNow(null);
if (consumerSet.removeAll(consumer) == 1) {
consumerList.remove(consumer);
}
consumers.remove(consumerId, consumerFuture);
ctx.writeAndFlush(Commands.newSuccess(requestId));
});

// Create consumer (subscribe) should succeed then upon closure, it should reattempt creation.
client.newConsumer().topic(topic).subscriptionName("test").subscribe();

Awaitility.await().until(() -> closeConsumerCounter.get() == 1);
Awaitility.await().until(() -> subscribeFinishCounter.get() == 4);

mockBrokerService.resetHandleSubscribe();
mockBrokerService.resetHandleCloseConsumer();

if (isFix) {
Assert.assertEquals(consumers.size(), 1);
Assert.assertEquals(consumerSet.size(), 1);
Assert.assertEquals(consumerList.size(), 1);
} else {
Assert.assertEquals(consumers.size(), 1);
Assert.assertEquals(consumerSet.size(), 0);
Assert.assertEquals(consumerList.size(), 1);
}
}

@Test
public void testProducerFailDoesNotFailOtherProducer() throws Exception {
producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");
Expand Down