Skip to content

Commit b441e68

Browse files
committed
fix message impl ctor
1 parent 2654dff commit b441e68

File tree

6 files changed

+55
-24
lines changed

6 files changed

+55
-24
lines changed

include/pulsar/Message.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,6 @@ class PULSAR_PUBLIC Message {
217217
MessageImplPtr impl_;
218218

219219
Message(MessageImplPtr& impl);
220-
Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
221-
proto::MessageMetadata& metadata, SharedBuffer& payload);
222-
/// Used for Batch Messages
223220
Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
224221
proto::MessageMetadata& metadata, SharedBuffer& payload,
225222
proto::SingleMessageMetadata& singleMetadata, const std::shared_ptr<std::string>& topicName);

lib/Commands.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -926,12 +926,13 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32
926926
const MessageId& m = batchedMessage.impl_->messageId;
927927
auto messageId = MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
928928
auto batchedMessageId = std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
929-
Message singleMessage(MessageId{batchedMessageId}, batchedMessage.impl_->brokerEntryMetadata,
930-
batchedMessage.impl_->metadata, payload, metadata,
931-
batchedMessage.impl_->topicName_);
932-
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
933929

934-
return singleMessage;
930+
auto msgImpl = std::make_shared<MessageImpl>(messageId, batchedMessage.impl_->brokerEntryMetadata,
931+
batchedMessage.impl_->metadata, payload, metadata,
932+
batchedMessage.impl_->topicName_, false);
933+
msgImpl->cnx_ = batchedMessage.impl_->cnx_;
934+
935+
return Message(msgImpl);
935936
}
936937

937938
MessageIdImplPtr Commands::getMessageIdImpl(const MessageId& messageId) { return messageId.impl_; }

lib/ConsumerImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
583583
}
584584

585585
auto msgImpl =
586-
std::make_shared<MessageImpl>(messageId, metadata, brokerEntryMetadata, payload, std::nullopt,
586+
std::make_shared<MessageImpl>(messageId, brokerEntryMetadata, metadata, payload, std::nullopt,
587587
getTopicPtr(), decryptResult == CONSUME_ENCRYPTED);
588588
Message m(msgImpl);
589589
m.impl_->cnx_ = cnx.get();

lib/Message.cc

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,15 +71,6 @@ Message::Message() : impl_() {}
7171

7272
Message::Message(MessageImplPtr& impl) : impl_(impl) {}
7373

74-
Message::Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
75-
proto::MessageMetadata& metadata, SharedBuffer& payload)
76-
: impl_(std::make_shared<MessageImpl>()) {
77-
impl_->messageId = messageId;
78-
impl_->brokerEntryMetadata = brokerEntryMetadata;
79-
impl_->metadata = metadata;
80-
impl_->payload = payload;
81-
}
82-
8374
Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerEntryMetadata,
8475
proto::MessageMetadata& metadata, SharedBuffer& payload,
8576
proto::SingleMessageMetadata& singleMetadata, const std::shared_ptr<std::string>& topicName)

lib/MessageImpl.cc

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,55 @@
2020

2121
#include <utility>
2222

23+
#include "PulsarApi.pb.h"
24+
2325
namespace pulsar {
2426

25-
MessageImpl::MessageImpl(const MessageId& messageId, const proto::MessageMetadata& metadata,
26-
const proto::BrokerEntryMetadata& brokerEntryMetadata, const SharedBuffer& payload,
27+
MessageImpl::MessageImpl(const MessageId& messageId, const proto::BrokerEntryMetadata& brokerEntryMetadata,
28+
const proto::MessageMetadata& metadata, const SharedBuffer& payload,
2729
const optional<proto::SingleMessageMetadata>& singleMetadata,
2830
const std::shared_ptr<std::string>& topicName, bool isDecryptionFailed)
29-
: encryptionContext_(std::in_place, metadata, isDecryptionFailed) {}
31+
: messageId(messageId),
32+
brokerEntryMetadata(brokerEntryMetadata),
33+
metadata(metadata),
34+
payload(payload),
35+
topicName_(topicName),
36+
encryptionContext_(std::in_place, metadata, isDecryptionFailed) {
37+
if (singleMetadata.has_value()) {
38+
this->metadata.clear_properties();
39+
if (singleMetadata->properties_size() > 0) {
40+
this->metadata.mutable_properties()->Reserve(singleMetadata->properties_size());
41+
for (int i = 0; i < singleMetadata->properties_size(); i++) {
42+
auto keyValue = proto::KeyValue().New();
43+
*keyValue = singleMetadata->properties(i);
44+
this->metadata.mutable_properties()->AddAllocated(keyValue);
45+
}
46+
}
47+
if (singleMetadata->has_partition_key()) {
48+
this->metadata.set_partition_key(singleMetadata->partition_key());
49+
} else {
50+
this->metadata.clear_partition_key();
51+
}
52+
53+
if (singleMetadata->has_ordering_key()) {
54+
this->metadata.set_ordering_key(singleMetadata->ordering_key());
55+
} else {
56+
this->metadata.clear_ordering_key();
57+
}
58+
59+
if (singleMetadata->has_event_time()) {
60+
this->metadata.set_event_time(singleMetadata->event_time());
61+
} else {
62+
this->metadata.clear_event_time();
63+
}
64+
65+
if (singleMetadata->has_sequence_id()) {
66+
this->metadata.set_sequence_id(singleMetadata->sequence_id());
67+
} else {
68+
this->metadata.clear_sequence_id();
69+
}
70+
}
71+
}
3072

3173
const Message::StringMap& MessageImpl::properties() {
3274
if (properties_.size() == 0) {

lib/MessageImpl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ class MessageImpl {
4242
public:
4343
// TODO: remove this default constructor
4444
explicit MessageImpl() : encryptionContext_(std::nullopt) {}
45-
MessageImpl(const MessageId& messageId, const proto::MessageMetadata& metadata,
46-
const proto::BrokerEntryMetadata& brokerEntryMetadata, const SharedBuffer& payload,
45+
MessageImpl(const MessageId& messageId, const proto::BrokerEntryMetadata& brokerEntryMetadata,
46+
const proto::MessageMetadata& metadata, const SharedBuffer& payload,
4747
const optional<proto::SingleMessageMetadata>& singleMetadata,
4848
const std::shared_ptr<std::string>& topicName, bool undecryptedPayload);
4949

@@ -81,11 +81,11 @@ class MessageImpl {
8181
friend class PulsarWrapper;
8282
friend class MessageBuilder;
8383

84+
MessageId messageId;
8485
proto::BrokerEntryMetadata brokerEntryMetadata;
8586
proto::MessageMetadata metadata;
8687
SharedBuffer payload;
8788
std::shared_ptr<KeyValueImpl> keyValuePtr;
88-
MessageId messageId;
8989
ClientConnection* cnx_;
9090
std::shared_ptr<std::string> topicName_;
9191
int redeliveryCount_;

0 commit comments

Comments
 (0)