Skip to content

Commit 1721e00

Browse files
Add MessageId::batchSize() and the MessageIdBuilder (#105)
Master issue: #87 ### Motivation To support batch index acknowledgment, we must provide a method to get the batch size of a batched message ID. ### Modifications Instead of adding another overloaded constructor to `MessageId`, this PR adds a `MessageIdBuilder` class to construct the `MessageId` in a more elegant way. The original constructor is counterintuitive because the partition index is the 1st argument. https://github.com/apache/pulsar-client-cpp/blob/74ef1a01f5c7a4604d251de6d040c433f9bbf56b/include/pulsar/MessageId.h#L47 Therefore, this PR marks it as deprecated and replaces all invocations of it with `MessageIdBuilder` usages. To verify `MessageId::batchSize()`, the following tests are modified: - `BatchMessageTest.testBatchSizeInBytes`: the batch size is always 2 because of the `batchingMaxAllowedSizeInBytes` config. - `MessageChunkingTest.testEndToEnd`: the batch size field is not set (default: 0) because batching is disabled.
1 parent d396c90 commit 1721e00

23 files changed

+276
-63
lines changed

include/pulsar/MessageId.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ class PULSAR_PUBLIC MessageId {
3737
MessageId();
3838

3939
/**
40+
* @deprecated
41+
*
4042
* Construct the MessageId
4143
*
44+
* NOTE: This API still exists for backward compatibility; use MessageIdBuilder instead.
45+
*
4246
* @param partition the partition number of a topic
4347
* @param ledgerId the ledger id
4448
* @param entryId the entry id
@@ -88,6 +92,7 @@ class PULSAR_PUBLIC MessageId {
8892
int64_t entryId() const;
8993
int32_t batchIndex() const;
9094
int32_t partition() const;
95+
int32_t batchSize() const;
9196

9297
private:
9398
friend class ConsumerImpl;
@@ -102,11 +107,14 @@ class PULSAR_PUBLIC MessageId {
102107
friend class PulsarWrapper;
103108
friend class PulsarFriend;
104109
friend class NegativeAcksTracker;
110+
friend class MessageIdBuilder;
105111

106112
friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const MessageId& messageId);
107113

108114
typedef std::shared_ptr<MessageIdImpl> MessageIdImplPtr;
109115
MessageIdImplPtr impl_;
116+
117+
explicit MessageId(const MessageIdImplPtr& impl);
110118
};
111119

112120
typedef std::vector<MessageId> MessageIdList;

include/pulsar/MessageIdBuilder.h

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <pulsar/MessageId.h>
22+
23+
#include <memory>
24+
25+
namespace pulsar {
26+
27+
namespace proto {
28+
class MessageIdData;
29+
}
30+
31+
/**
32+
* The builder to build a MessageId.
33+
*
34+
* Example of building a single MessageId:
35+
*
36+
* ```c++
37+
* MessageId msgId = MessageIdBuilder()
38+
* .ledgerId(0L)
39+
* .entryId(0L)
40+
* .build();
41+
* ```
42+
*
43+
* Example of building a batched MessageId:
44+
*
45+
* ```c++
46+
* MessageId msgId = MessageIdBuilder()
47+
* .ledgerId(0L)
48+
* .entryId(0L)
49+
* .batchIndex(0)
50+
* .batchSize(2)
51+
* .build();
52+
* ```
53+
*/
54+
class PULSAR_PUBLIC MessageIdBuilder {
55+
public:
56+
/**
* Create a builder.
*
* Per the setter documentation below, the documented defaults are:
* ledger ID -1, entry ID -1, partition -1, batch index -1 and batch size 0.
*/
explicit MessageIdBuilder();
57+
58+
/**
59+
* Create an instance that copies the data from messageId.
60+
*
* @param messageId the MessageId whose data is copied
* @return a builder whose fields can still be overridden before calling build()
*/
61+
static MessageIdBuilder from(const MessageId& messageId);
62+
63+
/**
64+
* Create an instance from the proto::MessageIdData instance.
65+
*
66+
* @note It's an internal API that converts the MessageIdData defined by PulsarApi.proto
67+
* @see https://github.com/apache/pulsar-client-cpp/blob/main/proto/PulsarApi.proto
68+
*
* @param messageIdData the protobuf message ID to convert
*/
69+
static MessageIdBuilder from(const proto::MessageIdData& messageIdData);
70+
71+
/**
72+
* Build a MessageId.
73+
*
* @return the constructed MessageId
*/
74+
MessageId build() const;
75+
76+
/**
77+
* Set the ledger ID field.
78+
*
79+
* Default: -1L
80+
*
* @param ledgerId the ledger ID to set
* @return a MessageIdBuilder reference, so that calls can be chained
*/
81+
MessageIdBuilder& ledgerId(int64_t ledgerId);
82+
83+
/**
84+
* Set the entry ID field.
85+
*
86+
* Default: -1L
87+
*
* @param entryId the entry ID to set
* @return a MessageIdBuilder reference, so that calls can be chained
*/
88+
MessageIdBuilder& entryId(int64_t entryId);
89+
90+
/**
91+
* Set the partition index.
92+
*
93+
* Default: -1
94+
*
* @param partition the partition index to set
* @return a MessageIdBuilder reference, so that calls can be chained
*/
95+
MessageIdBuilder& partition(int32_t partition);
96+
97+
/**
98+
* Set the batch index.
99+
*
100+
* Default: -1
101+
*
* NOTE(review): callers treat batchIndex() >= 0 as a batched message ID, so -1
* presumably means "not part of a batch" - confirm.
*
* @param batchIndex the batch index to set
* @return a MessageIdBuilder reference, so that calls can be chained
*/
102+
MessageIdBuilder& batchIndex(int32_t batchIndex);
103+
104+
/**
105+
* Set the batch size.
106+
*
107+
* Default: 0
108+
*
* NOTE(review): presumably 0 means the message was not produced with batching
* enabled - confirm against the implementation.
*
* @param batchSize the batch size to set
* @return a MessageIdBuilder reference, so that calls can be chained
*/
109+
MessageIdBuilder& batchSize(int32_t batchSize);
110+
111+
private:
112+
// Backing MessageIdImpl (not defined in this header); presumably holds the fields
// being assembled - TODO confirm.
std::shared_ptr<MessageIdImpl> impl_;
113+
};
114+
115+
} // namespace pulsar

lib/BatchAcknowledgementTracker.cc

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "BatchAcknowledgementTracker.h"
2020

2121
#include "LogUtils.h"
22+
#include "MessageIdUtil.h"
2223
#include "MessageImpl.h"
2324

2425
namespace pulsar {
@@ -71,8 +72,7 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
7172
return;
7273
}
7374

74-
MessageId batchMessageId =
75-
MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */);
75+
auto batchMessageId = discardBatch(messageId);
7676

