Skip to content

Commit c5c177a

Browse files
authored
[feat] Support consumer batch receive. (#33)
1 parent 1554e39 commit c5c177a

File tree

4 files changed

+87
-2
lines changed

4 files changed

+87
-2
lines changed

pulsar/__init__.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import _pulsar
4747

4848
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
49-
LoggerLevel # noqa: F401
49+
LoggerLevel, BatchReceivePolicy # noqa: F401
5050

5151
from pulsar.exceptions import *
5252

@@ -657,7 +657,8 @@ def subscribe(self, topic, subscription_name,
657657
replicate_subscription_state_enabled=False,
658658
max_pending_chunked_message=10,
659659
auto_ack_oldest_chunked_message_on_queue_full=False,
660-
start_message_id_inclusive=False
660+
start_message_id_inclusive=False,
661+
batch_receive_policy=None
661662
):
662663
"""
663664
Subscribe to the given topic and subscription combination.
@@ -740,6 +741,8 @@ def my_listener(consumer, message):
740741
if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery.
741742
start_message_id_inclusive: bool, default=False
742743
Set the consumer to include the given position of any reset operation like Consumer::seek.
744+
batch_receive_policy: class ConsumerBatchReceivePolicy
745+
Set the batch collection policy for batch receiving.
743746
"""
744747
_check_type(str, subscription_name, 'subscription_name')
745748
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -759,6 +762,7 @@ def my_listener(consumer, message):
759762
_check_type(int, max_pending_chunked_message, 'max_pending_chunked_message')
760763
_check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full')
761764
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
765+
_check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
762766

763767
conf = _pulsar.ConsumerConfiguration()
764768
conf.consumer_type(consumer_type)
@@ -788,6 +792,8 @@ def my_listener(consumer, message):
788792
conf.max_pending_chunked_message(max_pending_chunked_message)
789793
conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
790794
conf.start_message_id_inclusive(start_message_id_inclusive)
795+
if batch_receive_policy:
796+
conf.batch_receive_policy(batch_receive_policy.policy())
791797

792798
c = Consumer()
793799
if isinstance(topic, str):
@@ -1237,6 +1243,20 @@ def receive(self, timeout_millis=None):
12371243
m._schema = self._schema
12381244
return m
12391245

1246+
def batch_receive(self):
1247+
"""
1248+
Batch receiving messages.
1249+
1250+
This calls blocks until has enough messages or wait timeout, more details to see {@link BatchReceivePolicy}.
1251+
"""
1252+
messages = []
1253+
msgs = self._consumer.batch_receive()
1254+
for msg in msgs:
1255+
m = Message()
1256+
m._message = msg
1257+
messages.append(m)
1258+
return messages
1259+
12401260
def acknowledge(self, message):
12411261
"""
12421262
Acknowledge the reception of a single message.
@@ -1354,6 +1374,32 @@ def get_last_message_id(self):
13541374
"""
13551375
return self._consumer.get_last_message_id()
13561376

1377+
class ConsumerBatchReceivePolicy:
1378+
"""
1379+
Batch receive policy can limit the number and bytes of messages in a single batch,
1380+
and can specify a timeout for waiting for enough messages for this batch.
1381+
1382+
A batch receive action is completed as long as any one of the conditions (the batch has enough number
1383+
or size of messages, or the waiting timeout is passed) are met.
1384+
"""
1385+
def __init__(self, max_num_message, max_num_bytes, timeout_ms):
1386+
"""
1387+
Wrapper BatchReceivePolicy.
1388+
1389+
Parameters
1390+
----------
1391+
1392+
max_num_message: Max num message, if less than 0, it means no limit. default: -1
1393+
max_num_bytes: Max num bytes, if less than 0, it means no limit. default: 10 * 1024 * 1024
1394+
timeout_ms: If less than 0, it means no limit. default: 100
1395+
"""
1396+
self._policy = BatchReceivePolicy(max_num_message, max_num_bytes, timeout_ms)
1397+
1398+
def policy(self):
1399+
"""
1400+
Returns the actual one BatchReceivePolicy.
1401+
"""
1402+
return self._policy
13571403

13581404
class Reader:
13591405
"""

src/config.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ void export_config() {
253253
.def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>())
254254
.def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>());
255255

256+
class_<BatchReceivePolicy>("BatchReceivePolicy", init<int, int, long>())
257+
.def("getTimeoutMs", &BatchReceivePolicy::getTimeoutMs)
258+
.def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages)
259+
.def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes);
260+
256261
class_<ConsumerConfiguration>("ConsumerConfiguration")
257262
.def("consumer_type", &ConsumerConfiguration::getConsumerType)
258263
.def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>())
@@ -267,6 +272,8 @@ void export_config() {
267272
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
268273
.def("consumer_name", &ConsumerConfiguration::getConsumerName,
269274
return_value_policy<copy_const_reference>())
275+
.def("batch_receive_policy", &ConsumerConfiguration::getBatchReceivePolicy, return_value_policy<copy_const_reference>())
276+
.def("batch_receive_policy", &ConsumerConfiguration::setBatchReceivePolicy)
270277
.def("consumer_name", &ConsumerConfiguration::setConsumerName)
271278
.def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
272279
.def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs)

src/consumer.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
4242
return msg;
4343
}
4444

45+
Messages Consumer_batch_receive(Consumer& consumer) {
46+
Messages msgs;
47+
Result res;
48+
Py_BEGIN_ALLOW_THREADS res = consumer.batchReceive(msgs);
49+
Py_END_ALLOW_THREADS
50+
CHECK_RESULT(res);
51+
return msgs;
52+
}
53+
4554
void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }
4655

4756
void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
@@ -103,6 +112,7 @@ void export_consumer() {
103112
.def("unsubscribe", &Consumer_unsubscribe)
104113
.def("receive", &Consumer_receive)
105114
.def("receive", &Consumer_receive_timeout)
115+
.def("batch_receive", &Consumer_batch_receive)
106116
.def("acknowledge", &Consumer_acknowledge)
107117
.def("acknowledge", &Consumer_acknowledge_message_id)
108118
.def("acknowledge_cumulative", &Consumer_acknowledge_cumulative)

tests/pulsar_test.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
AuthenticationToken,
4040
InitialPosition,
4141
CryptoKeyReader,
42+
ConsumerBatchReceivePolicy,
4243
)
4344
from pulsar.schema import JsonSchema, Record, Integer
4445

@@ -1064,6 +1065,27 @@ def test_topics_pattern_consumer(self):
10641065
consumer.receive(100)
10651066
client.close()
10661067

1068+
def test_batch_receive(self):
1069+
client = Client(self.serviceUrl)
1070+
topic = "my-python-topic-batch-receive-" + str(time.time())
1071+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
1072+
start_message_id_inclusive=True, batch_receive_policy=ConsumerBatchReceivePolicy(10, -1, -1))
1073+
producer = client.create_producer(topic)
1074+
1075+
1076+
for i in range(10):
1077+
if i > 0:
1078+
time.sleep(0.02)
1079+
producer.send(b"hello-%d" % i)
1080+
1081+
msgs = consumer.batch_receive()
1082+
i = 0
1083+
for msg in msgs:
1084+
self.assertEqual(msg.data(), b"hello-%d" % i)
1085+
i += 1
1086+
1087+
client.close()
1088+
10671089
def test_message_id(self):
10681090
s = MessageId.earliest.serialize()
10691091
self.assertEqual(MessageId.deserialize(s), MessageId.earliest)

0 commit comments

Comments
 (0)