Skip to content

Commit 5775987

Browse files
committed
simplify the implementation
1 parent 27e0092 commit 5775987

File tree

11 files changed

+393
-49
lines changed

11 files changed

+393
-49
lines changed

include/pulsar/EncryptionContext.h

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#pragma once
20+
21+
#include <cstdint>
22+
#include <string>
23+
#include <unordered_map>
24+
#include <vector>
25+
26+
#include "CompressionType.h"
27+
#include "defines.h"
28+
29+
namespace pulsar {
30+
31+
namespace proto {
32+
class MessageMetadata;
33+
}
34+
35+
class Message;
36+
37+
struct PULSAR_PUBLIC EncryptionKey {
38+
std::string key;
39+
std::string value;
40+
std::unordered_map<std::string, std::string> metadata;
41+
42+
explicit EncryptionKey() = default;
43+
44+
EncryptionKey(const std::string& key, const std::string& value,
45+
const decltype(EncryptionKey::metadata)& metadata)
46+
: key(key), value(value), metadata(metadata) {}
47+
};
48+
49+
/**
50+
* It contains encryption and compression information in it using which application can decrypt consumed
51+
* message with encrypted-payload.
52+
*/
53+
class PULSAR_PUBLIC EncryptionContext {
54+
public:
55+
explicit EncryptionContext() = default;
56+
EncryptionContext(const EncryptionContext&) = default;
57+
EncryptionContext(EncryptionContext&&) noexcept = default;
58+
EncryptionContext& operator=(const EncryptionContext&) = default;
59+
EncryptionContext& operator=(EncryptionContext&&) noexcept = default;
60+
61+
using KeysType = std::vector<EncryptionKey>;
62+
63+
/**
64+
* @return the map of encryption keys used for the message
65+
*/
66+
const KeysType& keys() const noexcept { return keys_; }
67+
68+
/**
69+
* @return the encryption parameter used for the message
70+
*/
71+
const std::string& param() const noexcept { return param_; }
72+
73+
/**
74+
* @return the encryption algorithm used for the message
75+
*/
76+
const std::string& algorithm() const noexcept { return algorithm_; }
77+
78+
/**
79+
* @return the compression type used for the message
80+
*/
81+
CompressionType compressionType() const noexcept { return compressionType_; }
82+
83+
/**
84+
* @return the uncompressed message size if the message is compressed, 0 otherwise
85+
*/
86+
uint32_t uncompressedMessageSize() const noexcept { return uncompressedMessageSize_; }
87+
88+
/**
89+
* @return the batch size if the message is part of a batch, -1 otherwise
90+
*/
91+
int32_t batchSize() const noexcept { return batchSize_; }
92+
93+
/**
94+
* When the `ConsumerConfiguration#getCryptoFailureAction` is set to `CONSUME`, the message will still be
95+
* returned even if the decryption failed, in this case, the message payload is still not decrypted but
96+
* users have no way to know that. This method is provided to let users know whether the decryption
97+
* failed.
98+
*
99+
* @return whether the decryption failed
100+
*/
101+
bool isDecryptionFailed() const noexcept { return isDecryptionFailed_; }
102+
103+
// It should be used only internally but it's exposed so that `std::make_optional` can construct the
104+
// object in place with this constructor.
105+
EncryptionContext(const proto::MessageMetadata& metadata, bool isDecryptionFailed);
106+
107+
private:
108+
KeysType keys_;
109+
std::string param_;
110+
std::string algorithm_;
111+
CompressionType compressionType_{CompressionNone};
112+
uint32_t uncompressedMessageSize_{0};
113+
int32_t batchSize_{-1};
114+
bool isDecryptionFailed_{false};
115+
116+
friend class ConsumerImpl;
117+
};
118+
119+
} // namespace pulsar

include/pulsar/Message.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
#ifndef MESSAGE_HPP_
2020
#define MESSAGE_HPP_
2121

22+
#include <pulsar/EncryptionContext.h>
2223
#include <pulsar/defines.h>
2324

