Skip to content

Commit 6ebea5c

Browse files
Avoid calling serializeSingleMessageInBatchWithPayload each time a message is added (apache#309)
* Avoid calling serializeSingleMessageInBatchWithPayload each time a message is added ### Motivation Currently, each time a message is added to the batch message container, `serializeSingleMessageInBatchWithPayload` will be called. In this method, if the payload buffer's size is not enough, it will grow twice. After batch is cleared, the payload buffer will be reset. For example, here is a typical buffer size increament during a period of a batch: ``` increase buffer size from 0 to 1033 increase buffer size from 1033 to 2066 increase buffer size from 2066 to 4132 increase buffer size from 3099 to 6198 increase buffer size from 5165 to 10330 increase buffer size from 9297 to 18594 increase buffer size from 17561 to 35122 increase buffer size from 34089 to 68178 increase buffer size from 67145 to 134290 ``` ### Modifications Refactor the `MessageAndCallbackBatch` design, in `add` method, only store the message and callback. Provide a `createOpSendMsg` method to serialize the messages and callbacks into a `OpSendMsg`.
1 parent cac5e1d commit 6ebea5c

10 files changed

+141
-150
lines changed

lib/BatchMessageContainer.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ void BatchMessageContainer::clear() {
5454
}
5555

5656
std::unique_ptr<OpSendMsg> BatchMessageContainer::createOpSendMsg(const FlushCallback& flushCallback) {
57-
auto op = createOpSendMsgHelper(flushCallback, batch_);
57+
auto op = createOpSendMsgHelper(batch_);
58+
if (flushCallback) {
59+
op->addTrackerCallback(flushCallback);
60+
}
5861
clear();
5962
return op;
6063
}

lib/BatchMessageContainerBase.cc

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,10 @@
1818
*/
1919
#include "BatchMessageContainerBase.h"
2020

21-
#include "ClientConnection.h"
22-
#include "CompressionCodec.h"
2321
#include "MessageAndCallbackBatch.h"
2422
#include "MessageCrypto.h"
25-
#include "MessageImpl.h"
2623
#include "OpSendMsg.h"
2724
#include "ProducerImpl.h"
28-
#include "PulsarApi.pb.h"
2925
#include "SharedBuffer.h"
3026

3127
namespace pulsar {
@@ -40,38 +36,9 @@ BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& produce
4036
BatchMessageContainerBase::~BatchMessageContainerBase() {}
4137

4238
std::unique_ptr<OpSendMsg> BatchMessageContainerBase::createOpSendMsgHelper(
43-
const FlushCallback& flushCallback, const MessageAndCallbackBatch& batch) const {
44-
auto sendCallback = batch.createSendCallback(flushCallback);
45-
if (batch.empty()) {
46-
return OpSendMsg::create(ResultOperationNotSupported, std::move(sendCallback));
47-
}
48-
49-
MessageImplPtr impl = batch.msgImpl();
50-
impl->metadata.set_num_messages_in_batch(batch.size());
51-
auto compressionType = producerConfig_.getCompressionType();
52-
if (compressionType != CompressionNone) {
53-
impl->metadata.set_compression(static_cast<proto::CompressionType>(compressionType));
54-
impl->metadata.set_uncompressed_size(impl->payload.readableBytes());
55-
}
56-
impl->payload = CompressionCodecProvider::getCodec(compressionType).encode(impl->payload);
57-
58-
auto msgCrypto = msgCryptoWeakPtr_.lock();
59-
if (msgCrypto && producerConfig_.isEncryptionEnabled()) {
60-
SharedBuffer encryptedPayload;
61-
if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(), producerConfig_.getCryptoKeyReader(),
62-
impl->metadata, impl->payload, encryptedPayload)) {
63-
return OpSendMsg::create(ResultCryptoError, std::move(sendCallback));
64-
}
65-
impl->payload = encryptedPayload;
66-
}
67-
68-
if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize()) {
69-
return OpSendMsg::create(ResultMessageTooBig, std::move(sendCallback));
70-
}
71-
72-
return OpSendMsg::create(impl->metadata, batch.messagesCount(), batch.messagesSize(),
73-
producerConfig_.getSendTimeout(), batch.createSendCallback(flushCallback),
74-
nullptr, producerId_, impl->payload);
39+
MessageAndCallbackBatch& batch) const {
40+
auto crypto = msgCryptoWeakPtr_.lock();
41+
return batch.createOpSendMsg(producerId_, producerConfig_, crypto.get());
7542
}
7643

