Skip to content

Commit 04a6d02

Browse files
Merge branch 'main' into async-last-message-id
2 parents eab1434 + 2704dd7 commit 04a6d02

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

pulsar/asyncio.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,18 @@ async def get_last_message_id(self) -> _pulsar.MessageId:
407407
self._consumer.get_last_message_id_async(functools.partial(_set_future, future))
408408
id = await future
409409
return id
410-
410+
411+
def redeliver_unacknowledged_messages(self):
412+
"""
413+
Redelivers all the unacknowledged messages. In failover mode, the
414+
request is ignored if the consumer is not active for the given topic. In
415+
shared mode, the consumer's messages to be redelivered are distributed
416+
across all the connected consumers. This is a non-blocking call and
417+
doesn't throw an exception. In case the connection breaks, the messages
418+
are redelivered after reconnect.
419+
"""
420+
self._consumer.redeliver_unacknowledged_messages()
421+
411422
def topic(self) -> str:
412423
"""
413424
Return the topic this consumer is subscribed to.

tests/asyncio_test.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,44 @@ async def test_consumer_get_last_message_id(self):
283283
await consumer.acknowledge(last_msg_id)
284284
await consumer.close()
285285

286+
async def test_async_dead_letter_policy(self):
287+
topic = f'asyncio-test-dlq-{time.time()}'
288+
dlq_topic = 'dlq-' + topic
289+
max_redeliver_count = 5
290+
291+
dlq_consumer = await self._client.subscribe(dlq_topic, "my-sub", consumer_type=pulsar.ConsumerType.Shared)
292+
consumer = await self._client.subscribe(topic, "my-sub", consumer_type=pulsar.ConsumerType.Shared,
293+
dead_letter_policy=pulsar.ConsumerDeadLetterPolicy(max_redeliver_count, dlq_topic, 'init-sub'))
294+
producer = await self._client.create_producer(topic)
295+
296+
# Sen num msgs.
297+
num = 10
298+
for i in range(num):
299+
await producer.send(b"hello-%d" % i)
300+
await producer.flush()
301+
302+
# Redelivery all messages maxRedeliverCountNum time.
303+
for i in range(1, num * max_redeliver_count + num + 1):
304+
msg = await consumer.receive()
305+
if i % num == 0:
306+
consumer.redeliver_unacknowledged_messages()
307+
print(f"Start redeliver msgs '{i}'")
308+
309+
with self.assertRaises(asyncio.TimeoutError):
310+
await asyncio.wait_for(consumer.receive(), 0.1)
311+
312+
for i in range(num):
313+
msg = await dlq_consumer.receive()
314+
self.assertTrue(msg)
315+
self.assertEqual(msg.data(), b"hello-%d" % i)
316+
dlq_consumer.acknowledge(msg)
317+
318+
with self.assertRaises(asyncio.TimeoutError):
319+
await asyncio.wait_for(dlq_consumer.receive(), 0.1)
320+
321+
await consumer.close()
322+
await dlq_consumer.close()
323+
286324
async def test_unsubscribe(self):
287325
topic = f'asyncio-test-unsubscribe-{time.time()}'
288326
sub = 'sub'

0 commit comments

Comments
 (0)