7777
Lock lock(mutex_);
7878
if (ackType == CommandAck_AckType_Cumulative) {
@@ -114,9 +114,7 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId,
114114

115115
bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID, CommandAck_AckType ackType) {
116116
Lock lock(mutex_);
117-
// Remove batch index
118-
MessageId batchMessageId =
119-
MessageId(msgID.partition(), msgID.ledgerId(), msgID.entryId(), -1 /* Batch index */);
117+
auto batchMessageId = discardBatch(msgID);
120118

121119
TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
122120
if (pos == trackerMap_.end() ||
@@ -154,8 +152,7 @@ const MessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const
154152
Lock lock(mutex_);
155153

156154
// Remove batch index
157-
MessageId batchMessageId =
158-
MessageId(messageId.partition(), messageId.ledgerId(), messageId.entryId(), -1 /* Batch index */);
155+
auto batchMessageId = discardBatch(messageId);
159156
TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
160157

161158
// element not found

lib/ClientConnection.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
#include "ClientConnection.h"
2020

21+
#include <pulsar/MessageIdBuilder.h>
22+
2123
#include <fstream>
2224

2325
#include "Commands.h"
@@ -43,8 +45,7 @@ static const uint32_t DefaultBufferSize = 64 * 1024;
4345
static const int KeepAliveIntervalInSeconds = 30;
4446

4547
static MessageId toMessageId(const proto::MessageIdData& messageIdData) {
46-
return MessageId{messageIdData.partition(), static_cast<int64_t>(messageIdData.ledgerid()),
47-
static_cast<int64_t>(messageIdData.entryid()), messageIdData.batch_index()};
48+
return MessageIdBuilder::from(messageIdData).build();
4849
}
4950

5051
// Convert error codes from protobuf to client API Result
@@ -830,8 +831,7 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
830831
int producerId = sendReceipt.producer_id();
831832
uint64_t sequenceId = sendReceipt.sequence_id();
832833
const proto::MessageIdData& messageIdData = sendReceipt.message_id();
833-
MessageId messageId = MessageId(messageIdData.partition(), messageIdData.ledgerid(),
834-
messageIdData.entryid(), messageIdData.batch_index());
834+
auto messageId = toMessageId(messageIdData);
835835

836836
LOG_DEBUG(cnxString_ << "Got receipt for producer: " << producerId
837837
<< " -- msg: " << sequenceId << "-- message id: " << messageId);

lib/Commands.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "Commands.h"
2020

2121
#include <pulsar/MessageBuilder.h>
22+
#include <pulsar/MessageIdBuilder.h>
2223
#include <pulsar/Schema.h>
2324
#include <pulsar/Version.h>
2425

@@ -807,7 +808,8 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg,
807808
return msgMetadata.sequence_id();
808809
}
809810

810-
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) {
811+
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
812+
int32_t batchSize) {
811813
SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;
812814

813815
// Format of batch message
@@ -825,7 +827,7 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32
825827
uncompressedPayload.consume(payloadSize);
826828

827829
const MessageId& m = batchedMessage.impl_->messageId;
828-
MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), batchIndex);
830+
auto singleMessageId = MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
829831
Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata,
830832
batchedMessage.impl_->getTopicName());
831833
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;

