Skip to content

Commit 2cb3cfe

Browse files
authored
feat: support batch index ack. (#139)
1 parent 8e6dd65 commit 2cb3cfe

File tree

5 files changed

+58
-1
lines changed

5 files changed

+58
-1
lines changed

pulsar/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,8 @@ def subscribe(self, topic, subscription_name,
692692
auto_ack_oldest_chunked_message_on_queue_full=False,
693693
start_message_id_inclusive=False,
694694
batch_receive_policy=None,
695-
key_shared_policy=None
695+
key_shared_policy=None,
696+
batch_index_ack_enabled=False,
696697
):
697698
"""
698699
Subscribe to the given topic and subscription combination.
@@ -779,6 +780,9 @@ def my_listener(consumer, message):
779780
Set the batch collection policy for batch receiving.
780781
key_shared_policy: class ConsumerKeySharedPolicy
781782
Set the key shared policy for use when the ConsumerType is KeyShared.
783+
batch_index_ack_enabled: Enable the batch index acknowledgement.
784+
It should be noted that this option can only work when the broker side also enables the batch index
785+
acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled` config in `broker.conf`.
782786
"""
783787
_check_type(str, subscription_name, 'subscription_name')
784788
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -800,6 +804,7 @@ def my_listener(consumer, message):
800804
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
801805
_check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
802806
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
807+
_check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
803808

804809
conf = _pulsar.ConsumerConfiguration()
805810
conf.consumer_type(consumer_type)
@@ -834,6 +839,7 @@ def my_listener(consumer, message):
834839

835840
if key_shared_policy:
836841
conf.key_shared_policy(key_shared_policy.policy())
842+
conf.batch_index_ack_enabled(batch_index_ack_enabled)
837843

838844
c = Consumer()
839845
if isinstance(topic, str):

src/config.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,9 @@ void export_config(py::module_& m) {
280280
return_value_policy::reference)
281281
.def("start_message_id_inclusive", &ConsumerConfiguration::isStartMessageIdInclusive)
282282
.def("start_message_id_inclusive", &ConsumerConfiguration::setStartMessageIdInclusive,
283+
return_value_policy::reference)
284+
.def("batch_index_ack_enabled", &ConsumerConfiguration::isBatchIndexAckEnabled)
285+
.def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
283286
return_value_policy::reference);
284287

285288
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")

tests/pulsar_test.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1617,6 +1617,48 @@ def test_acknowledge_failed(self):
16171617
consumer.acknowledge(msg_id)
16181618
client.close()
16191619

1620+
def test_batch_index_ack(self):
1621+
topic_name = 'test-batch-index-ack-3'
1622+
client = pulsar.Client('pulsar://localhost:6650')
1623+
producer = client.create_producer(topic_name,
1624+
batching_enabled=True,
1625+
batching_max_messages=100,
1626+
batching_max_publish_delay_ms=10000)
1627+
consumer = client.subscribe(topic_name,
1628+
subscription_name='test-batch-index-ack',
1629+
batch_index_ack_enabled=True)
1630+
1631+
# Make sure send 0~5 is a batch msg.
1632+
for i in range(5):
1633+
producer.send_async(b"hello-%d" % i, callback=None)
1634+
producer.flush()
1635+
1636+
# Receive msgs and just ack 0, 1 msgs
1637+
results = []
1638+
for i in range(5):
1639+
msg = consumer.receive()
1640+
print("receive from {}".format(msg.message_id()))
1641+
results.append(msg)
1642+
assert len(results) == 5
1643+
for i in range(2):
1644+
consumer.acknowledge(results[i])
1645+
time.sleep(0.2)
1646+
1647+
# Restart consumer after, just receive 2~5 msg.
1648+
consumer.close()
1649+
consumer = client.subscribe(topic_name,
1650+
subscription_name='test-batch-index-ack',
1651+
batch_index_ack_enabled=True)
1652+
results2 = []
1653+
for i in range(2, 5):
1654+
msg = consumer.receive()
1655+
results2.append(msg)
1656+
assert len(results2) == 3
1657+
# assert no more msgs.
1658+
with self.assertRaises(pulsar.Timeout):
1659+
consumer.receive(timeout_millis=1000)
1660+
1661+
client.close()
16201662

16211663

16221664
if __name__ == "__main__":

tests/test-conf/standalone-ssl.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ superUserRoles=localhost,superUser,admin
113113
brokerClientAuthenticationPlugin=
114114
brokerClientAuthenticationParameters=
115115

116+
# Enable batch index ACK
117+
acknowledgmentAtBatchIndexLevelEnabled=true
118+
116119
### --- BookKeeper Client --- ###
117120

118121
# Authentication plugin to use when connecting to bookies

tests/test-conf/standalone.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ superUserRoles=
100100
brokerClientAuthenticationPlugin=
101101
brokerClientAuthenticationParameters=
102102

103+
# Enable batch index ACK
104+
acknowledgmentAtBatchIndexLevelEnabled=true
105+
103106

104107
### --- BookKeeper Client --- ###
105108

0 commit comments

Comments
 (0)