Skip to content

Commit 8197100

Browse files
committed
Support ConsumerCryptoFailureAction for consumer and reader
1 parent 6df05a1 commit 8197100

File tree

4 files changed

+107
-4
lines changed

4 files changed

+107
-4
lines changed

pulsar/__init__.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949

5050
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
5151
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \
52-
DeadLetterPolicyBuilder # noqa: F401
52+
DeadLetterPolicyBuilder, ConsumerCryptoFailureAction # noqa: F401
5353

5454
from pulsar.__about__ import __version__
5555

@@ -846,6 +846,7 @@ def subscribe(self, topic, subscription_name,
846846
batch_index_ack_enabled=False,
847847
regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly,
848848
dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None,
849+
crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
849850
):
850851
"""
851852
Subscribe to the given topic and subscription combination.
@@ -949,6 +950,19 @@ def my_listener(consumer, message):
949950
stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
950951
exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
951952
automatically.
953+
crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
954+
Set the behavior when the decryption fails. The default is to fail the message.
955+
956+
Supported actions:
957+
958+
* ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
959+
* ConsumerCryptoFailureAction.DISCARD:
960+
Message is silently acknowledged and not delivered to the application.
961+
* ConsumerCryptoFailureAction.CONSUME:
962+
Deliver the encrypted message to the application. It's the application's responsibility
963+
to decrypt the message. If message is also compressed, decompression will fail. If the
964+
message contains batch messages, client will not be able to retrieve individual messages
965+
in the batch.
952966
"""
953967
_check_type(str, subscription_name, 'subscription_name')
954968
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -972,6 +986,7 @@ def my_listener(consumer, message):
972986
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
973987
_check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
974988
_check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode')
989+
_check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
975990

976991
conf = _pulsar.ConsumerConfiguration()
977992
conf.consumer_type(consumer_type)
@@ -1010,6 +1025,7 @@ def my_listener(consumer, message):
10101025
conf.batch_index_ack_enabled(batch_index_ack_enabled)
10111026
if dead_letter_policy:
10121027
conf.dead_letter_policy(dead_letter_policy.policy())
1028+
conf.crypto_failure_action(crypto_failure_action)
10131029

10141030
c = Consumer()
10151031
if isinstance(topic, str):
@@ -1038,7 +1054,8 @@ def create_reader(self, topic, start_message_id,
10381054
subscription_role_prefix=None,
10391055
is_read_compacted=False,
10401056
crypto_key_reader: Union[None, CryptoKeyReader] = None,
1041-
start_message_id_inclusive=False
1057+
start_message_id_inclusive=False,
1058+
crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
10421059
):
10431060
"""
10441061
Create a reader on a particular topic
@@ -1099,6 +1116,19 @@ def my_listener(reader, message):
10991116
and private key decryption messages for the consumer
11001117
start_message_id_inclusive: bool, default=False
11011118
Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
1119+
crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
1120+
Set the behavior when the decryption fails. The default is to fail the message.
1121+
1122+
Supported actions:
1123+
1124+
* ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
1125+
* ConsumerCryptoFailureAction.DISCARD:
1126+
Message is silently acknowledged and not delivered to the application.
1127+
* ConsumerCryptoFailureAction.CONSUME:
1128+
Deliver the encrypted message to the application. It's the application's responsibility
1129+
to decrypt the message. If message is also compressed, decompression will fail. If the
1130+
message contains batch messages, client will not be able to retrieve individual messages
1131+
in the batch.
11021132
"""
11031133

11041134
# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
@@ -1114,6 +1144,7 @@ def my_listener(reader, message):
11141144
_check_type(bool, is_read_compacted, 'is_read_compacted')
11151145
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
11161146
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
1147+
_check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
11171148

11181149
conf = _pulsar.ReaderConfiguration()
11191150
if reader_listener:
@@ -1128,6 +1159,7 @@ def my_listener(reader, message):
11281159
if crypto_key_reader:
11291160
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
11301161
conf.start_message_id_inclusive(start_message_id_inclusive)
1162+
conf.crypto_failure_action(crypto_failure_action)
11311163

11321164
c = Reader()
11331165
c._reader = self._client.create_reader(topic, start_message_id, conf)

