Skip to content

Commit 27e0092

Browse files
committed
Revert "Initial commit"
This reverts commit e8e3e23.
1 parent e8e3e23 commit 27e0092

15 files changed

+155
-498
lines changed

include/pulsar/EncryptionContext.h

Lines changed: 0 additions & 119 deletions
This file was deleted.

include/pulsar/Message.h

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,22 @@
1919
#ifndef MESSAGE_HPP_
2020
#define MESSAGE_HPP_
2121

22-
#include <pulsar/EncryptionContext.h>
2322
#include <pulsar/defines.h>
2423

2524
#include <map>
2625
#include <memory>
27-
#include <optional>
2826
#include <string>
2927

3028
#include "KeyValue.h"
3129
#include "MessageId.h"
3230

3331
namespace pulsar {
32+
namespace proto {
33+
class CommandMessage;
34+
class BrokerEntryMetadata;
35+
class MessageMetadata;
36+
class SingleMessageMetadata;
37+
} // namespace proto
3438

3539
class SharedBuffer;
3640
class MessageBuilder;
@@ -198,19 +202,19 @@ class PULSAR_PUBLIC Message {
198202
*/
199203
const std::string& getProducerName() const noexcept;
200204

201-
/**
202-
* @return the optional encryption context that is present when the message is encrypted, the pointer is
203-
* valid as the Message instance is alive
204-
*/
205-
std::optional<const EncryptionContext*> getEncryptionContext() const;
206-
207205
bool operator==(const Message& msg) const;
208206

209207
protected:
210208
typedef std::shared_ptr<MessageImpl> MessageImplPtr;
211209
MessageImplPtr impl_;
212210

213-
Message(const MessageImplPtr& impl);
211+
Message(MessageImplPtr& impl);
212+
Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
213+
proto::MessageMetadata& metadata, SharedBuffer& payload);
214+
/// Used for Batch Messages
215+
Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
216+
proto::MessageMetadata& metadata, SharedBuffer& payload,
217+
proto::SingleMessageMetadata& singleMetadata, const std::shared_ptr<std::string>& topicName);
214218
friend class PartitionedProducerImpl;
215219
friend class MultiTopicsConsumerImpl;
216220
friend class MessageBuilder;

lib/Commands.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -906,8 +906,7 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl
906906
}
907907

908908
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
909-
int32_t batchSize, const BatchMessageAckerPtr& acker,
910-
const optional<EncryptionContext>& encryptionContext) {
909+
int32_t batchSize, const BatchMessageAckerPtr& acker) {
911910
SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;
912911

913912
// Format of batch message
@@ -927,13 +926,12 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32
927926
const MessageId& m = batchedMessage.impl_->messageId;
928927
auto messageId = MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
929928
auto batchedMessageId = std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
929+
Message singleMessage(MessageId{batchedMessageId}, batchedMessage.impl_->brokerEntryMetadata,
930+
batchedMessage.impl_->metadata, payload, metadata,
931+
batchedMessage.impl_->topicName_);
932+
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
930933

931-
auto msgImpl = std::make_shared<MessageImpl>(messageId, batchedMessage.impl_->brokerEntryMetadata,
932-
batchedMessage.impl_->metadata, payload, metadata,
933-
batchedMessage.impl_->topicName_, encryptionContext);
934-
msgImpl->cnx_ = batchedMessage.impl_->cnx_;
935-
936-
return Message(msgImpl);
934+
return singleMessage;
937935
}
938936

939937
MessageIdImplPtr Commands::getMessageIdImpl(const MessageId& messageId) { return messageId.impl_; }

lib/Commands.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,7 @@ class Commands {
155155
const std::vector<Message>& messages);
156156

157157
static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
158-
int32_t batchSize, const BatchMessageAckerPtr& acker,
159-
const optional<EncryptionContext>& encryptionContext);
158+
int32_t batchSize, const BatchMessageAckerPtr& acker);
160159

161160
static MessageIdImplPtr getMessageIdImpl(const MessageId& messageId);
162161

lib/ConsumerImpl.cc

Lines changed: 33 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -548,27 +548,25 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
548548
bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata,
549549
proto::MessageMetadata& metadata, SharedBuffer& payload) {
550550
LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes());
551-
if (!isChecksumValid) {
552-
// Message discarded for checksum error
553-
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch);
551+
552+
if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
553+
// Message was discarded or not consumed due to decryption failure
554554
return;
555555
}
556556

557-
auto encryptionContext = metadata.encryption_keys_size() > 0
558-
? optional<EncryptionContext>{std::in_place, metadata, false}
559-
: std::nullopt;
560-
561-
auto decryptResult = decryptMessageIfNeeded(cnx, encryptionContext, payload, msg.message_id());
562-
if (decryptResult == FAILED) {
563-
// Message was discarded due to decryption failure or not consumed due to decryption failure
557+
if (!isChecksumValid) {
558+
// Message discarded for checksum error
559+
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch);
564560
return;
565-
} else if (decryptResult == CONSUME_ENCRYPTED) {
566-
encryptionContext->isDecryptionFailed_ = true;
567561
}
568562

