Skip to content

Commit c726ab7

Browse files
committed
[fix][broker] Remove consumer from cache if closed before creation
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 19b7c27 commit c726ab7

File tree

2 files changed

+10
-22
lines changed

2 files changed

+10
-22
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2229,13 +2229,15 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
22292229
// create operation will complete, the new consumer will be discarded.
22302230
log.info("[{}] Closed consumer before its creation was completed. consumerId={}",
22312231
remoteAddress, consumerId);
2232+
consumers.remove(consumerId, consumerFuture);
22322233
commandSender.sendSuccessResponse(requestId);
22332234
return;
22342235
}
22352236

22362237
if (consumerFuture.isCompletedExceptionally()) {
22372238
log.info("[{}] Closed consumer that already failed to be created. consumerId={}",
22382239
remoteAddress, consumerId);
2240+
consumers.remove(consumerId, consumerFuture);
22392241
commandSender.sendSuccessResponse(requestId);
22402242
return;
22412243
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2436,44 +2436,30 @@ public void testSubscribeBookieTimeout() throws Exception {
24362436
null /* assignedBrokerServiceUrl */, null /* assignedBrokerServiceUrlTls */);
24372437
channel.writeInbound(closeConsumer);
24382438

2439-
ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, //
2440-
successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0,
2441-
"test" /* consumer name */, 0 /* avoid reseting cursor */);
2442-
channel.writeInbound(subscribe2);
2443-
2444-
openTopicFail.get().run();
2445-
24462439
Object response;
24472440

24482441
// Close succeeds
24492442
response = getResponse();
24502443
assertEquals(response.getClass(), CommandSuccess.class);
24512444
assertEquals(((CommandSuccess) response).getRequestId(), 2);
24522445

2453-
// Subscribe fails
2454-
response = getResponse();
2455-
assertEquals(response.getClass(), CommandError.class);
2456-
assertEquals(((CommandError) response).getRequestId(), 3);
2446+
// We should not receive response for 1st consumer, since it was cancelled by the close
2447+
openTopicFail.get().run();
24572448

2458-
Awaitility.await().until(() -> !serverCnx.hasConsumer(1));
2449+
assertTrue(channel.outboundMessages().isEmpty());
2450+
assertFalse(serverCnx.hasConsumer(1));
24592451

2460-
ByteBuf subscribe3 = Commands.newSubscribe(successTopicName, //
2461-
successSubName, 1 /* consumer id */, 4 /* request id */, SubType.Exclusive, 0,
2452+
ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, //
2453+
successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0,
24622454
"test" /* consumer name */, 0 /* avoid reseting cursor */);
2463-
channel.writeInbound(subscribe3);
2455+
channel.writeInbound(subscribe2);
24642456

24652457
openTopicSuccess.get().run();
24662458

24672459
// Subscribe succeeds
24682460
response = getResponse();
24692461
assertEquals(response.getClass(), CommandSuccess.class);
2470-
assertEquals(((CommandSuccess) response).getRequestId(), 4);
2471-
2472-
Thread.sleep(100);
2473-
2474-
// We should not receive response for 1st producer, since it was cancelled by the close
2475-
assertTrue(channel.outboundMessages().isEmpty());
2476-
assertTrue(channel.isActive());
2462+
assertEquals(((CommandSuccess) response).getRequestId(), 3);
24772463

24782464
channel.finish();
24792465
}

0 commit comments

Comments
 (0)