@@ -59,6 +59,21 @@ DECLARE_LOG_OBJECT()
5959using std::chrono::milliseconds;
6060using std::chrono::seconds;
6161
62+ static boost::optional<MessageId> getStartMessageId (const boost::optional<MessageId>& startMessageId,
63+ bool inclusive) {
64+ if (!inclusive || !startMessageId) {
65+ return startMessageId;
66+ }
67+ // The default ledger id and entry id of a chunked message refer the fields of the last chunk. When the
68+ // start message id is inclusive, we need to start from the first chunk.
69+ auto chunkMsgIdImpl =
70+ dynamic_cast <const ChunkMessageIdImpl*>(Commands::getMessageIdImpl (startMessageId.value ()).get ());
71+ if (chunkMsgIdImpl) {
72+ return boost::optional<MessageId>{chunkMsgIdImpl->getChunkedMessageIds ().front ()};
73+ }
74+ return startMessageId;
75+ }
76+
6277ConsumerImpl::ConsumerImpl (const ClientImplPtr client, const std::string& topic,
6378 const std::string& subscriptionName, const ConsumerConfiguration& conf,
6479 bool isPersistent, const ConsumerInterceptorsPtr& interceptors,
@@ -91,7 +106,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
91106 messageListenerRunning_(!conf.isStartPaused()),
92107 negativeAcksTracker_(std::make_shared<NegativeAcksTracker>(client, *this , conf)),
93108 readCompacted_(conf.isReadCompacted()),
94- startMessageId_(startMessageId),
109+ startMessageId_(getStartMessageId( startMessageId, conf.isStartMessageIdInclusive()) ),
95110 maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
96111 autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
97112 expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()),
@@ -469,7 +484,15 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
469484
470485 auto & chunkedMsgCtx = it->second ;
471486 if (it == chunkedMessageCache_.end () || !chunkedMsgCtx.validateChunkId (chunkId)) {
472- if (it == chunkedMessageCache_.end ()) {
487+ auto startMessageId = startMessageId_.get ().value_or (MessageId::earliest ());
488+ if (!config_.isStartMessageIdInclusive () && startMessageId.ledgerId () == messageId.ledgerId () &&
489+ startMessageId.entryId () == messageId.entryId ()) {
490+ // When the start message id is not inclusive, the last chunk of the previous chunked message will
491+ // be delivered, which is expected and we only need to filter it out.
492+ chunkedMessageCache_.remove (uuid);
493+ LOG_INFO (" Filtered the chunked message before the start message id (uuid: "
494+ << uuid << " chunkId: " << chunkId << " , messageId: " << messageId << " )" );
495+ } else if (it == chunkedMessageCache_.end ()) {
473496 LOG_ERROR (" Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId
474497 << " , messageId: " << messageId << " )" );
475498 } else {
0 commit comments