Skip to content

Commit e8e3e23

Browse files
committed
Initial commit
1 parent c64e0e9 commit e8e3e23

15 files changed

+498
-155
lines changed

include/pulsar/EncryptionContext.h

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <cstdint>
22+
#include <string>
23+
#include <unordered_map>
24+
#include <vector>
25+
26+
#include "CompressionType.h"
27+
#include "defines.h"
28+
29+
namespace pulsar {
30+
31+
namespace proto {
32+
class MessageMetadata;
33+
}
34+
35+
class Message;
36+
37+
struct PULSAR_PUBLIC EncryptionKey {
38+
std::string key;
39+
std::string value;
40+
std::unordered_map<std::string, std::string> metadata;
41+
42+
explicit EncryptionKey() = default;
43+
44+
// Support in-place construction
45+
EncryptionKey(const std::string& key, const std::string& value,
46+
const decltype(EncryptionKey::metadata)& metadata)
47+
: key(key), value(value), metadata(metadata) {}
48+
};
49+
50+
/**
51+
* It contains encryption and compression information in it using which application can decrypt consumed
52+
* message with encrypted-payload.
53+
*/
54+
class PULSAR_PUBLIC EncryptionContext {
55+
public:
56+
explicit EncryptionContext()
57+
: compressionType_(CompressionNone),
58+
uncompressedMessageSize_(0),
59+
batchSize_(-1),
60+
isDecryptionFailed_(false) {}
61+
EncryptionContext(const EncryptionContext&) = default;
62+
EncryptionContext(EncryptionContext&&) noexcept = default;
63+
EncryptionContext(const proto::MessageMetadata& metadata, bool isDecryptionFailed);
64+
65+
using KeysType = std::vector<EncryptionKey>;
66+
67+
/**
68+
* @return the map of encryption keys used for the message
69+
*/
70+
const KeysType& keys() const noexcept { return keys_; }
71+
72+
/**
73+
* @return the encryption parameter used for the message
74+
*/
75+
const std::string& param() const noexcept { return param_; }
76+
77+
/**
78+
* @return the encryption algorithm used for the message
79+
*/
80+
const std::string& algorithm() const noexcept { return algorithm_; }
81+
82+
/**
83+
* @return the compression type used for the message
84+
*/
85+
CompressionType compressionType() const noexcept { return compressionType_; }
86+
87+
/**
88+
* @return the uncompressed message size if the message is compressed, 0 otherwise
89+
*/
90+
uint32_t uncompressedMessageSize() const noexcept { return uncompressedMessageSize_; }
91+
92+
/**
93+
* @return the batch size if the message is part of a batch, -1 otherwise
94+
*/
95+
int32_t batchSize() const noexcept { return batchSize_; }
96+
97+
/**
98+
* When the `ConsumerConfiguration#getCryptoFailureAction` is set to `CONSUME`, the message will still be
99+
* returned even if the decryption failed, in this case, the message payload is still not decrypted but
100+
* users have no way to know that. This method is provided to let users know whether the decryption
101+
* failed.
102+
*
103+
* @return whether the decryption failed
104+
*/
105+
bool isDecryptionFailed() const noexcept { return isDecryptionFailed_; }
106+
107+
private:
108+
KeysType keys_;
109+
std::string param_;
110+
std::string algorithm_;
111+
CompressionType compressionType_;
112+
uint32_t uncompressedMessageSize_;
113+
int32_t batchSize_;
114+
bool isDecryptionFailed_;
115+
116+
friend class ConsumerImpl;
117+
};
118+
119+
} // namespace pulsar

include/pulsar/Message.h

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,18 @@
1919
#ifndef MESSAGE_HPP_
2020
#define MESSAGE_HPP_
2121

22+
#include <pulsar/EncryptionContext.h>
2223
#include <pulsar/defines.h>
2324

2425
#include <map>
2526
#include <memory>
27+
#include <optional>
2628
#include <string>
2729

2830
#include "KeyValue.h"
2931
#include "MessageId.h"
3032

