Skip to content

Commit cac5e1d

Browse files
Avoid copying OpSendMsg when sending messages (apache#308)
Fixes apache#306 ### Motivation `OpSendMsg` is a struct whose size is 400 bytes. We should avoid the copy operation on it. ### Modifications Pass the `unique_ptr<OpSendMsg>` everywhere instead of `OpSendMsg`. - Use `unique_ptr<OpSendMsg>` as the element of the pending message queue in `ProducerImpl` and disable the copy constructor and assignment for `OpSendMsg`. - Add `SendArgument`, which includes the necessary fields to construct a `CommandSend` request. Use `shared_ptr` rather than `unique_ptr` to store `SendArgument` in `OpSendMsg` because the producer might need to resend the message so the `SendArgument` object could be shared by `ProducerImpl` and `ClientConnection`. This patch is more like a refactor because the compiler optimization might reduce unnecessary copying.
1 parent afb2fcb commit cac5e1d

15 files changed

+250
-302
lines changed

lib/BatchMessageContainer.cc

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <stdexcept>
2222

2323
#include "LogUtils.h"
24+
#include "OpSendMsg.h"
2425

2526
DECLARE_LOG_OBJECT()
2627

@@ -52,14 +53,10 @@ void BatchMessageContainer::clear() {
5253
LOG_DEBUG(*this << " clear() called");
5354
}
5455

55-
Result BatchMessageContainer::createOpSendMsg(OpSendMsg& opSendMsg,
56-
const FlushCallback& flushCallback) const {
57-
return createOpSendMsgHelper(opSendMsg, flushCallback, batch_);
58-
}
59-
60-
std::vector<Result> BatchMessageContainer::createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
61-
const FlushCallback& flushCallback) const {
62-
throw std::runtime_error("createOpSendMsgs is not supported for BatchMessageContainer");
56+
std::unique_ptr<OpSendMsg> BatchMessageContainer::createOpSendMsg(const FlushCallback& flushCallback) {
57+
auto op = createOpSendMsgHelper(flushCallback, batch_);
58+
clear();
59+
return op;
6360
}
6461

6562
void BatchMessageContainer::serialize(std::ostream& os) const {

lib/BatchMessageContainer.h

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,25 +39,22 @@ class BatchMessageContainer : public BatchMessageContainerBase {
3939

4040
~BatchMessageContainer();
4141

42-
size_t getNumBatches() const override { return 1; }
42+
bool hasMultiOpSendMsgs() const override { return false; }
4343

4444
bool isFirstMessageToAdd(const Message& msg) const override { return batch_.empty(); }
4545

4646
bool add(const Message& msg, const SendCallback& callback) override;
4747

48-
void clear() override;
49-
50-
Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback) const override;
51-
52-
std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
53-
const FlushCallback& flushCallback) const override;
54-
5548
void serialize(std::ostream& os) const override;
5649

50+
std::unique_ptr<OpSendMsg> createOpSendMsg(const FlushCallback& flushCallback) override;
51+
5752
private:
5853
MessageAndCallbackBatch batch_;
5954
size_t numberOfBatchesSent_ = 0;
6055
double averageBatchSize_ = 0;
56+
57+
void clear() override;
6158
};
6259

6360
} // namespace pulsar

lib/BatchMessageContainerBase.cc

Lines changed: 10 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,13 @@ BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& produce
3737
producerId_(producer.producerId_),
3838
msgCryptoWeakPtr_(producer.msgCrypto_) {}
3939

40-
Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
41-
const FlushCallback& flushCallback,
42-
const MessageAndCallbackBatch& batch) const {
43-
opSendMsg.sendCallback_ = batch.createSendCallback();
44-
opSendMsg.messagesCount_ = batch.messagesCount();
45-
opSendMsg.messagesSize_ = batch.messagesSize();
46-
47-
if (flushCallback) {
48-
auto sendCallback = opSendMsg.sendCallback_;
49-
opSendMsg.sendCallback_ = [sendCallback, flushCallback](Result result, const MessageId& id) {
50-
sendCallback(result, id);
51-
flushCallback(result);
52-
};
53-
}
40+
BatchMessageContainerBase::~BatchMessageContainerBase() {}
5441

42+
std::unique_ptr<OpSendMsg> BatchMessageContainerBase::createOpSendMsgHelper(
43+
const FlushCallback& flushCallback, const MessageAndCallbackBatch& batch) const {
44+
auto sendCallback = batch.createSendCallback(flushCallback);
5545
if (batch.empty()) {
56-
return ResultOperationNotSupported;
46+
return OpSendMsg::create(ResultOperationNotSupported, std::move(sendCallback));
5747
}
5848

5949
MessageImplPtr impl = batch.msgImpl();
@@ -70,45 +60,18 @@ Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
7060
SharedBuffer encryptedPayload;
7161
if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(), producerConfig_.getCryptoKeyReader(),
7262
impl->metadata, impl->payload, encryptedPayload)) {
73-
return ResultCryptoError;
63+
return OpSendMsg::create(ResultCryptoError, std::move(sendCallback));
7464
}
7565
impl->payload = encryptedPayload;
7666
}
7767