src/config.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,11 @@ void export_config(py::module_& m) {
313313
.def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
314314
return_value_policy::reference)
315315
.def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
316-
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
316+
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy)
317+
.def("crypto_failure_action", &ConsumerConfiguration::getCryptoFailureAction,
318+
return_value_policy::copy)
319+
.def("crypto_failure_action", &ConsumerConfiguration::setCryptoFailureAction,
320+
return_value_policy::reference);
317321

318322
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
319323
.def(init<>())
@@ -331,5 +335,9 @@ void export_config(py::module_& m) {
331335
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
332336
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
333337
.def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
334-
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
338+
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference)
339+
.def("crypto_failure_action", &ReaderConfiguration::getCryptoFailureAction,
340+
return_value_policy::copy)
341+
.def("crypto_failure_action", &ReaderConfiguration::setCryptoFailureAction,
342+
return_value_policy::reference);
335343
}

src/enums.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "utils.h"
2020
#include <pulsar/CompressionType.h>
2121
#include <pulsar/ConsumerConfiguration.h>
22+
#include <pulsar/ConsumerCryptoFailureAction.h>
2223
#include <pulsar/ProducerConfiguration.h>
2324
#include <pulsar/KeySharedPolicy.h>
2425
#include <pybind11/pybind11.h>
@@ -140,4 +141,9 @@ void export_enums(py::module_& m) {
140141
.value("Info", Logger::LEVEL_INFO)
141142
.value("Warn", Logger::LEVEL_WARN)
142143
.value("Error", Logger::LEVEL_ERROR);
144+
145+
enum_<ConsumerCryptoFailureAction>(m, "ConsumerCryptoFailureAction")
146+
.value("FAIL", ConsumerCryptoFailureAction::FAIL)
147+
.value("DISCARD", ConsumerCryptoFailureAction::DISCARD)
148+
.value("CONSUME", ConsumerCryptoFailureAction::CONSUME);
143149
}

tests/pulsar_test.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,63 @@ def test_encryption(self):
482482

483483
client.close()
484484

485+
def test_encryption_failure(self):
486+
publicKeyPath = CERTS_DIR + "public-key.client-rsa.pem"
487+
privateKeyPath = CERTS_DIR + "private-key.client-rsa.pem"
488+
crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
489+
client = Client(self.serviceUrl)
490+
topic = "my-python-test-end-to-end-encryption-failure-" + str(time.time())
491+
producer = client.create_producer(
492+
topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
493+
)
494+
producer.send(b"msg-0")
495+
496+
def verify_next_message(value: bytes):
497+
consumer = client.subscribe(topic, subscription,
498+
crypto_key_reader=crypto_key_reader)
499+
msg = consumer.receive(3000)
500+
self.assertEqual(msg.data(), value)
501+
consumer.acknowledge(msg)
502+
consumer.close()
503+
504+
subscription = "my-sub"
505+
consumer = client.subscribe(topic, subscription,
506+
initial_position=InitialPosition.Earliest,
507+
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.FAIL)
508+
with self.assertRaises(pulsar.Timeout):
509+
consumer.receive(3000)
510+
consumer.close()
511+
producer.send(b"msg-1")
512+
verify_next_message(b"msg-0") # msg-0 won't be skipped
513+
514+
consumer = client.subscribe(topic, subscription,
515+
initial_position=InitialPosition.Earliest,
516+
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.DISCARD)
517+
with self.assertRaises(pulsar.Timeout):
518+
consumer.receive(3000)
519+
consumer.close()
520+
521+
producer.send(b"msg-2")
522+
verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD
523+
524+
# Encrypted messages will be consumed since the crypto failure action is CONSUME
525+
consumer = client.subscribe(topic, 'another-sub',
526+
initial_position=InitialPosition.Earliest,
527+
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
528+
for i in range(3):
529+
msg = consumer.receive(3000)
530+
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
531+
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
532+
533+
reader = client.create_reader(topic, MessageId.earliest,
534+
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
535+
for i in range(3):
536+
msg = reader.read_next(3000)
537+
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
538+
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
539+
540+
client.close()
541+
485542
def test_tls_auth3(self):
486543
authPlugin = "tls"
487544
authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)

0 commit comments

Comments
 (0)