3133
namespace pulsar {
32-
namespace proto {
33-
class CommandMessage;
34-
class BrokerEntryMetadata;
35-
class MessageMetadata;
36-
class SingleMessageMetadata;
37-
} // namespace proto
3834

3935
class SharedBuffer;
4036
class MessageBuilder;
@@ -202,19 +198,19 @@ class PULSAR_PUBLIC Message {
202198
*/
203199
const std::string& getProducerName() const noexcept;
204200

201+
/**
202+
* @return the optional encryption context that is present when the message is encrypted, the pointer is
203+
* valid as the Message instance is alive
204+
*/
205+
std::optional<const EncryptionContext*> getEncryptionContext() const;
206+
205207
bool operator==(const Message& msg) const;
206208

207209
protected:
208210
typedef std::shared_ptr<MessageImpl> MessageImplPtr;
209211
MessageImplPtr impl_;
210212

211-
Message(MessageImplPtr& impl);
212-
Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
213-
proto::MessageMetadata& metadata, SharedBuffer& payload);
214-
/// Used for Batch Messages
215-
Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata,
216-
proto::MessageMetadata& metadata, SharedBuffer& payload,
217-
proto::SingleMessageMetadata& singleMetadata, const std::shared_ptr<std::string>& topicName);
213+
Message(const MessageImplPtr& impl);
218214
friend class PartitionedProducerImpl;
219215
friend class MultiTopicsConsumerImpl;
220216
friend class MessageBuilder;

lib/Commands.cc

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -906,7 +906,8 @@ uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer& batchPayl
906906
}
907907

908908
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
909-
int32_t batchSize, const BatchMessageAckerPtr& acker) {
909+
int32_t batchSize, const BatchMessageAckerPtr& acker,
910+
const optional<EncryptionContext>& encryptionContext) {
910911
SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;
911912

912913
// Format of batch message
@@ -926,12 +927,13 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32
926927
const MessageId& m = batchedMessage.impl_->messageId;
927928
auto messageId = MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
928929
auto batchedMessageId = std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
929-
Message singleMessage(MessageId{batchedMessageId}, batchedMessage.impl_->brokerEntryMetadata,
930-
batchedMessage.impl_->metadata, payload, metadata,
931-
batchedMessage.impl_->topicName_);
932-
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
933930

934-
return singleMessage;
931+
auto msgImpl = std::make_shared<MessageImpl>(messageId, batchedMessage.impl_->brokerEntryMetadata,
932+
batchedMessage.impl_->metadata, payload, metadata,
933+
batchedMessage.impl_->topicName_, encryptionContext);
934+
msgImpl->cnx_ = batchedMessage.impl_->cnx_;
935+
936+
return Message(msgImpl);
935937
}
936938

937939
MessageIdImplPtr Commands::getMessageIdImpl(const MessageId& messageId) { return messageId.impl_; }

lib/Commands.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ class Commands {
155155
const std::vector<Message>& messages);
156156

157157
static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex,
158-
int32_t batchSize, const BatchMessageAckerPtr& acker);
158+
int32_t batchSize, const BatchMessageAckerPtr& acker,
159+
const optional<EncryptionContext>& encryptionContext);
159160

160161
static MessageIdImplPtr getMessageIdImpl(const MessageId& messageId);
161162

lib/ConsumerImpl.cc

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -548,25 +548,27 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
548548
bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata,
549549
proto::MessageMetadata& metadata, SharedBuffer& payload) {
550550
LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes());
551-
552-
if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
553-
// Message was discarded or not consumed due to decryption failure
554-
return;
555-
}
556-
557551
if (!isChecksumValid) {
558552
// Message discarded for checksum error
559553
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch);
560554
return;
561555
}
562556

563-
auto redeliveryCount = msg.redelivery_count();
564-
const bool isMessageUndecryptable =
565-
metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() &&
566-
config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME;
557+
auto encryptionContext = metadata.encryption_keys_size() > 0
558+
? optional<EncryptionContext>{std::in_place, metadata, false}
559+
: std::nullopt;
567560

561+
auto decryptResult = decryptMessageIfNeeded(cnx, encryptionContext, payload, msg.message_id());
562+
if (decryptResult == FAILED) {
563+
// Message was discarded due to decryption failure or not consumed due to decryption failure
564+
return;
565+
} else if (decryptResult == CONSUME_ENCRYPTED) {
566+
encryptionContext->isDecryptionFailed_ = true;
567+
}
568+
569+
auto redeliveryCount = msg.redelivery_count();
568570
const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
569-
if (!isMessageUndecryptable && !isChunkedMessage) {
571+
if (decryptResult == DECRYPTED && !isChunkedMessage) {
570572
if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
571573
// Message was discarded on decompression error
572574
return;
@@ -586,9 +588,9 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
586588
}
587589
}
588590