7868
if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize()) {
79-
return ResultMessageTooBig;
69+
return OpSendMsg::create(ResultMessageTooBig, std::move(sendCallback));
8070
}
8171

82-
opSendMsg.metadata_ = impl->metadata;
83-
opSendMsg.payload_ = impl->payload;
84-
opSendMsg.sequenceId_ = impl->metadata.sequence_id();
85-
opSendMsg.producerId_ = producerId_;
86-
opSendMsg.timeout_ = TimeUtils::now() + milliseconds(producerConfig_.getSendTimeout());
87-
88-
return ResultOk;
89-
}
90-
91-
void BatchMessageContainerBase::processAndClear(
92-
std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
93-
if (isEmpty()) {
94-
if (flushCallback) {
95-
// do nothing, flushCallback complete until the lastOpSend complete
96-
}
97-
} else {
98-
const auto numBatches = getNumBatches();
99-
if (numBatches == 1) {
100-
OpSendMsg opSendMsg;
101-
Result result = createOpSendMsg(opSendMsg, flushCallback);
102-
opSendMsgCallback(result, opSendMsg);
103-
} else if (numBatches > 1) {
104-
std::vector<OpSendMsg> opSendMsgs;
105-
std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback);
106-
for (size_t i = 0; i < results.size(); i++) {
107-
opSendMsgCallback(results[i], opSendMsgs[i]);
108-
}
109-
} // else numBatches is 0, do nothing
110-
}
111-
clear();
72+
return OpSendMsg::create(impl->metadata, batch.messagesCount(), batch.messagesSize(),
73+
producerConfig_.getSendTimeout(), batch.createSendCallback(flushCallback),
74+
nullptr, producerId_, impl->payload);
11275
}
11376

11477
} // namespace pulsar

lib/BatchMessageContainerBase.h

Lines changed: 14 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
#include <boost/noncopyable.hpp>
2828
#include <memory>
29+
#include <stdexcept>
2930
#include <vector>
3031

3132
namespace pulsar {
@@ -44,14 +45,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
4445
public:
4546
BatchMessageContainerBase(const ProducerImpl& producer);
4647

47-
virtual ~BatchMessageContainerBase() {}
48+
virtual ~BatchMessageContainerBase();
4849

49-
/**
50-
* Get number of batches in the batch message container
51-
*
52-
* @return number of batches
53-
*/
54-
virtual size_t getNumBatches() const = 0;
50+
virtual bool hasMultiOpSendMsgs() const = 0;
5551

5652
/**
5753
* Check the message will be the 1st message to be added to the batch
@@ -73,32 +69,14 @@ class BatchMessageContainerBase : public boost::noncopyable {
7369
*/
7470
virtual bool add(const Message& msg, const SendCallback& callback) = 0;
7571

76-
/**
77-
* Clear the batch message container
78-
*/
79-
virtual void clear() = 0;
80-
81-
/**
82-
* Create a OpSendMsg object to send
83-
*
84-
* @param opSendMsg the OpSendMsg object to create
85-
* @param flushCallback the callback to trigger after the OpSendMsg was completed
86-
* @return ResultOk if create successfully
87-
* @note OpSendMsg's sendCallback_ must be set even if it failed
88-
*/
89-
virtual Result createOpSendMsg(OpSendMsg& opSendMsg,
90-
const FlushCallback& flushCallback = nullptr) const = 0;
72+
virtual std::unique_ptr<OpSendMsg> createOpSendMsg(const FlushCallback& flushCallback = nullptr) {
73+
throw std::runtime_error("createOpSendMsg is not supported");
74+
}
9175

92-
/**
93-
* Create a OpSendMsg list to send
94-
*
95-
* @param opSendMsgList the OpSendMsg list to create
96-
* @param flushCallback the callback to trigger after the OpSendMsg was completed
97-
* @return all create results of `opSendMsgs`, ResultOk means create successfully
98-
* @note OpSendMsg's sendCallback_ must be set even if it failed
99-
*/
100-
virtual std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
101-
const FlushCallback& flushCallback = nullptr) const = 0;
76+
virtual std::vector<std::unique_ptr<OpSendMsg>> createOpSendMsgs(
77+
const FlushCallback& flushCallback = nullptr) {
78+
throw std::runtime_error("createOpSendMsgs is not supported");
79+
}
10280

