Skip to content

Commit c6ec048

Browse files
committed
(WIP) EncryptionContext
TODO: create context, refactor message
1 parent c64e0e9 commit c6ec048

File tree

8 files changed

+217
-35
lines changed

8 files changed

+217
-35
lines changed

include/pulsar/EncryptionContext.h

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <cstdint>
22+
#include <string>
23+
#include <unordered_map>
24+
25+
#include "CompressionType.h"
26+
#include "defines.h"
27+
28+
namespace pulsar {
29+
30+
namespace proto {
31+
class MessageMetadata;
32+
}
33+
34+
class Message;
35+
36+
struct PULSAR_PUBLIC EncryptionKey {
37+
std::string keyValue;
38+
std::unordered_map<std::string, std::string> metadata;
39+
};
40+
41+
/**
42+
* It contains encryption and compression information in it using which application can decrypt consumed
43+
* message with encrypted-payload.
44+
*/
45+
class PULSAR_PUBLIC EncryptionContext {
46+
public:
47+
explicit EncryptionContext()
48+
: compressionType_(CompressionNone),
49+
uncompressedMessageSize_(0),
50+
batchSize_(-1),
51+
isDecryptionFailed_(false) {}
52+
EncryptionContext(const EncryptionContext&) = default;
53+
EncryptionContext(const proto::MessageMetadata& metadata, bool isDecryptionFailed);
54+
55+
using KeysType = std::unordered_map<std::string, EncryptionKey>;
56+
const KeysType& keys() const noexcept { return keys_; }
57+
const std::string& param() const noexcept { return param_; }
58+
const std::string& algorithm() const noexcept { return algorithm_; }
59+
CompressionType compressionType() const noexcept { return compressionType_; }
60+
uint32_t uncompressedMessageSize() const noexcept { return uncompressedMessageSize_; }
61+
// Returns the batch size if the message is part of a batch, -1 otherwise
62+
int32_t batchSize() const noexcept { return batchSize_; }
63+
64+
/**
65+
* When the `ConsumerConfiguration#getCryptoFailureAction` is set to `CONSUME`, the message will still be
66+
* returned even if the decryption failed, in this case, the message payload is still not decrypted but
67+
* users have no way to know that. This method is provided to let users know whether the decryption
68+
* failed.
69+
*
70+
* @return whether the decryption failed
71+
*/
72+
bool isDecryptionFailed() const noexcept { return isDecryptionFailed_; }
73+
74+
static const EncryptionContext& empty() {
75+
static EncryptionContext context;
76+
return context;
77+
}
78+
79+
private:
80+
const KeysType keys_;
81+
const std::string param_;
82+
const std::string algorithm_;
83+
const CompressionType compressionType_;
84+
const uint32_t uncompressedMessageSize_;
85+
const int32_t batchSize_;
86+
const bool isDecryptionFailed_;
87+
88+
friend class MessageImpl;
89+
};
90+
91+
} // namespace pulsar

include/pulsar/Message.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
#ifndef MESSAGE_HPP_
2020
#define MESSAGE_HPP_
2121

22+
#include <pulsar/EncryptionContext.h>
2223
#include <pulsar/defines.h>
2324

2425
#include <map>
2526
#include <memory>
27+
#include <optional>
2628
#include <string>
2729

2830
#include "KeyValue.h"
@@ -202,6 +204,12 @@ class PULSAR_PUBLIC Message {
202204
*/
203205
const std::string& getProducerName() const noexcept;
204206

207+
/**
208+
* @return the optional encryption context that is present when the message is encrypted, the pointer is
209+
* valid as the Message instance is alive
210+
*/
211+
std::optional<const EncryptionContext*> getEncryptionContext() const;
212+
205213
bool operator==(const Message& msg) const;
206214

207215
protected:

lib/ConsumerImpl.cc

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -548,25 +548,21 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
548548
bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata,
549549
proto::MessageMetadata& metadata, SharedBuffer& payload) {
550550
LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes());
551-
552-
if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
553-
// Message was discarded or not consumed due to decryption failure
554-
return;
555-
}
556-
557551
if (!isChecksumValid) {
558552
// Message discarded for checksum error
559553
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch);
560554
return;
561555
}
562556

563-
auto redeliveryCount = msg.redelivery_count();
564-
const bool isMessageUndecryptable =
565-
metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() &&
566-
config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME;
557+
auto decryptResult = decryptMessageIfNeeded(cnx, msg, metadata, payload);
558+
if (decryptResult == FAILED) {
559+
// Message was discarded due to decryption failure or not consumed due to decryption failure
560+
return;
561+
}
567562

