Skip to content

Commit 7bbfba5

Browse files
author
fanjianye
committed
add test for concurrently created consumer
1 parent 4085518 commit 7bbfba5

File tree

1 file changed

+187
-0
lines changed

1 file changed

+187
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,33 @@
2222
import static org.testng.Assert.assertFalse;
2323
import static org.testng.Assert.assertTrue;
2424
import static org.testng.Assert.fail;
25+
import com.carrotsearch.hppc.ObjectHashSet;
26+
import com.carrotsearch.hppc.ObjectSet;
2527
import io.netty.channel.ChannelHandlerContext;
28+
import java.net.SocketAddress;
2629
import java.util.Map;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.CopyOnWriteArrayList;
2732
import java.util.concurrent.CountDownLatch;
2833
import java.util.concurrent.TimeUnit;
2934
import java.util.concurrent.atomic.AtomicBoolean;
3035
import java.util.concurrent.atomic.AtomicInteger;
3136
import java.util.concurrent.atomic.AtomicReference;
3237
import lombok.Cleanup;
3338
import org.apache.bookkeeper.common.util.JsonUtil;
39+
import org.apache.commons.lang3.StringUtils;
40+
import org.apache.pulsar.broker.service.BrokerServiceException;
3441
import org.apache.pulsar.client.impl.ConsumerBase;
3542
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
3643
import org.apache.pulsar.client.impl.ProducerBase;
3744
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
3845
import org.apache.pulsar.common.api.proto.ServerError;
3946
import org.apache.pulsar.common.protocol.Commands;
4047
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
48+
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
4149
import org.awaitility.Awaitility;
50+
import org.mockito.Mockito;
51+
import org.testng.Assert;
4252
import org.testng.annotations.AfterClass;
4353
import org.testng.annotations.BeforeClass;
4454
import org.testng.annotations.Test;
@@ -293,6 +303,182 @@ private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception
293303
mockBrokerService.resetHandleCloseConsumer();
294304
}
295305