10381
/**
10482
* Serialize into a std::ostream for logging
@@ -110,9 +88,6 @@ class BatchMessageContainerBase : public boost::noncopyable {
11088
bool hasEnoughSpace(const Message& msg) const noexcept;
11189
bool isEmpty() const noexcept;
11290

113-
void processAndClear(std::function<void(Result, const OpSendMsg&)> opSendMsgCallback,
114-
FlushCallback flushCallback);
115-
11691
protected:
11792
// references to ProducerImpl's fields
11893
const std::shared_ptr<std::string> topicName_;
@@ -134,8 +109,10 @@ class BatchMessageContainerBase : public boost::noncopyable {
134109
void updateStats(const Message& msg);
135110
void resetStats();
136111

137-
Result createOpSendMsgHelper(OpSendMsg& opSendMsg, const FlushCallback& flushCallback,
138-
const MessageAndCallbackBatch& batch) const;
112+
std::unique_ptr<OpSendMsg> createOpSendMsgHelper(const FlushCallback& flushCallback,
113+
const MessageAndCallbackBatch& batch) const;
114+
115+
virtual void clear() = 0;
139116
};
140117

141118
inline bool BatchMessageContainerBase::hasEnoughSpace(const Message& msg) const noexcept {

lib/BatchMessageKeyBasedContainer.cc

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,8 @@ void BatchMessageKeyBasedContainer::clear() {
7272
LOG_DEBUG(*this << " clear() called");
7373
}
7474

75-
Result BatchMessageKeyBasedContainer::createOpSendMsg(OpSendMsg& opSendMsg,
76-
const FlushCallback& flushCallback) const {
77-
if (batches_.size() < 1) {
78-
return ResultOperationNotSupported;
79-
}
80-
return createOpSendMsgHelper(opSendMsg, flushCallback, batches_.begin()->second);
81-
}
82-
83-
std::vector<Result> BatchMessageKeyBasedContainer::createOpSendMsgs(
84-
std::vector<OpSendMsg>& opSendMsgs, const FlushCallback& flushCallback) const {
75+
std::vector<std::unique_ptr<OpSendMsg>> BatchMessageKeyBasedContainer::createOpSendMsgs(
76+
const FlushCallback& flushCallback) {
8577
// Sorted the batches by sequence id
8678
std::vector<const MessageAndCallbackBatch*> sortedBatches;
8779
for (const auto& kv : batches_) {
@@ -92,18 +84,15 @@ std::vector<Result> BatchMessageKeyBasedContainer::createOpSendMsgs(
9284
return lhs->sequenceId() < rhs->sequenceId();
9385
});
9486

95-
size_t numBatches = sortedBatches.size();
96-
opSendMsgs.resize(numBatches);
97-
98-
std::vector<Result> results(numBatches);
99-
for (size_t i = 0; i + 1 < numBatches; i++) {
100-
results[i] = createOpSendMsgHelper(opSendMsgs[i], nullptr, *sortedBatches[i]);
87+
std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{sortedBatches.size()};
88+
for (size_t i = 0; i + 1 < opSendMsgs.size(); i++) {
89+
opSendMsgs[i].reset(createOpSendMsgHelper(nullptr, *sortedBatches[i]).release());
10190
}
102-
if (numBatches > 0) {
103-
// Add flush callback to the last batch
104-
results.back() = createOpSendMsgHelper(opSendMsgs.back(), flushCallback, *sortedBatches.back());
91+
if (!opSendMsgs.empty()) {
92+
opSendMsgs.back().reset(createOpSendMsgHelper(flushCallback, *sortedBatches.back()).release());
10593
}
106-
return results;
94+
clear();
95+
return opSendMsgs;
10796
}
10897

10998
void BatchMessageKeyBasedContainer::serialize(std::ostream& os) const {

lib/BatchMessageKeyBasedContainer.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,13 @@ class BatchMessageKeyBasedContainer : public BatchMessageContainerBase {
3232

3333
~BatchMessageKeyBasedContainer();
3434

35-
size_t getNumBatches() const override { return batches_.size(); }
35+
bool hasMultiOpSendMsgs() const override { return true; }
3636

3737
bool isFirstMessageToAdd(const Message& msg) const override;
3838

3939
bool add(const Message& msg, const SendCallback& callback) override;
4040

41-
void clear() override;
42-
43-
Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback) const override;
44-
45-
std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
46-
const FlushCallback& flushCallback) const override;
41+
std::vector<std::unique_ptr<OpSendMsg>> createOpSendMsgs(const FlushCallback& flushCallback) override;
4742

4843
void serialize(std::ostream& os) const override;
4944

@@ -53,8 +48,7 @@ class BatchMessageKeyBasedContainer : public BatchMessageContainerBase {
5348
size_t numberOfBatchesSent_ = 0;
5449
double averageBatchSize_ = 0;
5550

56-
Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback,
57-
MessageAndCallbackBatch& batch) const;
51+
void clear() override;
5852
};
5953

6054
} // namespace pulsar

lib/ClientConnection.cc

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,37 +1025,30 @@ void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
10251025
std::bind(&ClientConnection::handleSend, shared_from_this(), std::placeholders::_1, cmd)));
10261026
}
10271027

1028-
void ClientConnection::sendMessage(const OpSendMsg& opSend) {
1028+
void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) {
10291029
Lock lock(mutex_);
1030-
1031-
if (pendingWriteOperations_++ == 0) {
1032-
// Write immediately to socket
1033-
if (tlsSocket_) {
1030+
if (pendingWriteOperations_++ > 0) {
1031+
pendingWriteBuffers_.emplace_back(args);
1032+
return;
1033+
}
1034+
auto self = shared_from_this();
1035+
auto sendMessageInternal = [this, self, args] {
1036+
BaseCommand outgoingCmd;
1037+
auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
1038+
asyncWrite(buffer, customAllocReadHandler(std::bind(&ClientConnection::handleSendPair,
1039+
shared_from_this(), std::placeholders::_1)));
1040+
};
1041+
if (tlsSocket_) {
10341042
#if BOOST_VERSION >= 106600
1035-
boost::asio::post(strand_,
1036-
std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend));
1043+
boost::asio::post(strand_, sendMessageInternal);
10371044
#else
1038-
strand_.post(std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend));
1045+
strand_.post(sendMessageInternal);
10391046
#endif
1040-
} else {
1041-
sendMessageInternal(opSend);
1042-
}
10431047
} else {
1044-
// Queue to send later
1045-
pendingWriteBuffers_.push_back(opSend);
1048+
sendMessageInternal();
10461049
}
10471050
}
10481051

1049-
void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) {
1050-
BaseCommand outgoingCmd;
1051-
PairSharedBuffer buffer =
1052-
Commands::newSend(outgoingBuffer_, outgoingCmd, opSend.producerId_, opSend.sequenceId_,
1053-
getChecksumType(), opSend.metadata_, opSend.payload_);
1054-
1055-
asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
1056-
shared_from_this(), std::placeholders::_1)));
1057-
}
1058-
10591052
void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
10601053
if (err) {
10611054
LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
@@ -1088,13 +1081,12 @@ void ClientConnection::sendPendingCommands() {
10881081
customAllocWriteHandler(std::bind(&ClientConnection::handleSend, shared_from_this(),
10891082
std::placeholders::_1, buffer)));
10901083
} else {
1091-
assert(any.type() == typeid(OpSendMsg));
1084+
assert(any.type() == typeid(std::shared_ptr<SendArguments>));
10921085

1093-
const OpSendMsg& op = boost::any_cast<const OpSendMsg&>(any);
1086+
auto args = boost::any_cast<std::shared_ptr<SendArguments>>(any);
10941087
BaseCommand outgoingCmd;
10951088
PairSharedBuffer buffer =
1096-
Commands::newSend(outgoingBuffer_, outgoingCmd, op.producerId_, op.sequenceId_,
1097-
getChecksumType(), op.metadata_, op.payload_);
1089+
Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
10981090

10991091
asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
11001092
shared_from_this(), std::placeholders::_1)));

lib/ClientConnection.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ typedef std::weak_ptr<ConsumerImpl> ConsumerImplWeakPtr;
6969
class LookupDataResult;
7070
class BrokerConsumerStatsImpl;
7171
class PeriodicTask;
72-
73-
struct OpSendMsg;
72+
struct SendArguments;
7473

7574
namespace proto {
7675
class BaseCommand;
@@ -153,8 +152,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
153152

154153
void sendCommand(const SharedBuffer& cmd);
155154
void sendCommandInternal(const SharedBuffer& cmd);
156-
void sendMessage(const OpSendMsg& opSend);
157-
void sendMessageInternal(const OpSendMsg& opSend);
155+
void sendMessage(const std::shared_ptr<SendArguments>& args);
158156

159157
void registerProducer(int producerId, ProducerImplPtr producer);
160158
void registerConsumer(int consumerId, ConsumerImplPtr consumer);

0 commit comments

Comments
 (0)