563+
auto redeliveryCount = msg.redelivery_count();
568564
const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
569-
if (!isMessageUndecryptable && !isChunkedMessage) {
565+
if (decryptResult == DECRYPTED && !isChunkedMessage) {
570566
if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
571567
// Message was discarded on decompression error
572568
return;
@@ -586,6 +582,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
586582
}
587583
}
588584

585+
// TODO: create encryption context
589586
Message m(messageId, brokerEntryMetadata, metadata, payload);
590587
m.impl_->cnx_ = cnx.get();
591588
m.impl_->setTopicName(getTopicPtr());
@@ -812,17 +809,18 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
812809
return batchSize - skippedMessages;
813810
}
814811

815-
bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
816-
const proto::MessageMetadata& metadata, SharedBuffer& payload) {
817-
if (!metadata.encryption_keys_size()) {
818-
return true;
812+
auto ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
813+
const proto::MessageMetadata& metadata, SharedBuffer& payload)
814+
-> DecryptResult {
815+
if (metadata.encryption_keys_size() == 0) {
816+
return DECRYPTED;
819817
}
820818

821819
// If KeyReader is not configured throw exception based on config param
822820
if (!config_.isEncryptionEnabled()) {
823821
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
824822
LOG_WARN(getName() << "CryptoKeyReader is not implemented. Consuming encrypted message.");
825-
return true;
823+
return CONSUME_ENCRYPTED;
826824
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
827825
LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config "
828826
"is set to discard");
@@ -833,20 +831,20 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
833831
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
834832
unAckedMessageTrackerPtr_->add(messageId);
835833
}
836-
return false;
834+
return FAILED;
837835
}
838836

839837
SharedBuffer decryptedPayload;
840838
if (msgCrypto_->decrypt(metadata, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
841839
payload = decryptedPayload;
842-
return true;
840+
return DECRYPTED;
843841
}
844842

845843
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
846844
// Note, batch message will fail to consume even if config is set to consume
847845
LOG_WARN(
848846
getName() << "Decryption failed. Consuming encrypted message since config is set to consume.");
849-
return true;
847+
return CONSUME_ENCRYPTED;
850848
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
851849
LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard");
852850
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
@@ -855,7 +853,7 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
855853
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
856854
unAckedMessageTrackerPtr_->add(messageId);
857855
}
858-
return false;
856+
return FAILED;
859857
}
860858

861859
bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,

lib/ConsumerImpl.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,14 @@ class ConsumerImpl : public ConsumerImplBase {
195195
bool isPriorEntryIndex(int64_t idx);
196196
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, const BrokerConsumerStatsCallback&);
197197

198-
bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
199-
const proto::MessageMetadata& metadata, SharedBuffer& payload);
198+
enum DecryptResult
199+
{
200+
DECRYPTED,
201+
CONSUME_ENCRYPTED,
202+
FAILED
203+
};
204+
DecryptResult decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
205+
const proto::MessageMetadata& metadata, SharedBuffer& payload);
200206

201207
// TODO - Convert these functions to lambda when we move to C++11
202208
Result receiveHelper(Message& msg);

lib/EncryptionContext.cc

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <pulsar/EncryptionContext.h>
20+
21+
#include <unordered_map>
22+
23+
#include "PulsarApi.pb.h"
24+
25+
namespace pulsar {
26+
27+
static EncryptionContext::KeysType encryptedKeysFromMetadata(const proto::MessageMetadata& msgMetadata) {
28+
EncryptionContext::KeysType keys;
29+
for (auto&& key : msgMetadata.encryption_keys()) {
30+
decltype(EncryptionKey::metadata) keyMetadata;
31+
for (auto&& entry : key.metadata()) {
32+
keyMetadata[entry.key()] = entry.value();
33+
}
34+
keys[key.key()] = EncryptionKey{key.value(), keyMetadata};
35+
}
36+
return keys;
37+
}
38+
39+
EncryptionContext::EncryptionContext(const proto::MessageMetadata& msgMetadata, bool isDecryptionFailed)
40+
41+
: keys_(encryptedKeysFromMetadata(msgMetadata)),
42+
param_(msgMetadata.encryption_param()),
43+
algorithm_(msgMetadata.encryption_algo()),
44+
compressionType_(static_cast<CompressionType>(msgMetadata.compression())),
45+
uncompressedMessageSize_(msgMetadata.uncompressed_size()),
46+
batchSize_(msgMetadata.has_num_messages_in_batch() ? msgMetadata.num_messages_in_batch() : -1),
47+
isDecryptionFailed_(isDecryptionFailed) {}
48+
49+
} // namespace pulsar

