Skip to content

Commit 99cf882

Browse files
committed
address copilot comments
1 parent ee909be commit 99cf882

File tree

3 files changed

+17
-18
lines changed

3 files changed

+17
-18
lines changed

include/pulsar/EncryptionContext.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,11 @@ namespace proto {
3232
class MessageMetadata;
3333
}
3434

35-
class Message;
36-
3735
struct PULSAR_PUBLIC EncryptionKey {
3836
std::string key;
3937
std::string value;
4038
std::unordered_map<std::string, std::string> metadata;
4139

42-
explicit EncryptionKey() = default;
43-
4440
EncryptionKey(const std::string& key, const std::string& value,
4541
const decltype(EncryptionKey::metadata)& metadata)
4642
: key(key), value(value), metadata(metadata) {}
@@ -95,8 +91,9 @@ class PULSAR_PUBLIC EncryptionContext {
9591
bool isDecryptionFailed() const noexcept { return isDecryptionFailed_; }
9692

9793
/**
98-
* It should be used only internally but it's exposed so that `std::make_optional` can construct the
99-
* object in place with this constructor.
94+
* This constructor is public to allow in-place construction via std::optional
95+
* (e.g., `std::optional<EncryptionContext>(std::in_place, metadata, false)`),
96+
* but should not be used directly in application code.
10097
*/
10198
EncryptionContext(const proto::MessageMetadata&, bool);
10299

@@ -109,6 +106,8 @@ class PULSAR_PUBLIC EncryptionContext {
109106
int32_t batchSize_{-1};
110107
bool isDecryptionFailed_{false};
111108

109+
void setDecryptionFailed(bool failed) noexcept { isDecryptionFailed_ = failed; }
110+
112111
friend class ConsumerImpl;
113112
};
114113

lib/ConsumerImpl.cc

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -560,17 +560,17 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
560560
? optional<EncryptionContext>(std::in_place, metadata, false)
561561
: std::nullopt;
562562
const auto decryptionResult = decryptMessageIfNeeded(cnx, msg, encryptionContext, payload);
563-
if (decryptionResult == FAILED) {
563+
if (decryptionResult == DecryptionResult::FAILED) {
564564
// Message was discarded or not consumed due to decryption failure
565565
return;
566-
} else if (decryptionResult == CONSUME_ENCRYPTED && encryptionContext.has_value()) {
566+
} else if (decryptionResult == DecryptionResult::CONSUME_ENCRYPTED && encryptionContext.has_value()) {
567567
// Message is encrypted, but we let the application consume it as-is
568-
encryptionContext->isDecryptionFailed_ = true;
568+
encryptionContext->setDecryptionFailed(true);
569569
}
570570

571571
auto redeliveryCount = msg.redelivery_count();
572572
const bool isChunkedMessage = metadata.num_chunks_from_msg() > 1;
573-
if (decryptionResult == SUCCESS && !isChunkedMessage) {
573+
if (decryptionResult == DecryptionResult::SUCCESS && !isChunkedMessage) {
574574
if (!uncompressMessageIfNeeded(cnx, msg.message_id(), metadata, payload, true)) {
575575
// Message was discarded on decompression error
576576
return;
@@ -615,7 +615,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
615615
return;
616616
}
617617

618-
if (metadata.has_num_messages_in_batch() && decryptionResult == SUCCESS) {
618+
if (metadata.has_num_messages_in_batch() && decryptionResult == DecryptionResult::SUCCESS) {
619619
BitSet::Data words(msg.ack_set_size());
620620
for (int i = 0; i < words.size(); i++) {
621621
words[i] = msg.ack_set(i);
@@ -821,14 +821,14 @@ auto ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
821821
const optional<EncryptionContext>& context, SharedBuffer& payload)
822822
-> DecryptionResult {
823823
if (!context.has_value()) {
824-
return SUCCESS;
824+
return DecryptionResult::SUCCESS;
825825
}
826826

827827
// If KeyReader is not configured throw exception based on config param
828828
if (!config_.isEncryptionEnabled()) {
829829
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
830830
LOG_WARN(getName() << "CryptoKeyReader is not implemented. Consuming encrypted message.");
831-
return CONSUME_ENCRYPTED;
831+
return DecryptionResult::CONSUME_ENCRYPTED;
832832
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
833833
LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config "
834834
"is set to discard");
@@ -839,20 +839,20 @@ auto ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
839839
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
840840
unAckedMessageTrackerPtr_->add(messageId);
841841
}
842-
return FAILED;
842+
return DecryptionResult::FAILED;
843843
}
844844

845845
SharedBuffer decryptedPayload;
846846
if (msgCrypto_->decrypt(*context, payload, config_.getCryptoKeyReader(), decryptedPayload)) {
847847
payload = decryptedPayload;
848-
return SUCCESS;
848+
return DecryptionResult::SUCCESS;
849849
}
850850

851851
if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) {
852852
// Note, batch message will fail to consume even if config is set to consume
853853
LOG_WARN(
854854
getName() << "Decryption failed. Consuming encrypted message since config is set to consume.");
855-
return CONSUME_ENCRYPTED;
855+
return DecryptionResult::CONSUME_ENCRYPTED;
856856
} else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) {
857857
LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard");
858858
discardCorruptedMessage(cnx, msg.message_id(), CommandAck_ValidationError_DecryptionError);
@@ -861,7 +861,7 @@ auto ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const
861861
auto messageId = MessageIdBuilder::from(msg.message_id()).build();
862862
unAckedMessageTrackerPtr_->add(messageId);
863863
}
864-
return FAILED;
864+
return DecryptionResult::FAILED;
865865
}
866866

867867
bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx,

lib/ConsumerImpl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ class ConsumerImpl : public ConsumerImplBase {
195195
bool isPriorEntryIndex(int64_t idx);
196196
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, const BrokerConsumerStatsCallback&);
197197

198-
enum DecryptionResult
198+
enum class DecryptionResult : uint8_t
199199
{
200200
SUCCESS,
201201
CONSUME_ENCRYPTED,

0 commit comments

Comments
 (0)