Skip to content

Commit a5bbff5

Browse files
committed
fix tests
1 parent dffd685 commit a5bbff5

File tree

3 files changed

+74
-36
lines changed

3 files changed

+74
-36
lines changed

lib/ConsumerImpl.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,8 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
608608
return;
609609
}
610610

611-
if (metadata.has_num_messages_in_batch()) {
611+
// When the decryption failed, the whole batch message will be treated as a single message.
612+
if (metadata.has_num_messages_in_batch() && decryptResult == DECRYPTED) {
612613
BitSet::Data words(msg.ack_set_size());
613614
for (int i = 0; i < words.size(); i++) {
614615
words[i] = msg.ack_set(i);

tests/EncryptionTests.cc

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,70 +20,105 @@
2020
#include <pulsar/Client.h>
2121
#include <pulsar/ConsumerCryptoFailureAction.h>
2222

23+
#include <stdexcept>
24+
25+
#include "PulsarApi.pb.h"
26+
#include "lib/CompressionCodec.h"
27+
#include "lib/MessageCrypto.h"
28+
#include "lib/SharedBuffer.h"
29+
#include "tests/PulsarFriend.h"
30+
2331
static std::string lookupUrl = "pulsar://localhost:6650";
2432

2533
using namespace pulsar;
2634

35+
static CryptoKeyReaderPtr getDefaultCryptoKeyReader() {
36+
return std::make_shared<DefaultCryptoKeyReader>(TEST_CONF_DIR "/public-key.client-rsa.pem",
37+
TEST_CONF_DIR "/private-key.client-rsa.pem");
38+
}
39+
40+
static std::vector<std::string> decryptValue(const Message& message) {
41+
if (!message.getEncryptionContext().has_value()) {
42+
return {message.getDataAsString()};
43+
}
44+
auto context = message.getEncryptionContext().value();
45+
if (!context->isDecryptionFailed()) {
46+
return {message.getDataAsString()};
47+
}
48+
49+
MessageCrypto crypto{"test", false};
50+
auto msgImpl = PulsarFriend::getMessageImplPtr(message);
51+
SharedBuffer decryptedPayload;
52+
// TODO: change the parameters to get context from EncryptionContext directly
53+
if (!crypto.decrypt(msgImpl->metadata, msgImpl->payload, getDefaultCryptoKeyReader(), decryptedPayload)) {
54+
throw std::runtime_error("Decryption failed");
55+
}
56+
57+
SharedBuffer uncompressedPayload;
58+
if (!CompressionCodecProvider::getCodec(context->compressionType())
59+
.decode(decryptedPayload, context->uncompressedMessageSize(), uncompressedPayload)) {
60+
throw std::runtime_error("Decompression failed");
61+
}
62+
63+
std::vector<std::string> values;
64+
if (auto batchSize = message.getEncryptionContext().value()->batchSize(); batchSize > 0) {
65+
for (decltype(batchSize) i = 0; i < batchSize; i++) {
66+
auto singleMetaSize = uncompressedPayload.readUnsignedInt();
67+
proto::SingleMessageMetadata singleMeta;
68+
singleMeta.ParseFromArray(uncompressedPayload.data(), singleMetaSize);
69+
uncompressedPayload.consume(singleMetaSize);
70+
71+
auto payload = uncompressedPayload.slice(0, singleMeta.payload_size());
72+
uncompressedPayload.consume(payload.readableBytes());
73+
values.emplace_back(payload.data(), payload.readableBytes());
74+
}
75+
} else {
76+
// non-batched message
77+
values.emplace_back(uncompressedPayload.data(), uncompressedPayload.readableBytes());
78+
}
79+
return values;
80+
}
81+
2782
TEST(EncryptionTests, testEncryptionContext) {
2883
Client client{lookupUrl};
2984
std::string topic = "test-encryption-context-" + std::to_string(time(nullptr));
3085

3186
ProducerConfiguration producerConf;
3287
producerConf.setCompressionType(CompressionLZ4);
3388
producerConf.addEncryptionKey("client-rsa.pem");
34-
producerConf.setCryptoKeyReader(std::make_shared<DefaultCryptoKeyReader>(
35-
TEST_CONF_DIR "/public-key.client-rsa.pem", TEST_CONF_DIR "/private-key.client-rsa.pem"));
36-
// TODO: enable batching
37-
producerConf.setBatchingEnabled(false);
89+
producerConf.setCryptoKeyReader(getDefaultCryptoKeyReader());
3890

3991
Producer producer;
4092
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
4193

94+
std::vector<std::string> sentValues;
95+
auto send = [&producer, &sentValues](const std::string& value) {
96+
Message msg = MessageBuilder().setContent(value).build();
97+
producer.sendAsync(msg, nullptr);
98+
sentValues.emplace_back(value);
99+
};
100+
42101
for (int i = 0; i < 5; i++) {
43-
producer.sendAsync(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), nullptr);
102+
send("msg-" + std::to_string(i));
44103
}
45104
producer.flush();
46-
producer.send(MessageBuilder().setContent("last-msg").build());
105+
send("last-msg");
47106

48107
ConsumerConfiguration consumerConf;
49108
consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest);
50109
consumerConf.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
51110
Consumer consumer;
52111
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumerConf, consumer));
53112

54-
std::vector<Message> messages;
55-
for (int i = 0; i < 6; i++) {
113+
std::vector<std::string> values;
114+
for (int i = 0; i < 2; i++) {
56115
Message msg;
57116
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
58-
messages.emplace_back(msg);
59-
}
60-
61-
// TODO: improve it
62-
for (int i = 0; i < 6; i++) {
63-
auto context = messages[i].getEncryptionContext();
64-
if (context.has_value()) {
65-
std::cout << i << " keys: ";
66-
for (auto&& key : context.value()->keys()) {
67-
std::cout << key.first << " => " << key.second.value;
68-
if (!key.second.metadata.empty()) {
69-
std::cout << " (metadata: ";
70-
for (auto&& entry : key.second.metadata) {
71-
std::cout << entry.first << "=" << entry.second << " ";
72-
}
73-
std::cout << ")";
74-
}
75-
std::cout << " ";
76-
}
77-
std::cout << ", algorithm: " << context.value()->algorithm()
78-
<< ", param: " << context.value()->param()
79-
<< ", compressionType: " << context.value()->compressionType()
80-
<< ", uncompressedMessageSize: " << context.value()->uncompressedMessageSize()
81-
<< ", batchSize: " << context.value()->batchSize()
82-
<< ", isDecryptionFailed: " << context.value()->isDecryptionFailed() << "\n";
83-
} else {
84-
std::cerr << i << " no encryption context\n";
117+
for (auto&& value : decryptValue(msg)) {
118+
values.emplace_back(value);
85119
}
86120
}
121+
ASSERT_EQ(values, sentValues);
87122

88123
client.close();
89124
}

tests/PulsarFriend.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,8 @@ class PulsarFriend {
217217
return waitUntil(std::chrono::seconds(3),
218218
[producerImpl] { return !producerImpl->getCnx().expired(); });
219219
}
220+
221+
static auto getMessageImplPtr(const Message& message) { return message.impl_; }
220222
};
221223
} // namespace pulsar
222224

0 commit comments

Comments
 (0)