306+
@Test
307+
public void testConcurrentCreatedConsumerAndRetryAfterFix() throws Exception {
308+
concurrentConsumerCreatedAndRetry("persistent://prop/use/ns/t1", true);
309+
}
310+
311+
@Test
312+
public void testConcurrentCreatedConsumerAndRetryBeforeFix() throws Exception {
313+
concurrentConsumerCreatedAndRetry("persistent://prop/use/ns/t1", false);
314+
}
315+
316+
private void concurrentConsumerCreatedAndRetry(String topic, boolean isFix) throws Exception {
317+
318+
@Cleanup
319+
PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
320+
.operationTimeout(3, TimeUnit.SECONDS).build();
321+
final AtomicInteger subscribeCounter = new AtomicInteger(0);
322+
final AtomicInteger closeConsumerCounter = new AtomicInteger(0);
323+
final AtomicInteger subscribeFinishCounter = new AtomicInteger(0);
324+
325+
AtomicBoolean canExecute = new AtomicBoolean(false);
326+
327+
CopyOnWriteArrayList<org.apache.pulsar.broker.service.Consumer> consumerList = new CopyOnWriteArrayList<>();
328+
ObjectSet<org.apache.pulsar.broker.service.Consumer> consumerSet = new ObjectHashSet<>();
329+
ConcurrentLongHashMap<CompletableFuture<org.apache.pulsar.broker.service.Consumer>> consumers =
330+
ConcurrentLongHashMap.<CompletableFuture<org.apache.pulsar.broker.service.Consumer>>newBuilder()
331+
.expectedItems(8)
332+
.concurrencyLevel(1)
333+
.build();
334+
org.apache.pulsar.broker.service.Consumer mockConsumer =
335+
Mockito.mock(org.apache.pulsar.broker.service.Consumer.class);
336+
337+
AtomicReference<Long> id = new AtomicReference<>();
338+
AtomicReference<SocketAddress> address = new AtomicReference<>();
339+
340+
// subscribe 1: Success. Then broker restart, trigger client reconnect
341+
// subscribe 2: Broker not fully success, but client timeout then trigger handleClose and reconnect again
342+
// subscribe 3: Remove the failed existingConsumerFuture, then client reconnect again
343+
// subscribe 4: Execute concurrently with subscribe 2
344+
mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
345+
final long requestId = subscribe.getRequestId();
346+
final long consumerId = subscribe.getConsumerId();
347+
SocketAddress remoteAddress = ctx.channel().remoteAddress();
348+
int subscribeCount = subscribeCounter.incrementAndGet();
349+
350+
// ensure consumerId and clientAddress is the same during 4 handleSubscribe requests
351+
// so that add consumer is always the same consumer
352+
if (id.get() == null && address.get() == null) {
353+
id.set(consumerId);
354+
address.set(remoteAddress);
355+
} else {
356+
Assert.assertEquals(id.get(), consumerId);
357+
Assert.assertEquals(remoteAddress, address.get());
358+
}
359+
360+
CompletableFuture<org.apache.pulsar.broker.service.Consumer> consumerFuture = new CompletableFuture<>();
361+
CompletableFuture<org.apache.pulsar.broker.service.Consumer> existingConsumerFuture =
362+
consumers.putIfAbsent(consumerId, consumerFuture);
363+
364+
if (existingConsumerFuture != null) {
365+
if (!existingConsumerFuture.isDone()) {
366+
ctx.writeAndFlush(Commands.newError(requestId, ServerError.ServiceNotReady,
367+
"Consumer is already present on the connection"));
368+
} else if (existingConsumerFuture.isCompletedExceptionally()){
369+
ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
370+
"Consumer that failed is already present on the connection"));
371+
// the fix of pr-20583
372+
if (!isFix) {
373+
consumers.remove(consumerId, existingConsumerFuture);
374+
}
375+
// subscribe 3 finish
376+
} else {
377+
ctx.writeAndFlush(Commands.newSuccess(requestId));
378+
}
379+
subscribeFinishCounter.incrementAndGet();
380+
return;
381+
}
382+
383+
// simulate add consumer
384+
// must use asyncFuture to simulate, otherwise requests can not be concurrent
385+
CompletableFuture.supplyAsync(() -> {
386+
org.apache.pulsar.broker.service.Consumer consumer = mockConsumer;
387+
consumerSet.add(consumer);
388+
consumerList.add(consumer);
389+
return consumer;
390+
}).thenAccept(consumer -> {
391+
// block second subscribe, and ensure client timeout
392+
// make the subscribe 2 and 4 execute concurrently in broker
393+
if (subscribeCount == 2) {
394+
while (!canExecute.get()) {
395+
// wait until subscribe 4 finish add consumer
396+
}
397+
}
398+
399+
if (subscribeCount == 4) {
400+
canExecute.set(true);
401+
}
402+
403+
if (consumerFuture.complete(consumer)) {
404+
ctx.writeAndFlush(Commands.newSuccess(requestId));
405+
406+
if (subscribeCount == 1) {
407+
// simulate broker restart and trigger client reconnect
408+
if (consumerSet.removeAll(consumer) == 1) {
409+
consumerList.remove(consumer);
410+
}
411+
consumers.remove(consumerId, consumerFuture);
412+
ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1));
413+
// subscribe 1 finish
414+
} else {
415+
// subscribe 4 finish
416+
}
417+
} else {
418+
// cleared consumer created after timeout on client side
419+
// simulate close consumer
420+
if (consumerSet.removeAll(consumer) == 1) {
421+
consumerList.remove(consumer);
422+
}
423+
consumers.remove(consumerId, consumerFuture);
424+
// subscribe 2 finish
425+
}
426+
});
427+
subscribeFinishCounter.incrementAndGet();
428+
});
429+
430+
mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
431+
long requestId = closeConsumer.getRequestId();
432+
long consumerId = closeConsumer.getConsumerId();
433+
closeConsumerCounter.incrementAndGet();
434+
435+
CompletableFuture<org.apache.pulsar.broker.service.Consumer> consumerFuture = consumers.get(consumerId);
436+
if (consumerFuture == null) {
437+
ctx.writeAndFlush(Commands.newSuccess(requestId));
438+
return;
439+
}
440+
441+
if (!consumerFuture.isDone() && consumerFuture
442+
.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
443+
ctx.writeAndFlush(Commands.newSuccess(requestId));
444+
return;
445+
}
446+
447+
if (consumerFuture.isCompletedExceptionally()) {
448+
ctx.writeAndFlush(Commands.newSuccess(requestId));
449+
return;
450+
}
451+
452+
Assert.fail("should not go here");
453+
// simulate close consumer
454+
org.apache.pulsar.broker.service.Consumer consumer = consumerFuture.getNow(null);
455+
if (consumerSet.removeAll(consumer) == 1) {
456+
consumerList.remove(consumer);
457+
}
458+
consumers.remove(consumerId, consumerFuture);
459+
ctx.writeAndFlush(Commands.newSuccess(requestId));
460+
});
461+
462+
// Create consumer (subscribe) should succeed then upon closure, it should reattempt creation.
463+
client.newConsumer().topic(topic).subscriptionName("test").subscribe();
464+
465+
Awaitility.await().until(() -> closeConsumerCounter.get() == 1);
466+
Awaitility.await().until(() -> subscribeFinishCounter.get() == 4);
467+
468+
mockBrokerService.resetHandleSubscribe();
469+
mockBrokerService.resetHandleCloseConsumer();
470+
471+
if (isFix) {
472+
Assert.assertEquals(consumers.size(), 1);
473+
Assert.assertEquals(consumerSet.size(), 1);
474+
Assert.assertEquals(consumerList.size(), 1);
475+
} else {
476+
Assert.assertEquals(consumers.size(), 1);
477+
Assert.assertEquals(consumerSet.size(), 0);
478+
Assert.assertEquals(consumerList.size(), 1);
479+
}
480+
}
481+
296482
@Test
297483
public void testProducerFailDoesNotFailOtherProducer() throws Exception {
298484
producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");
@@ -494,6 +680,7 @@ public void testPartitionedSubscribeFailAfterRetryTimeout() throws Exception {
494680
subscribeFailAfterRetryTimeout("persistent://prop/use/ns/part-t1");
495681
}
496682

683+
// 2
497684
private void subscribeFailAfterRetryTimeout(String topic) throws Exception {
498685
@Cleanup
499686
PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())

0 commit comments

Comments
 (0)