lib/Message.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19+
#include <pulsar/EncryptionContext.h>
1920
#include <pulsar/Message.h>
2021
#include <pulsar/MessageBuilder.h>
2122
#include <pulsar/MessageIdBuilder.h>
@@ -220,6 +221,13 @@ const std::string& Message::getProducerName() const noexcept {
220221
return impl_->metadata.producer_name();
221222
}
222223

224+
std::optional<const EncryptionContext*> Message::getEncryptionContext() const {
225+
if (!impl_ || !impl_->encryptionContext_.has_value()) {
226+
return std::nullopt;
227+
}
228+
return {&(*impl_->encryptionContext_)};
229+
}
230+
223231
bool Message::operator==(const Message& msg) const { return getMessageId() == msg.getMessageId(); }
224232

225233
KeyValue Message::getKeyValueData() const { return KeyValue(impl_->keyValuePtr); }

lib/MessageImpl.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,18 @@
1717
* under the License.
1818
*/
1919
#include "MessageImpl.h"
20+
#include <utility>
2021

2122
namespace pulsar {
2223

24+
MessageImpl::MessageImpl(const MessageId& messageId, const proto::MessageMetadata& metadata,
25+
const proto::BrokerEntryMetadata& brokerEntryMetadata, const SharedBuffer& payload,
26+
const optional<proto::SingleMessageMetadata>& singleMetadata,
27+
const std::shared_ptr<std::string>& topicName, bool undecryptedPayload)
28+
: encryptionContext_(std::in_place, metadata, undecryptedPayload) {
29+
// TODO:
30+
}
31+
2332
const Message::StringMap& MessageImpl::properties() {
2433
if (properties_.size() == 0) {
2534
for (int i = 0; i < metadata.properties_size(); i++) {

lib/MessageImpl.h

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,19 @@
1919
#ifndef LIB_MESSAGEIMPL_H_
2020
#define LIB_MESSAGEIMPL_H_
2121

22+
#include <pulsar/EncryptionContext.h>
2223
#include <pulsar/Message.h>
2324
#include <pulsar/MessageId.h>
2425

26+
#include <memory>
27+
#include <optional>
28+
2529
#include "KeyValueImpl.h"
2630
#include "PulsarApi.pb.h"
2731
#include "SharedBuffer.h"
2832

29-
using namespace pulsar;
33+
using std::optional;
34+
3035
namespace pulsar {
3136

3237
class PulsarWrapper;
@@ -35,19 +40,14 @@ class BatchMessageContainer;
3540

3641
class MessageImpl {
3742
public:
38-
const Message::StringMap& properties();
43+
// TODO: remove this default constructor
44+
explicit MessageImpl() : encryptionContext_(std::nullopt) {}
45+
MessageImpl(const MessageId& messageId, const proto::MessageMetadata& metadata,
46+
const proto::BrokerEntryMetadata& brokerEntryMetadata, const SharedBuffer& payload,
47+
const optional<proto::SingleMessageMetadata>& singleMetadata,
48+
const std::shared_ptr<std::string>& topicName, bool undecryptedPayload);
3949

40-
proto::BrokerEntryMetadata brokerEntryMetadata;
41-
proto::MessageMetadata metadata;
42-
SharedBuffer payload;
43-
std::shared_ptr<KeyValueImpl> keyValuePtr;
44-
MessageId messageId;
45-
ClientConnection* cnx_;
46-
std::shared_ptr<std::string> topicName_;
47-
int redeliveryCount_;
48-
bool hasSchemaVersion_;
49-
const std::string* schemaVersion_;
50-
std::weak_ptr<class ConsumerImpl> consumerPtr_;
50+
const Message::StringMap& properties();
5151

5252
const std::string& getPartitionKey() const;
5353
bool hasPartitionKey() const;
@@ -81,6 +81,19 @@ class MessageImpl {
8181
friend class PulsarWrapper;
8282
friend class MessageBuilder;
8383

84+
proto::BrokerEntryMetadata brokerEntryMetadata;
85+
proto::MessageMetadata metadata;
86+
SharedBuffer payload;
87+
std::shared_ptr<KeyValueImpl> keyValuePtr;
88+
MessageId messageId;
89+
ClientConnection* cnx_;
90+
std::shared_ptr<std::string> topicName_;
91+
int redeliveryCount_;
92+
bool hasSchemaVersion_;
93+
const std::string* schemaVersion_;
94+
std::weak_ptr<class ConsumerImpl> consumerPtr_;
95+
const optional<EncryptionContext> encryptionContext_;
96+
8497
private:
8598
void setReplicationClusters(const std::vector<std::string>& clusters);
8699
void setProperty(const std::string& name, const std::string& value);

0 commit comments

Comments
 (0)