Skip to content

Commit 8446500

Browse files
committed
Merge branch 'main' into bewaremypower/tableview
2 parents 0dc7284 + 26ab32f commit 8446500

File tree

4 files changed

+223
-15
lines changed

4 files changed

+223
-15
lines changed

pulsar/__init__.py

Lines changed: 91 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949

5050
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
5151
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \
52-
DeadLetterPolicyBuilder # noqa: F401
52+
DeadLetterPolicyBuilder, ConsumerCryptoFailureAction # noqa: F401
5353

5454
from pulsar.__about__ import __version__
5555

@@ -82,7 +82,7 @@ class MessageId:
8282
"""
8383

8484
def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
85-
self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
85+
self._msg_id: _pulsar.MessageId = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
8686

8787
earliest = _pulsar.MessageId.earliest
8888
latest = _pulsar.MessageId.latest
@@ -112,6 +112,24 @@ def __str__(self) -> str:
112112
"""
113113
return str(self._msg_id)
114114

115+
def __eq__(self, other) -> bool:
116+
return self._msg_id == other._msg_id
117+
118+
def __ne__(self, other) -> bool:
119+
return self._msg_id != other._msg_id
120+
121+
def __le__(self, other) -> bool:
122+
return self._msg_id <= other._msg_id
123+
124+
def __lt__(self, other) -> bool:
125+
return self._msg_id < other._msg_id
126+
127+
def __ge__(self, other) -> bool:
128+
return self._msg_id >= other._msg_id
129+
130+
def __gt__(self, other) -> bool:
131+
return self._msg_id > other._msg_id
132+
115133
@staticmethod
116134
def deserialize(message_id_bytes):
117135
"""
@@ -120,6 +138,14 @@ def deserialize(message_id_bytes):
120138
"""
121139
return _pulsar.MessageId.deserialize(message_id_bytes)
122140

141+
@classmethod
142+
def wrap(cls, msg_id: _pulsar.MessageId):
143+
"""
144+
Wrap the underlying MessageId type from the C extension to the Python type.
145+
"""
146+
self = cls()
147+
self._msg_id = msg_id
148+
return self
123149

124150
class Message:
125151
"""
@@ -171,9 +197,13 @@ def event_timestamp(self):
171197
"""
172198
return self._message.event_timestamp()
173199

174-
def message_id(self):
200+
def message_id(self) -> _pulsar.MessageId:
175201
"""
176202
The message ID that can be used to refer to this particular message.
203+
204+
Returns
205+
----------
206+
A `_pulsar.MessageId` object that represents where the message is persisted.
177207
"""
178208
return self._message.message_id()
179209

