Skip to content

Commit fbebced

Browse files
author
fanjianye
committed
add test for concurrently created consumer
1 parent 4085518 commit fbebced

File tree

1 file changed

+184
-0
lines changed

1 file changed

+184
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,13 @@
2222
import static org.testng.Assert.assertFalse;
2323
import static org.testng.Assert.assertTrue;
2424
import static org.testng.Assert.fail;
25+
import com.carrotsearch.hppc.ObjectHashSet;
26+
import com.carrotsearch.hppc.ObjectSet;
2527
import io.netty.channel.ChannelHandlerContext;
28+
import java.net.SocketAddress;
2629
import java.util.Map;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.CopyOnWriteArrayList;
2732
import java.util.concurrent.CountDownLatch;
2833
import java.util.concurrent.TimeUnit;
2934
import java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +43,10 @@
3843
import org.apache.pulsar.common.api.proto.ServerError;
3944
import org.apache.pulsar.common.protocol.Commands;
4045
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
46+
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
4147
import org.awaitility.Awaitility;
48+
import org.mockito.Mockito;
49+
import org.testng.Assert;
4250
import org.testng.annotations.AfterClass;
4351
import org.testng.annotations.BeforeClass;
4452
import org.testng.annotations.Test;
@@ -293,6 +301,182 @@ private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception
293301
mockBrokerService.resetHandleCloseConsumer();
294302
}
295303