2425
#include <map>
2526
#include <memory>
27+
#include <optional>
2628
#include <string>
2729

2830
#include "KeyValue.h"
@@ -202,6 +204,12 @@ class PULSAR_PUBLIC Message {
202204
*/
203205
const std::string& getProducerName() const noexcept;
204206

207+
/**
208+
* @return the optional encryption context that is present when the message is encrypted, the pointer is
209+
* valid as the Message instance is alive
210+
*/
211+
std::optional<const EncryptionContext*> getEncryptionContext() const;
212+
205213
bool operator==(const Message& msg) const;
206214

207215
protected:

lib/ConsumerImpl.cc

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "ConsumerImpl.h"
2020

2121
#include <pulsar/DeadLetterPolicyBuilder.h>
22+
#include <pulsar/EncryptionContext.h>
2223
#include <pulsar/MessageIdBuilder.h>
2324

2425
#include <algorithm>
@@ -549,24 +550,27 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
549550
proto::MessageMetadata& metadata, SharedBuffer& payload) {
550551
LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes());
551552

552-
if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
553-
// Message was discarded or not consumed due to decryption failure
554-
return;
555-
}
556-
557553
if (!isChecksumValid) {
558554
// Message discarded for checksum error
559555
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_ChecksumMismatch);
560556
return;
561557
}
562558

563-
auto redeliveryCount = msg.redelivery_count();
564-
const bool isMessageUndecryptable =
565-
metadata.encryption_keys_size() > 0 && !config_.getCryptoKeyReader().get() &&
566-
config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME;
559+
auto encryptionContext = metadata.encryption_keys_size() > 0
560+
? optional<EncryptionContext>(std::in_place, metadata, false)
561+
: std::nullopt;
562+
const auto decryptionResult = decryptMessageIfNeeded(cnx, msg, encryptionContext, payload);
563+
if (decryptionResult == FAILED) {
564+
// Message was discarded or not consumed due to decryption failure
565+
return;
566+
} else if (decryptionResult == CONSUME_ENCRYPTED && encryptionContext.has_value()) {
567+
// Message is encrypted, but we let the application consume it as-is
568+
encryptionContext->isDecryptionFailed_ = true;
569+
}
567570

