Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public ConsumerBusyException(String msg) {
}
}

public static class ConsumerClosedException extends BrokerServiceException {
public ConsumerClosedException(String msg) {
super(msg);
}
}

public static class ProducerBusyException extends BrokerServiceException {
public ProducerBusyException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.ws.rs.NotAuthorizedException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import lombok.Getter;
Expand Down Expand Up @@ -89,9 +90,11 @@
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -1258,8 +1261,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
// epoch using different consumer futures, and only remove the consumer future from the map
// if subscribe failed .
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
CompletableFuture<Consumer> existingConsumerFuture =
consumers.putIfAbsent(consumerId, consumerFuture);
consumerFuture.exceptionally((ex) -> {
consumers.remove(consumerId, consumerFuture);
return null;
});
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, consumerFuture);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
Expand All @@ -1274,7 +1280,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
consumers.remove(consumerId, consumerFuture);
consumerFuture.completeExceptionally(iae);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
return null;
}
Expand Down Expand Up @@ -1396,7 +1402,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (consumer.checkAndApplyTopicMigration()) {
log.info("[{}] Disconnecting consumer {} on migrated subscription on topic {} / {}",
remoteAddress, consumerId, subscriptionName, topicName);
consumers.remove(consumerId, consumerFuture);
consumerFuture.completeExceptionally(
new TopicMigratedException("Topic has been migrated"));
return;
}

Expand All @@ -1423,7 +1430,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
+ " after timeout on client side {}: {}",
remoteAddress, consumer, e.getMessage());
}
consumers.remove(consumerId, consumerFuture);
consumerFuture.completeExceptionally(new ConsumerClosedException(
"Cleared consumer created after timeout on client side"));
}

})
Expand All @@ -1450,7 +1458,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
log.info("consumer client doesn't support topic migration handling {}-{}-{}",
topicName, remoteAddress, consumerId);
}
consumers.remove(consumerId, consumerFuture);
consumerFuture.completeExceptionally(exception);
closeConsumer(consumerId, Optional.empty());
return null;
}
Expand All @@ -1471,21 +1479,20 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
BrokerServiceException.getClientErrorCode(exception.getCause()),
exception.getCause().getMessage());
}
consumers.remove(consumerId, consumerFuture);

return null;

});
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
consumers.remove(consumerId, consumerFuture);
consumerFuture.completeExceptionally(new NotAuthorizedException(msg));
writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
consumers.remove(consumerId, consumerFuture);
consumerFuture.completeExceptionally(ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
Expand Down Expand Up @@ -2229,13 +2236,15 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
// create operation will complete, the new consumer will be discarded.
log.info("[{}] Closed consumer before its creation was completed. consumerId={}",
remoteAddress, consumerId);
consumers.remove(consumerId, consumerFuture);
commandSender.sendSuccessResponse(requestId);
return;
}

if (consumerFuture.isCompletedExceptionally()) {
log.info("[{}] Closed consumer that already failed to be created. consumerId={}",
remoteAddress, consumerId);
consumers.remove(consumerId, consumerFuture);
commandSender.sendSuccessResponse(requestId);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2436,44 +2436,59 @@ public void testSubscribeBookieTimeout() throws Exception {
null /* assignedBrokerServiceUrl */, null /* assignedBrokerServiceUrlTls */);
channel.writeInbound(closeConsumer);

ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(subscribe2);

openTopicFail.get().run();

Object response;

// Close succeeds
response = getResponse();
assertEquals(response.getClass(), CommandSuccess.class);
assertEquals(((CommandSuccess) response).getRequestId(), 2);

// Subscribe fails
response = getResponse();
assertEquals(response.getClass(), CommandError.class);
assertEquals(((CommandError) response).getRequestId(), 3);
openTopicFail.get().run();

// We should not receive response for 1st consumer, since it was cancelled by the close
assertTrue(channel.outboundMessages().isEmpty());
assertTrue(channel.isActive());

Awaitility.await().until(() -> !serverCnx.hasConsumer(1));

ByteBuf subscribe3 = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 4 /* request id */, SubType.Exclusive, 0,
ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 3 /* request id */, SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(subscribe3);
channel.writeInbound(subscribe2);

openTopicSuccess.get().run();

// Subscribe succeeds
response = getResponse();
assertEquals(response.getClass(), CommandSuccess.class);
assertEquals(((CommandSuccess) response).getRequestId(), 4);
assertEquals(((CommandSuccess) response).getRequestId(), 3);

Thread.sleep(100);
channel.finish();
}

// We should not receive response for 1st producer, since it was cancelled by the close
assertTrue(channel.outboundMessages().isEmpty());
assertTrue(channel.isActive());
@Test
public void testCleanupCacheWhenHandleCloseConsumer() throws Exception {
resetChannel();
setChannelConnected();

CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
int consumerId = 1;

// Assumer a consumer has been added to the cache
serverCnx.getConsumers().put(1, consumerFuture);

ByteBuf closeConsumer = Commands.newCloseConsumer(consumerId /* consumer id */, 1 /* request id */,
null /* assignedBrokerServiceUrl */, null /* assignedBrokerServiceUrlTls */);
channel.writeInbound(closeConsumer);

Object response;

// Close succeeds
response = getResponse();
assertEquals(response.getClass(), CommandSuccess.class);
assertEquals(((CommandSuccess) response).getRequestId(), 1);

assertFalse(serverCnx.hasConsumer(consumerId));

channel.finish();
}
Expand Down
Loading