7744
} // namespace pulsar

lib/BatchMessageContainerBase.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ class BatchMessageContainerBase : public boost::noncopyable {
109109
void updateStats(const Message& msg);
110110
void resetStats();
111111

112-
std::unique_ptr<OpSendMsg> createOpSendMsgHelper(const FlushCallback& flushCallback,
113-
const MessageAndCallbackBatch& batch) const;
112+
std::unique_ptr<OpSendMsg> createOpSendMsgHelper(MessageAndCallbackBatch& flushCallback) const;
114113

115114
virtual void clear() = 0;
116115
};

lib/BatchMessageKeyBasedContainer.cc

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -74,22 +74,19 @@ void BatchMessageKeyBasedContainer::clear() {
7474

7575
std::vector<std::unique_ptr<OpSendMsg>> BatchMessageKeyBasedContainer::createOpSendMsgs(
7676
const FlushCallback& flushCallback) {
77-
// Sorted the batches by sequence id
78-
std::vector<const MessageAndCallbackBatch*> sortedBatches;
79-
for (const auto& kv : batches_) {
80-
sortedBatches.emplace_back(&kv.second);
77+
// Store raw pointers to use std::sort
78+
std::vector<OpSendMsg*> rawOpSendMsgs;
79+
for (auto& kv : batches_) {
80+
rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
8181
}
82-
std::sort(sortedBatches.begin(), sortedBatches.end(),
83-
[](const MessageAndCallbackBatch* lhs, const MessageAndCallbackBatch* rhs) {
84-
return lhs->sequenceId() < rhs->sequenceId();
85-
});
82+
std::sort(rawOpSendMsgs.begin(), rawOpSendMsgs.end(), [](const OpSendMsg* lhs, const OpSendMsg* rhs) {
83+
return lhs->sendArgs->sequenceId < rhs->sendArgs->sequenceId;
84+
});
85+
rawOpSendMsgs.back()->addTrackerCallback(flushCallback);
8686

87-
std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{sortedBatches.size()};
88-
for (size_t i = 0; i + 1 < opSendMsgs.size(); i++) {
89-
opSendMsgs[i].reset(createOpSendMsgHelper(nullptr, *sortedBatches[i]).release());
90-
}
91-
if (!opSendMsgs.empty()) {
92-
opSendMsgs.back().reset(createOpSendMsgHelper(flushCallback, *sortedBatches.back()).release());
87+
std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{rawOpSendMsgs.size()};
88+
for (size_t i = 0; i < opSendMsgs.size(); i++) {
89+
opSendMsgs[i].reset(rawOpSendMsgs[i]);
9390
}
9491
clear();
9592
return opSendMsgs;

lib/Commands.cc

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
#include "BatchedMessageIdImpl.h"
3131
#include "BitSet.h"
3232
#include "ChunkMessageIdImpl.h"
33-
#include "LogUtils.h"
3433
#include "MessageImpl.h"
3534
#include "OpSendMsg.h"
3635
#include "PulsarApi.pb.h"
@@ -40,8 +39,6 @@
4039
using namespace pulsar;
4140
namespace pulsar {
4241

43-
DECLARE_LOG_OBJECT();
44-
4542
using proto::AuthData;
4643
using proto::BaseCommand;
4744
using proto::CommandAck;
@@ -836,10 +833,14 @@ void Commands::initBatchMessageMetadata(const Message& msg, pulsar::proto::Messa
836833
}
837834
}
838835

839-
uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad,
840-
unsigned long maxMessageSizeInBytes) {
841-
const auto& msgMetadata = msg.impl_->metadata;
842-
SingleMessageMetadata metadata;
836+
// The overhead of constructing and destructing a SingleMessageMetadata is higher than allocating and
837+
// deallocating memory for a byte array, so use a thread local SingleMessageMetadata and serialize it to a
838+
// byte array.
839+
static std::pair<std::unique_ptr<char[]>, size_t> serializeSingleMessageMetadata(
840+
const proto::MessageMetadata& msgMetadata, size_t payloadSize) {
841+
thread_local SingleMessageMetadata metadata;
842+
metadata.Clear();
843+
metadata.set_payload_size(payloadSize);
843844
if (msgMetadata.has_partition_key()) {
844845
metadata.set_partition_key(msgMetadata.partition_key());
845846
}
@@ -862,34 +863,38 @@ uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message& msg,
862863
metadata.set_sequence_id(msgMetadata.sequence_id());
863864
}
864865

866+
size_t size = metadata.ByteSizeLong();
867+
std::unique_ptr<char[]> data{new char[size]};
868+
metadata.SerializeToArray(data.get(), size);
869+
return std::make_pair(std::move(data), size);
870+
}
871+
872+
uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayload,
873+
const std::vector<Message>& messages) {
874+
assert(!messages.empty());
875+
size_t size = sizeof(uint32_t) * messages.size();
876+
877+
std::vector<std::pair<std::unique_ptr<char[]>, size_t>> singleMetadataBuffers(messages.size());
878+
for (size_t i = 0; i < messages.size(); i++) {
879+
const auto& impl = messages[i].impl_;
880+
singleMetadataBuffers[i] =
881+
serializeSingleMessageMetadata(impl->metadata, impl->payload.readableBytes());
882+
size += singleMetadataBuffers[i].second;
883+
size += messages[i].getLength();
884+
}
885+
865886
// Format of batch message
866887
// Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]
888+
batchPayload = SharedBuffer::allocate(size);
889+
for (size_t i = 0; i < messages.size(); i++) {
890+
auto msgMetadataSize = singleMetadataBuffers[i].second;
891+
batchPayload.writeUnsignedInt(msgMetadataSize);
892+
batchPayload.write(singleMetadataBuffers[i].first.get(), msgMetadataSize);
893+
const auto& payload = messages[i].impl_->payload;
894+
batchPayload.write(payload.data(), payload.readableBytes());
895+
}
867896

868-
int payloadSize = msg.impl_->payload.readableBytes();
869-
metadata.set_payload_size(payloadSize);
870-
871-
int msgMetadataSize = metadata.ByteSize();
872-
873-
unsigned long requiredSpace = sizeof(uint32_t) + msgMetadataSize + payloadSize;
874-
if (batchPayLoad.writableBytes() <= sizeof(uint32_t) + msgMetadataSize + payloadSize) {
875-
LOG_DEBUG("remaining size of batchPayLoad buffer ["
876-
<< batchPayLoad.writableBytes() << "] can't accomodate new payload [" << requiredSpace
877-
<< "] - expanding the batchPayload buffer");
878-
uint32_t new_size =
879-
std::min(batchPayLoad.readableBytes() * 2, static_cast<uint32_t>(maxMessageSizeInBytes));
880-
new_size = std::max(new_size, batchPayLoad.readableBytes() + static_cast<uint32_t>(requiredSpace));
881-
SharedBuffer buffer = SharedBuffer::allocate(new_size);
882-
// Adding batch created so far
883-
buffer.write(batchPayLoad.data(), batchPayLoad.readableBytes());
884-
batchPayLoad = buffer;
885-
}
886-
// Adding the new message
887-
batchPayLoad.writeUnsignedInt(msgMetadataSize);
888-
metadata.SerializeToArray(batchPayLoad.mutableData(), msgMetadataSize);
889-
batchPayLoad.bytesWritten(msgMetadataSize);
890-
batchPayLoad.write(msg.impl_->payload.data(), payloadSize);
891-
892-
return msgMetadata.sequence_id();
897+
return messages.back().impl_->metadata.sequence_id();
893898
}
894899

895900
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,

lib/Commands.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ class Commands {
148148

149149
static void initBatchMessageMetadata(const Message& msg, pulsar::proto::MessageMetadata& batchMetadata);
150150

151-
static PULSAR_PUBLIC uint64_t serializeSingleMessageInBatchWithPayload(
152-
const Message& msg, SharedBuffer& batchPayLoad, unsigned long maxMessageSizeInBytes);
151+
static uint64_t serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayload,
152+
const std::vector<Message>& messages);
153153

154154
static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
155155
int32_t batchSize, const BatchMessageAckerPtr& acker);

lib/MessageAndCallbackBatch.cc

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,59 +22,82 @@
2222

2323
#include "ClientConnection.h"
2424
#include "Commands.h"
25-
#include "LogUtils.h"
26-
#include "MessageImpl.h"
27-
28-
DECLARE_LOG_OBJECT()
25+
#include "CompressionCodec.h"
26+
#include "MessageCrypto.h"
27+
#include "OpSendMsg.h"
28+
#include "PulsarApi.pb.h"
2929

3030
namespace pulsar {
3131

32+
MessageAndCallbackBatch::MessageAndCallbackBatch() {}
33+
34+
MessageAndCallbackBatch::~MessageAndCallbackBatch() {}
35+
3236
void MessageAndCallbackBatch::add(const Message& msg, const SendCallback& callback) {
33-
if (empty()) {
34-
msgImpl_.reset(new MessageImpl);
35-
Commands::initBatchMessageMetadata(msg, msgImpl_->metadata);
37+
if (callbacks_.empty()) {
38+
metadata_.reset(new proto::MessageMetadata);
39+
Commands::initBatchMessageMetadata(msg, *metadata_);
3640
}
37-
LOG_DEBUG(" Before serialization payload size in bytes = " << msgImpl_->payload.readableBytes());
38-
sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg, msgImpl_->payload,
39-
ClientConnection::getMaxMessageSize());
40-
LOG_DEBUG(" After serialization payload size in bytes = " << msgImpl_->payload.readableBytes());
41+
messages_.emplace_back(msg);
4142
callbacks_.emplace_back(callback);
42-
43-
++messagesCount_;
4443
messagesSize_ += msg.getLength();
4544
}
4645

46+
std::unique_ptr<OpSendMsg> MessageAndCallbackBatch::createOpSendMsg(
47+
uint64_t producerId, const ProducerConfiguration& producerConfig, MessageCrypto* crypto) {
48+
auto callback = createSendCallback();
49+
if (empty()) {
50+
return OpSendMsg::create(ResultOperationNotSupported, std::move(callback));
51+
}
52+
53+
// TODO: Store payload as a field and support shrinking
54+
SharedBuffer payload;
55+
auto sequenceId = Commands::serializeSingleMessagesToBatchPayload(payload, messages_);
56+
metadata_->set_sequence_id(sequenceId);
57+
metadata_->set_num_messages_in_batch(messages_.size());
58+
auto compressionType = producerConfig.getCompressionType();
59+
if (compressionType != CompressionNone) {
60+
metadata_->set_compression(static_cast<proto::CompressionType>(compressionType));
61+
metadata_->set_uncompressed_size(payload.readableBytes());
62+
}
63+
payload = CompressionCodecProvider::getCodec(compressionType).encode(payload);
64+
65+
if (producerConfig.isEncryptionEnabled() && crypto) {
66+
SharedBuffer encryptedPayload;
67+
if (!crypto->encrypt(producerConfig.getEncryptionKeys(), producerConfig.getCryptoKeyReader(),
68+
*metadata_, payload, encryptedPayload)) {
69+
return OpSendMsg::create(ResultCryptoError, std::move(callback));
70+
}
71+
payload = encryptedPayload;
72+
}
73+
74+
if (payload.readableBytes() > ClientConnection::getMaxMessageSize()) {
75+
return OpSendMsg::create(ResultMessageTooBig, std::move(callback));
76+
}
77+
78+
auto op = OpSendMsg::create(*metadata_, callbacks_.size(), messagesSize_, producerConfig.getSendTimeout(),
79+
std::move(callback), nullptr, producerId, payload);
80+
clear();
81+
return op;
82+
}
83+
4784
void MessageAndCallbackBatch::clear() {
48-
msgImpl_.reset();
85+
messages_.clear();
4986
callbacks_.clear();
50-
messagesCount_ = 0;
5187
messagesSize_ = 0;
5288
}
5389

5490
static void completeSendCallbacks(const std::vector<SendCallback>& callbacks, Result result,
5591
const MessageId& id) {
5692
int32_t numOfMessages = static_cast<int32_t>(callbacks.size());
57-
LOG_DEBUG("Batch complete [Result = " << result << "] [numOfMessages = " << numOfMessages << "]");
5893
for (int32_t i = 0; i < numOfMessages; i++) {
5994
callbacks[i](result, MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build());
6095
}
6196
}
6297

63-
void MessageAndCallbackBatch::complete(Result result, const MessageId& id) const {
64-
completeSendCallbacks(callbacks_, result, id);
65-
}
66-
67-
SendCallback MessageAndCallbackBatch::createSendCallback(const FlushCallback& flushCallback) const {
98+
SendCallback MessageAndCallbackBatch::createSendCallback() const {
6899
const auto& callbacks = callbacks_;
69-
if (flushCallback) {
70-
return [callbacks, flushCallback](Result result, const MessageId& id) {
71-
completeSendCallbacks(callbacks, result, id);
72-
flushCallback(result);
73-
};
74-
} else {
75-
return [callbacks] // save a copy of `callbacks_`
76-
(Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); };
77-
}
100+
return [callbacks](Result result, const MessageId& id) { completeSendCallbacks(callbacks, result, id); };
78101
}
79102

80103
} // namespace pulsar

0 commit comments

Comments
 (0)