@@ -490,7 +520,9 @@ def __init__(self, service_url,
490520
tls_validate_hostname=False,
491521
logger=None,
492522
connection_timeout_ms=10000,
493-
listener_name=None
523+
listener_name=None,
524+
tls_private_key_file_path: Optional[str] = None,
525+
tls_certificate_file_path: Optional[str] = None,
494526
):
495527
"""
496528
Create a new Pulsar client instance.
@@ -556,6 +588,10 @@ def __init__(self, service_url,
556588
Listener name for lookup. Clients can use listenerName to choose one of the listeners as
557589
the service URL to create a connection to the broker as long as the network is accessible.
558590
``advertisedListeners`` must be enabled in broker side.
591+
tls_private_key_file_path: str, optional
592+
The path to the TLS private key file
593+
tls_certificate_file_path: str, optional
594+
The path to the TLS certificate file.
559595
"""
560596
_check_type(str, service_url, 'service_url')
561597
_check_type_or_none(Authentication, authentication, 'authentication')
@@ -571,6 +607,8 @@ def __init__(self, service_url,
571607
_check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
572608
_check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
573609
_check_type_or_none(str, listener_name, 'listener_name')
610+
_check_type_or_none(str, tls_private_key_file_path, 'tls_private_key_file_path')
611+
_check_type_or_none(str, tls_certificate_file_path, 'tls_certificate_file_path')
574612

575613
conf = _pulsar.ClientConfiguration()
576614
if authentication:
@@ -602,6 +640,10 @@ def __init__(self, service_url,
602640
conf.tls_trust_certs_file_path(certifi.where())
603641
conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
604642
conf.tls_validate_hostname(tls_validate_hostname)
643+
if tls_private_key_file_path is not None:
644+
conf.tls_private_key_file_path(tls_private_key_file_path)
645+
if tls_certificate_file_path is not None:
646+
conf.tls_certificate_file_path(tls_certificate_file_path)
605647
self._client = _pulsar.Client(service_url, conf)
606648
self._consumers = []
607649

@@ -835,6 +877,7 @@ def subscribe(self, topic, subscription_name,
835877
batch_index_ack_enabled=False,
836878
regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly,
837879
dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None,
880+
crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
838881
):
839882
"""
840883
Subscribe to the given topic and subscription combination.
@@ -938,6 +981,19 @@ def my_listener(consumer, message):
938981
stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
939982
exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
940983
automatically.
984+
crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
985+
Set the behavior when the decryption fails. The default is to fail the message.
986+
987+
Supported actions:
988+
989+
* ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
990+
* ConsumerCryptoFailureAction.DISCARD:
991+
Message is silently acknowledged and not delivered to the application.
992+
* ConsumerCryptoFailureAction.CONSUME:
993+
Deliver the encrypted message to the application. It's the application's responsibility
994+
to decrypt the message. If message is also compressed, decompression will fail. If the
995+
message contains batch messages, client will not be able to retrieve individual messages
996+
in the batch.
941997
"""
942998
_check_type(str, subscription_name, 'subscription_name')
943999
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -961,6 +1017,7 @@ def my_listener(consumer, message):
9611017
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
9621018
_check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
9631019
_check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode')
1020+
_check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
9641021

9651022
conf = _pulsar.ConsumerConfiguration()
9661023
conf.consumer_type(consumer_type)
@@ -999,6 +1056,7 @@ def my_listener(consumer, message):
9991056
conf.batch_index_ack_enabled(batch_index_ack_enabled)
10001057
if dead_letter_policy:
10011058
conf.dead_letter_policy(dead_letter_policy.policy())
1059+
conf.crypto_failure_action(crypto_failure_action)
10021060

10031061
c = Consumer()
10041062
if isinstance(topic, str):
@@ -1027,7 +1085,8 @@ def create_reader(self, topic, start_message_id,
10271085
subscription_role_prefix=None,
10281086
is_read_compacted=False,
10291087
crypto_key_reader: Union[None, CryptoKeyReader] = None,
1030-
start_message_id_inclusive=False
1088+
start_message_id_inclusive=False,
1089+
crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
10311090
):
10321091
"""
10331092
Create a reader on a particular topic
@@ -1088,6 +1147,19 @@ def my_listener(reader, message):
10881147
and private key decryption messages for the consumer
10891148
start_message_id_inclusive: bool, default=False
10901149
Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
1150+
crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
1151+
Set the behavior when the decryption fails. The default is to fail the message.
1152+
1153+
Supported actions:
1154+
1155+
* ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
1156+
* ConsumerCryptoFailureAction.DISCARD:
1157+
Message is silently acknowledged and not delivered to the application.
1158+
* ConsumerCryptoFailureAction.CONSUME:
1159+
Deliver the encrypted message to the application. It's the application's responsibility
1160+
to decrypt the message. If message is also compressed, decompression will fail. If the
1161+
message contains batch messages, client will not be able to retrieve individual messages
1162+
in the batch.
10911163
"""
10921164

10931165
# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
@@ -1103,6 +1175,7 @@ def my_listener(reader, message):
11031175
_check_type(bool, is_read_compacted, 'is_read_compacted')
11041176
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
11051177
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
1178+
_check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
11061179

11071180
conf = _pulsar.ReaderConfiguration()
11081181
if reader_listener:
@@ -1117,6 +1190,7 @@ def my_listener(reader, message):
11171190
if crypto_key_reader:
11181191
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
11191192
conf.start_message_id_inclusive(start_message_id_inclusive)
1193+
conf.crypto_failure_action(crypto_failure_action)
11201194

11211195
c = Reader()
11221196
c._reader = self._client.create_reader(topic, start_message_id, conf)
@@ -1256,7 +1330,7 @@ def send(self, content,
12561330
event_timestamp=None,
12571331
deliver_at=None,
12581332
deliver_after=None,
1259-
):
1333+
) -> _pulsar.MessageId:
12601334
"""
12611335
Publish a message on the topic. Blocks until the message is acknowledged
12621336
@@ -1289,6 +1363,10 @@ def send(self, content,
12891363
The timestamp is milliseconds and based on UTC
12901364
deliver_after: optional
12911365
Specify a delay in timedelta for the delivery of the messages.
1366+
1367+
Returns
1368+
----------
1369+
A `_pulsar.MessageId` object that represents where the message is persisted.
12921370
"""
12931371
msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id,
12941372
replication_clusters, disable_replication, event_timestamp,
@@ -1527,7 +1605,7 @@ def batch_receive(self):
15271605
messages.append(m)
15281606
return messages
15291607

1530-
def acknowledge(self, message):
1608+
def acknowledge(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]):
15311609
"""
15321610
Acknowledge the reception of a single message.
15331611
@@ -1536,7 +1614,7 @@ def acknowledge(self, message):
15361614
15371615
Parameters
15381616
----------
1539-
message : Message, _pulsar.Message, _pulsar.MessageId
1617+
message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
15401618
The received message or message id.
15411619
15421620
Raises
@@ -1546,10 +1624,12 @@ def acknowledge(self, message):
15461624
"""
15471625
if isinstance(message, Message):
15481626
self._consumer.acknowledge(message._message)
1627+
elif isinstance(message, MessageId):
1628+
self._consumer.acknowledge(message._msg_id)
15491629
else:
15501630
self._consumer.acknowledge(message)
15511631