589-
Message m(messageId, brokerEntryMetadata, metadata, payload);
591+
Message m{std::make_shared<MessageImpl>(messageId, brokerEntryMetadata, metadata, payload, std::nullopt,
592+
getTopicPtr(), std::move(encryptionContext))};
590593
m.impl_->cnx_ = cnx.get();
591-
m.impl_->setTopicName(getTopicPtr());
592594
m.impl_->setRedeliveryCount(msg.redelivery_count());
593595

594596
if (metadata.has_schema_version()) {
@@ -610,14 +612,16 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
610612
return;
611613
}
612614

613-
if (metadata.has_num_messages_in_batch()) {
615+
// When the decryption failed, the whole batch message will be treated as a single message.
616+
if (metadata.has_num_messages_in_batch() && decryptResult == DECRYPTED) {
614617
BitSet::Data words(msg.ack_set_size());
615618
for (int i = 0; i < words.size(); i++) {
616619
words[i] = msg.ack_set(i);
617620
}
618621
BitSet ackSet{std::move(words)};
619622
Lock lock(mutex_);
620-
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count());
623+
numOfMessageReceived =
624+
receiveIndividualMessagesFromBatch(cnx, m, ackSet, msg.redelivery_count(), encryptionContext);
621625
} else {
622626
// try convert key value data.
623627
m.impl_->convertPayloadToKeyValue(config_.getSchema());
@@ -742,9 +746,9 @@ void ConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
742746
}
743747

744748
// Zero Queue size is not supported with Batch Messages
745-
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx,
746-
Message& batchedMessage, const BitSet& ackSet,
747-
int redeliveryCount) {
749+
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(
750+
const ClientConnectionPtr& cnx, Message& batchedMessage, const BitSet& ackSet, int redeliveryCount,
751+
const optional<EncryptionContext>& encryptionContext) {
748752
auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
749753
LOG_DEBUG("Received Batch messages of size - " << batchSize
750754
<< " -- msgId: " << batchedMessage.getMessageId());
@@ -756,7 +760,8 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
756760
std::vector<Message> possibleToDeadLetter;
757761
for (int i = 0; i < batchSize; i++) {
758762
// This is a cheap copy since message contains only one shared pointer (impl_)
759-
Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
763+
Message msg =
764+
Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker, encryptionContext);
760765
msg.impl_->setRedeliveryCount(redeliveryCount);
761766
msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
762767
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
@@ -812,50 +817,51 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
812817
return batchSize - skippedMessages;
813818
}
814819

815-
bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
816-
const proto::MessageMetadata& metadata, SharedBuffer& payload) {
817-
if (!metadata.encryption_keys_size()) {
818-
return true;
820+
auto ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx,
821+
const optional<EncryptionContext>& context, SharedBuffer& payload,
822+
const proto::MessageIdData& msgId) -> DecryptResult {
823+
if (!context.has_value()) {
824+
return DECRYPTED;
819825
}
820826

821827
// If KeyReader is not configured throw exception based on config param
822828
if (!config_.isEncryptionEnabled()) {
823829
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
824830
LOG_WARN(getName() << "CryptoKeyReader is not implemented. Consuming encrypted message.");
825-
return true;
831+
return CONSUME_ENCRYPTED;
826832
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
827833
LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config "
828834
"is set to discard");
829-
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
835+
discardCorruptedMessage(cnx, msgId, CommandAck_ValidationError_DecryptionError);
830836
} else {
831837
LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to "
832838
"consume encrypted message");
833-
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
839+
auto messageId = MessageIdBuilder::from(msgId).build();
834840
unAckedMessageTrackerPtr_->add(messageId);
835841
}
836-
return false;
842+
return FAILED;
837843
}
838844

839845
SharedBuffer decryptedPayload;
840-
if (msgCrypto_->decrypt(metadata, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
846+
if (msgCrypto_->decrypt(*context, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
841847
payload = decryptedPayload;
842-
return true;
848+
return DECRYPTED;
843849
}
844850

845851
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
846852
// Note, batch message will fail to consume even if config is set to consume
847853
LOG_WARN(
848854
getName() << "Decryption failed. Consuming encrypted message since config is set to consume.");
849-
return true;
855+
return CONSUME_ENCRYPTED;
850856
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
851857
LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard");
852-
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
858+
discardCorruptedMessage(cnx, msgId, CommandAck_ValidationError_DecryptionError);
853859
} else {
854860
LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message");
855-
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
861+
auto messageId = MessageIdBuilder::from(msgId).build();
856862
unAckedMessageTrackerPtr_->add(messageId);
857863
}
858-
return false;
864+
return FAILED;
859865
}
860866

861867
bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,

0 commit comments

Comments
 (0)