304+
@Test
305+
public void testConcurrentCreatedConsumerAndRetryAfterFix() throws Exception {
306+
concurrentConsumerCreatedAndRetry("persistent://prop/use/ns/t1", true);
307+
}
308+
309+
@Test
310+
public void testConcurrentCreatedConsumerAndRetryBeforeFix() throws Exception {
311+
concurrentConsumerCreatedAndRetry("persistent://prop/use/ns/t1", false);
312+
}
313+
314+
private void concurrentConsumerCreatedAndRetry(String topic, boolean isFix) throws Exception {
315+
316+
@Cleanup
317+
PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
318+
.operationTimeout(3, TimeUnit.SECONDS).build();
319+
final AtomicInteger subscribeCounter = new AtomicInteger(0);
320+
final AtomicInteger closeConsumerCounter = new AtomicInteger(0);
321+
final AtomicInteger subscribeFinishCounter = new AtomicInteger(0);
322+
323+
AtomicBoolean canExecute = new AtomicBoolean(false);
324+
325+
CopyOnWriteArrayList<org.apache.pulsar.broker.service.Consumer> consumerList = new CopyOnWriteArrayList<>();
326+
ObjectSet<org.apache.pulsar.broker.service.Consumer> consumerSet = new ObjectHashSet<>();
327+
ConcurrentLongHashMap<CompletableFuture<org.apache.pulsar.broker.service.Consumer>> consumers =
328+
ConcurrentLongHashMap.<CompletableFuture<org.apache.pulsar.broker.service.Consumer>>newBuilder()
329+
.expectedItems(8)
330+
.concurrencyLevel(1)
331+
.build();
332+
org.apache.pulsar.broker.service.Consumer mockConsumer =
333+
Mockito.mock(org.apache.pulsar.broker.service.Consumer.class);
334+
335+
AtomicReference<Long> id = new AtomicReference<>();
336+
AtomicReference<SocketAddress> address = new AtomicReference<>();
337+
338+
// subscribe 1: Success. Then broker restart, trigger client reconnect
339+
// subscribe 2: Broker not fully success, but client timeout then trigger handleClose and reconnect again
340+
// subscribe 3: Remove the failed existingConsumerFuture, then client reconnect again
341+
// subscribe 4: Execute concurrently with subscribe 2
342+
mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
343+
final long requestId = subscribe.getRequestId();
344+
final long consumerId = subscribe.getConsumerId();
345+
SocketAddress remoteAddress = ctx.channel().remoteAddress();
346+
int subscribeCount = subscribeCounter.incrementAndGet();
347+
348+
// ensure consumerId and clientAddress is the same during 4 handleSubscribe requests
349+
// so that add consumer is always the same consumer
350+
if (id.get() == null && address.get() == null) {
351+
id.set(consumerId);
352+
address.set(remoteAddress);
353+
} else {
354+
Assert.assertEquals(id.get(), consumerId);
355+
Assert.assertEquals(remoteAddress, address.get());
356+
}
357+
358+
CompletableFuture<org.apache.pulsar.broker.service.Consumer> consumerFuture = new CompletableFuture<>();
359+
CompletableFuture<org.apache.pulsar.broker.service.Consumer> existingConsumerFuture =
360+
consumers.putIfAbsent(consumerId, consumerFuture);
361+
362+
if (existingConsumerFuture != null) {
363+
if (!existingConsumerFuture.isDone()) {
364+
ctx.writeAndFlush(Commands.newError(requestId, ServerError.ServiceNotReady,
365+
"Consumer is already present on the connection"));
366+
} else if (existingConsumerFuture.isCompletedExceptionally()){
367+
ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
368+
"Consumer that failed is already present on the connection"));
369+
// the fix of pr-20583
370+
if (!isFix) {
371+
consumers.remove(consumerId, existingConsumerFuture);
372+
}
373+
// subscribe 3 finish
374+
} else {
375+
ctx.writeAndFlush(Commands.newSuccess(requestId));
376+
}
377+
subscribeFinishCounter.incrementAndGet();
378+
return;
379+
}
380+
381+
// simulate add consumer
382+
// must use asyncFuture to simulate, otherwise requests can not be concurrent
383+
CompletableFuture.supplyAsync(() -> {
384+
org.apache.pulsar.broker.service.Consumer consumer = mockConsumer;
385+
consumerSet.add(consumer);
386+
consumerList.add(consumer);
387+
return consumer;
388+
}).thenAccept(consumer -> {
389+
// block second subscribe, and ensure client timeout
390+
// make the subscribe 2 and 4 execute concurrently in broker
391+
if (subscribeCount == 2) {
392+
while (!canExecute.get()) {
393+
// wait until subscribe 4 finish add consumer
394+
}
395+
}
396+
397+
if (subscribeCount == 4) {
398+
canExecute.set(true);
399+
}
400+
401+
if (consumerFuture.complete(consumer)) {
402+
ctx.writeAndFlush(Commands.newSuccess(requestId));
403+
404+
if (subscribeCount == 1) {
405+
// simulate broker restart and trigger client reconnect
406+
if (consumerSet.removeAll(consumer) == 1) {
407+
consumerList.remove(consumer);
408+
}
409+
consumers.remove(consumerId, consumerFuture);
410+
ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1));
411+
// subscribe 1 finish
412+
} else {
413+
// subscribe 4 finish
414+
}
415+
} else {
416+
// cleared consumer created after timeout on client side
417+
// simulate close consumer
418+
if (consumerSet.removeAll(consumer) == 1) {
419+
consumerList.remove(consumer);
420+
}
421+
consumers.remove(consumerId, consumerFuture);
422+
// subscribe 2 finish
423+
}
424+
});
425+
subscribeFinishCounter.incrementAndGet();
426+
});
427+
428+
mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
429+
long requestId = closeConsumer.getRequestId();
430+
long consumerId = closeConsumer.getConsumerId();
431+
closeConsumerCounter.incrementAndGet();
432+
433+
CompletableFuture<org.apache.pulsar.broker.service.Consumer> consumerFuture = consumers.get(consumerId);
434+
if (consumerFuture == null) {
435+
ctx.writeAndFlush(Commands.newSuccess(requestId));
436+
return;
437+
}
438+
439+
if (!consumerFuture.isDone() && consumerFuture
440+
.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
441+
ctx.writeAndFlush(Commands.newSuccess(requestId));
442+
return;
443+
}
444+
445+
if (consumerFuture.isCompletedExceptionally()) {
446+
ctx.writeAndFlush(Commands.newSuccess(requestId));
447+
return;
448+
}
449+
450+
Assert.fail("should not go here");
451+
// simulate close consumer
452+
org.apache.pulsar.broker.service.Consumer consumer = consumerFuture.getNow(null);
453+
if (consumerSet.removeAll(consumer) == 1) {
454+
consumerList.remove(consumer);
455+
}
456+
consumers.remove(consumerId, consumerFuture);
457+
ctx.writeAndFlush(Commands.newSuccess(requestId));
458+
});
459+
460+
// Create consumer (subscribe) should succeed then upon closure, it should reattempt creation.
461+
client.newConsumer().topic(topic).subscriptionName("test").subscribe();
462+
463+
Awaitility.await().until(() -> closeConsumerCounter.get() == 1);
464+
Awaitility.await().until(() -> subscribeFinishCounter.get() == 4);
465+
466+
mockBrokerService.resetHandleSubscribe();
467+
mockBrokerService.resetHandleCloseConsumer();
468+
469+
if (isFix) {
470+
Assert.assertEquals(consumers.size(), 1);
471+
Assert.assertEquals(consumerSet.size(), 1);
472+
Assert.assertEquals(consumerList.size(), 1);
473+
} else {
474+
Assert.assertEquals(consumers.size(), 1);
475+
Assert.assertEquals(consumerSet.size(), 0);
476+
Assert.assertEquals(consumerList.size(), 1);
477+
}
478+
}
479+
296480
@Test
297481
public void testProducerFailDoesNotFailOtherProducer() throws Exception {
298482
producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");

0 commit comments

Comments
 (0)