lib/Commands.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ class Commands {
132132
static PULSAR_PUBLIC uint64_t serializeSingleMessageInBatchWithPayload(
133133
const Message& msg, SharedBuffer& batchPayLoad, unsigned long maxMessageSizeInBytes);
134134

135-
static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex);
135+
static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
136+
int32_t batchSize);
136137

137138
static SharedBuffer newConsumerStats(uint64_t consumerId, uint64_t requestId);
138139

lib/ConsumerImpl.cc

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
#include "ConsumerImpl.h"
2020

21+
#include <pulsar/MessageIdBuilder.h>
22+
2123
#include <algorithm>
2224

2325
#include "AckGroupingTracker.h"
@@ -471,8 +473,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
471473
// Only a non-batched message can be a chunk
472474
if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
473475
const auto& messageIdData = msg.message_id();
474-
MessageId messageId(messageIdData.partition(), messageIdData.ledgerid(), messageIdData.entryid(),
475-
messageIdData.batch_index());
476+
auto messageId = MessageIdBuilder::from(messageIdData).build();
476477
auto optionalPayload = processMessageChunk(payload, metadata, messageId, messageIdData, cnx);
477478
if (optionalPayload.is_present()) {
478479
payload = optionalPayload.value();
@@ -629,7 +630,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
629630

630631
for (int i = 0; i < batchSize; i++) {
631632
// This is a cheap copy since message contains only one shared pointer (impl_)
632-
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
633+
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize);
633634
msg.impl_->setRedeliveryCount(redeliveryCount);
634635
msg.impl_->setTopicName(batchedMessage.getTopicName());
635636
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
@@ -929,13 +930,17 @@ Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
929930
if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
930931
// There was at least one message pending in the queue
931932
const MessageId& nextMessageId = nextMessageInQueue.getMessageId();
932-
MessageId previousMessageId;
933-
if (nextMessageId.batchIndex() >= 0) {
934-
previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId(),
935-
nextMessageId.batchIndex() - 1);
936-
} else {
937-
previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId() - 1, -1);
938-
}
933+
auto previousMessageId = (nextMessageId.batchIndex() >= 0)
934+
? MessageIdBuilder()
935+
.ledgerId(nextMessageId.ledgerId())
936+
.entryId(nextMessageId.entryId())
937+
.batchIndex(nextMessageId.batchIndex() - 1)
938+
.batchSize(nextMessageId.batchSize())
939+
.build()
940+
: MessageIdBuilder()
941+
.ledgerId(nextMessageId.ledgerId())
942+
.entryId(nextMessageId.entryId() - 1)
943+
.build();
939944
return Optional<MessageId>::of(previousMessageId);
940945
} else if (lastDequedMessageId_ != MessageId::earliest()) {
941946
// If the queue was empty we need to restart from the message just after the last one that has been

lib/Message.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
#include <pulsar/Message.h>
2020
#include <pulsar/MessageBuilder.h>
21+
#include <pulsar/MessageIdBuilder.h>
2122
#include <pulsar/defines.h>
2223

2324
#include <iostream>
@@ -71,9 +72,7 @@ Message::Message(MessageImplPtr& impl) : impl_(impl) {}
7172
Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload,
7273
int32_t partition)
7374
: impl_(std::make_shared<MessageImpl>()) {
74-
impl_->messageId =
75-
MessageId(partition, msg.message_id().ledgerid(), msg.message_id().entryid(), /* batchId */
76-
-1);
75+
impl_->messageId = MessageIdBuilder::from(msg.message_id()).batchIndex(-1).build();
7776
impl_->metadata = metadata;
7877
impl_->payload = payload;
7978
}

lib/MessageAndCallbackBatch.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
#include "MessageAndCallbackBatch.h"
2020

21+
#include <pulsar/MessageIdBuilder.h>
22+
2123
#include "ClientConnection.h"
2224
#include "Commands.h"
2325
#include "LogUtils.h"
@@ -54,8 +56,7 @@ static void completeSendCallbacks(const std::vector<SendCallback>& callbacks, Re
5456
int32_t numOfMessages = static_cast<int32_t>(callbacks.size());
5557
LOG_DEBUG("Batch complete [Result = " << result << "] [numOfMessages = " << numOfMessages << "]");
5658
for (int32_t i = 0; i < numOfMessages; i++) {
57-
MessageId idInBatch(id.partition(), id.ledgerId(), id.entryId(), i);
58-
callbacks[i](result, idInBatch);
59+
callbacks[i](result, MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build());
5960
}
6061
}
6162

lib/MessageBatch.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ MessageBatch& MessageBatch::parseFrom(const SharedBuffer& payload, uint32_t batc
4747
batch_.clear();
4848

4949
for (int i = 0; i < batchSize; ++i) {
50-
batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i));
50+
batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i, batchSize));
5151
}
5252
return *this;
5353
}

0 commit comments

Comments
 (0)