571+
auto redeliveryCount = msg.redelivery_count();
568572
const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
569-
if (!isMessageUndecryptable && !isChunkedMessage) {
573+
if (decryptionResult == SUCCESS && !isChunkedMessage) {
570574
if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
571575
// Message was discarded on decompression error
572576
return;
@@ -590,6 +594,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
590594
m.impl_->cnx_ = cnx.get();
591595
m.impl_->setTopicName(getTopicPtr());
592596
m.impl_->setRedeliveryCount(msg.redelivery_count());
597+
m.impl_->encryptionContext_ = std::move(encryptionContext);
593598

594599
if (metadata.has_schema_version()) {
595600
m.impl_->setSchemaVersion(metadata.schema_version());
@@ -610,7 +615,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
610615
return;
611616
}
612617

613-
if (metadata.has_num_messages_in_batch()) {
618+
if (metadata.has_num_messages_in_batch() && decryptionResult == SUCCESS) {
614619
BitSet::Data words(msg.ack_set_size());
615620
for (int i = 0; i < words.size(); i++) {
616621
words[i] = msg.ack_set(i);
@@ -812,17 +817,18 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
812817
return batchSize - skippedMessages;
813818
}
814819

815-
bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
816-
const proto::MessageMetadata& metadata, SharedBuffer& payload) {
817-
if (!metadata.encryption_keys_size()) {
818-
return true;
820+
auto ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
821+
const optional<EncryptionContext>& context, SharedBuffer& payload)
822+
-> DecryptionResult {
823+
if (!context.has_value()) {
824+
return SUCCESS;
819825
}
820826

821827
// If KeyReader is not configured throw exception based on config param
822828
if (!config_.isEncryptionEnabled()) {
823829
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
824830
LOG_WARN(getName() << "CryptoKeyReader is not implemented. Consuming encrypted message.");
825-
return true;
831+
return CONSUME_ENCRYPTED;
826832
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
827833
LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config "
828834
"is set to discard");
@@ -833,20 +839,20 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
833839
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
834840
unAckedMessageTrackerPtr_->add(messageId);
835841
}
836-
return false;
842+
return FAILED;
837843
}
838844

839845
SharedBuffer decryptedPayload;
840-
if (msgCrypto_->decrypt(metadata, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
846+
if (msgCrypto_->decrypt(*context, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
841847
payload = decryptedPayload;
842-
return true;
848+
return SUCCESS;
843849
}
844850

845851
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
846852
// Note, batch message will fail to consume even if config is set to consume
847853
LOG_WARN(
848854
getName() << "Decryption failed. Consuming encrypted message since config is set to consume.");
849-
return true;
855+
return CONSUME_ENCRYPTED;
850856
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
851857
LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard");
852858
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
@@ -855,7 +861,7 @@ bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
855861
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
856862
unAckedMessageTrackerPtr_->add(messageId);
857863
}
858-
return false;
864+
return FAILED;
859865
}
860866

861867
bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,

lib/ConsumerImpl.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,15 @@ class ConsumerImpl : public ConsumerImplBase {
195195
bool isPriorEntryIndex(int64_t idx);
196196
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, const BrokerConsumerStatsCallback&);
197197

198-
bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
199-
const proto::MessageMetadata& metadata, SharedBuffer& payload);
198+
enum DecryptionResult
199+
{
200+
SUCCESS,
201+
CONSUME_ENCRYPTED,
202+
FAILED
203+
};
204+
DecryptionResult decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
205+
const optional<EncryptionContext>& context,
206+
SharedBuffer& payload);
200207

201208
// TODO - Convert these functions to lambda when we move to C++11
202209
Result receiveHelper(Message& msg);

lib/EncryptionContext.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <pulsar/EncryptionContext.h>
20+
21+
#include "PulsarApi.pb.h"
22+
23+
namespace pulsar {
24+
25+
static EncryptionContext::KeysType encryptedKeysFromMetadata(const proto::MessageMetadata& msgMetadata) {
26+
EncryptionContext::KeysType keys;
27+
for (auto&& key : msgMetadata.encryption_keys()) {
28+
decltype(EncryptionKey::metadata) metadata;
29+
for (int i = 0; i < key.metadata_size(); i++) {
30+
const auto& entry = key.metadata(i);
31+
metadata[entry.key()] = entry.value();
32+
}
33+
keys.emplace_back(key.key(), key.value(), std::move(metadata));
34+
}
35+
return keys;
36+
}
37+
38+
EncryptionContext::EncryptionContext(const proto::MessageMetadata& msgMetadata, bool isDecryptionFailed)
39+
40+
: keys_(encryptedKeysFromMetadata(msgMetadata)),
41+
param_(msgMetadata.encryption_param()),
42+
algorithm_(msgMetadata.encryption_algo()),
43+
compressionType_(static_cast<CompressionType>(msgMetadata.compression())),
44+
uncompressedMessageSize_(msgMetadata.uncompressed_size()),
45+
batchSize_(msgMetadata.has_num_messages_in_batch() ? msgMetadata.num_messages_in_batch() : -1),
46+
isDecryptionFailed_(isDecryptionFailed) {}
47+
48+
} // namespace pulsar

lib/Message.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,13 @@ const std::string& Message::getProducerName() const noexcept {
220220
return impl_->metadata.producer_name();
221221
}
222222

223+
std::optional<const EncryptionContext*> Message::getEncryptionContext() const {
224+
if (!impl_ || !impl_->encryptionContext_.has_value()) {
225+
return std::nullopt;
226+
}
227+
return &impl_->encryptionContext_.value();
228+
}
229+
223230
bool Message::operator==(const Message& msg) const { return getMessageId() == msg.getMessageId(); }
224231

225232
KeyValue Message::getKeyValueData() const { return KeyValue(impl_->keyValuePtr); }

0 commit comments

Comments
 (0)