569563
auto redeliveryCount = msg.redelivery_count();
564+
const bool isMessageUndecryptable =
565+
metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() &&
566+
config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME;
567+
570568
const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
571-
if (decryptResult == DECRYPTED && !isChunkedMessage) {
569+
if (!isMessageUndecryptable && !isChunkedMessage) {
572570
if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
573571
// Message was discarded on decompression error
574572
return;
@@ -588,9 +586,9 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
588586
}
589587
}
590588

591-
Message m{std::make_shared<MessageImpl>(messageId, brokerEntryMetadata, metadata, payload, std::nullopt,
592-
getTopicPtr(), std::move(encryptionContext))};
589+
Message m(messageId, brokerEntryMetadata, metadata, payload);
593590
m.impl_->cnx_ = cnx.get();
591+
m.impl_->setTopicName(getTopicPtr());
594592
m.impl_->setRedeliveryCount(msg.redelivery_count());
595593

596594
if (metadata.has_schema_version()) {
@@ -612,16 +610,14 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
612610
return;
613611
}
614612

615-
// When the decryption failed, the whole batch message will be treated as a single message.
616-
if (metadata.has_num_messages_in_batch() && decryptResult == DECRYPTED) {
613+
if (metadata.has_num_messages_in_batch()) {
617614
BitSet::Data words(msg.ack_set_size());
618615
for (int i = 0; i < words.size(); i++) {
619616
words[i] = msg.ack_set(i);
620617
}
621618
BitSet ackSet{std::move(words)};
622619
Lock lock(mutex_);
623-
numOfMessageReceived =
624-
receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count(), encryptionContext);
620+
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count());
625621
} else {
626622
// try convert key value data.
627623
m.impl_->convertPayloadToKeyValue(config_.getSchema());
@@ -746,9 +742,9 @@ void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
746742
}
747743

748744
// Zero Queue size is not supported with Batch Messages
749-
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(
750-
const ClientConnectionPtr& cnx, Message& batchedMessage, const BitSet& ackSet, int redeliveryCount,
751-
const optional<EncryptionContext>& encryptionContext) {
745+
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
746+
Message& batchedMessage, const BitSet& ackSet,
747+
int redeliveryCount) {
752748
auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
753749
LOG_DEBUG("Received Batch messages of size - " << batchSize
754750
<< " -- msgId: " << batchedMessage.getMessageId());
@@ -760,8 +756,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(
760756
std::vector<Message> possibleToDeadLetter;
761757
for (int i = 0; i < batchSize; i++) {
762758
// This is a cheap copy since message contains only one shared pointer (impl_)
763-
Message msg =
764-
Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker, encryptionContext);
759+
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
765760
msg.impl_->setRedeliveryCount(redeliveryCount);
766761
msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
767762
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
@@ -817,51 +812,50 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(
817812
return batchSize - skippedMessages;
818813
}
819814

820-
auto ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx,
821-
const optional<EncryptionContext>& context, SharedBuffer& payload,
822-
const proto::MessageIdData& msgId) -> DecryptResult {
823-
if (!context.has_value()) {
824-
return DECRYPTED;
815+
bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
816+
const proto::MessageMetadata& metadata, SharedBuffer& payload) {
817+
if (!metadata.encryption_keys_size()) {
818+
return true;
825819
}
826820

827821
// If KeyReader is not configured throw exception based on config param
828822
if (!config_.isEncryptionEnabled()) {
829823
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
830824
LOG_WARN(getName() << "CryptoKeyReader is not implemented. Consuming encrypted message.");
831-
return CONSUME_ENCRYPTED;
825+
return true;
832826
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
833827
LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config "
834828
"is set to discard");
835-
discardCorruptedMessage(cnx, msgId, CommandAck_ValidationError_DecryptionError);
829+
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
836830
} else {
837831
LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to "
838832
"consume encrypted message");
839-
auto messageId = MessageIdBuilder::from(msgId).build();
833+
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
840834
unAckedMessageTrackerPtr_->add(messageId);
841835
}
842-
return FAILED;
836+
return false;
843837
}
844838

845839
SharedBuffer decryptedPayload;
846-
if (msgCrypto_->decrypt(*context, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
840+
if (msgCrypto_->decrypt(metadata, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
847841
payload = decryptedPayload;
848-
return DECRYPTED;
842+
return true;
849843
}
850844

851845
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
852846
// Note, batch message will fail to consume even if config is set to consume
853847
LOG_WARN(
854848
getName() << "Decryption failed. Consuming encrypted message since config is set to consume.");
855-
return CONSUME_ENCRYPTED;
849+
return true;
856850
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
857851
LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard");
858-
discardCorruptedMessage(cnx, msgId, CommandAck_ValidationError_DecryptionError);
852+
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
859853
} else {
860854
LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message");
861-
auto messageId = MessageIdBuilder::from(msgId).build();
855+
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
862856
unAckedMessageTrackerPtr_->add(messageId);
863857
}
864-
return FAILED;
858+
return false;
865859
}
866860

867861
bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,

0 commit comments

Comments
 (0)