Skip to content

Commit f4a3881

Browse files
committed
Null-proof message ignored listener
The message ignored listener in the consumer coordinator calls the processed method on ignored messages to "simulate" the processing (e.g. increment a counter to provide a credit in the middle of the chunk). The MessageProcessedCallback is the chunk context in this case, which can be null, depending on the consumer flow strategy. The listener could trigger a null-pointer exception by calling the processed method on the context. This commit adds a null check to avoid the null-pointer exception. A null chunk context (MessageProcessedCallback) means the custom behavior of the consumer flow strategy is performed when the chunk starts, so the call to MessageHandler.Context#processed() is not necessary.
1 parent 66d7aee commit f4a3881

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -717,16 +717,18 @@ private ClientSubscriptionsManager(
717717
subscriptionTrackers.get(subscriptionId & 0xFF);
718718
if (subscriptionTracker != null) {
719719
// message at the beginning of the first chunk is ignored
720-
// we "simulate" the processing
721-
MessageHandlerContext messageHandlerContext =
722-
new MessageHandlerContext(
723-
offset,
724-
chunkTimestamp,
725-
committedChunkId,
726-
subscriptionTracker.consumer,
727-
(ConsumerFlowStrategy.MessageProcessedCallback) chunkContext);
728-
((ConsumerFlowStrategy.MessageProcessedCallback) chunkContext)
729-
.processed(messageHandlerContext);
720+
// we "simulate" the processing if possible
721+
if (chunkContext != null) {
722+
MessageHandlerContext messageHandlerContext =
723+
new MessageHandlerContext(
724+
offset,
725+
chunkTimestamp,
726+
committedChunkId,
727+
subscriptionTracker.consumer,
728+
(ConsumerFlowStrategy.MessageProcessedCallback) chunkContext);
729+
((ConsumerFlowStrategy.MessageProcessedCallback) chunkContext)
730+
.processed(messageHandlerContext);
731+
}
730732
} else {
731733
LOGGER.debug(
732734
"Could not find stream subscription {} in manager {}, node {} for message ignored listener",

0 commit comments

Comments
 (0)