1552-
def acknowledge_cumulative(self, message):
1632+
def acknowledge_cumulative(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]):
15531633
"""
15541634
Acknowledge the reception of all the messages in the stream up to (and
15551635
including) the provided message.
@@ -1570,6 +1650,8 @@ def acknowledge_cumulative(self, message):
15701650
"""
15711651
if isinstance(message, Message):
15721652
self._consumer.acknowledge_cumulative(message._message)
1653+
elif isinstance(message, MessageId):
1654+
self._consumer.acknowledge_cumulative(message._msg_id)
15731655
else:
15741656
self._consumer.acknowledge_cumulative(message)
15751657

src/config.cc

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,14 @@ void export_config(py::module_& m) {
166166
return_value_policy::copy)
167167
.def("tls_trust_certs_file_path", &ClientConfiguration::setTlsTrustCertsFilePath,
168168
return_value_policy::reference)
169+
.def("tls_private_key_file_path", &ClientConfiguration::getTlsPrivateKeyFilePath,
170+
return_value_policy::copy)
171+
.def("tls_private_key_file_path", &ClientConfiguration::setTlsPrivateKeyFilePath,
172+
return_value_policy::reference)
173+
.def("tls_certificate_file_path", &ClientConfiguration::getTlsCertificateFilePath,
174+
return_value_policy::copy)
175+
.def("tls_certificate_file_path", &ClientConfiguration::setTlsCertificateFilePath,
176+
return_value_policy::reference)
169177
.def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection)
170178
.def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection,
171179
return_value_policy::reference)
@@ -305,7 +313,11 @@ void export_config(py::module_& m) {
305313
.def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
306314
return_value_policy::reference)
307315
.def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
308-
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
316+
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy)
317+
.def("crypto_failure_action", &ConsumerConfiguration::getCryptoFailureAction,
318+
return_value_policy::copy)
319+
.def("crypto_failure_action", &ConsumerConfiguration::setCryptoFailureAction,
320+
return_value_policy::reference);
309321

310322
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
311323
.def(init<>())
@@ -323,5 +335,9 @@ void export_config(py::module_& m) {
323335
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
324336
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
325337
.def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
326-
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
338+
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference)
339+
.def("crypto_failure_action", &ReaderConfiguration::getCryptoFailureAction,
340+
return_value_policy::copy)
341+
.def("crypto_failure_action", &ReaderConfiguration::setCryptoFailureAction,
342+
return_value_policy::reference);
327343
}

src/enums.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "utils.h"
2020
#include <pulsar/CompressionType.h>
2121
#include <pulsar/ConsumerConfiguration.h>
22+
#include <pulsar/ConsumerCryptoFailureAction.h>
2223
#include <pulsar/ProducerConfiguration.h>
2324
#include <pulsar/KeySharedPolicy.h>
2425
#include <pybind11/pybind11.h>
@@ -140,4 +141,9 @@ void export_enums(py::module_& m) {
140141
.value("Info", Logger::LEVEL_INFO)
141142
.value("Warn", Logger::LEVEL_WARN)
142143
.value("Error", Logger::LEVEL_ERROR);
144+
145+
enum_<ConsumerCryptoFailureAction>(m, "ConsumerCryptoFailureAction")
146+
.value("FAIL", ConsumerCryptoFailureAction::FAIL)
147+
.value("DISCARD", ConsumerCryptoFailureAction::DISCARD)
148+
.value("CONSUME", ConsumerCryptoFailureAction::CONSUME);
143149
}

0 commit comments

Comments
 (0)