Skip to content

Commit 08c6f9a

Browse files
committed
support seek and fix tests
1 parent c232dfe commit 08c6f9a

File tree

3 files changed

+99
-4
lines changed

3 files changed

+99
-4
lines changed

pulsar/asyncio.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,36 @@ async def unsubscribe(self) -> None:
235235
self._consumer.unsubscribe_async(functools.partial(_set_future, future, value=None))
236236
await future
237237

238+
async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
239+
"""
240+
Reset the subscription associated with this consumer to a specific
241+
message id or publish timestamp asynchronously.
242+
243+
The message id can either be a specific message or represent the first
244+
or last messages in the topic.
245+
246+
Parameters
247+
----------
248+
messageid : MessageId or int
249+
The message id for seek, OR an integer event time (timestamp) to
250+
seek to
251+
252+
Raises
253+
------
254+
PulsarException
255+
"""
256+
future = asyncio.get_running_loop().create_future()
257+
if isinstance(messageid, pulsar.MessageId):
258+
msg_id = messageid._msg_id
259+
elif isinstance(messageid, int):
260+
msg_id = messageid
261+
else:
262+
raise ValueError(f"invalid messageid type {type(messageid)}")
263+
self._consumer.seek_async(
264+
msg_id, functools.partial(_set_future, future, value=None)
265+
)
266+
await future
267+
238268
async def close(self) -> None:
239269
"""
240270
Close the consumer asynchronously.
@@ -286,7 +316,7 @@ async def create_producer(self, topic: str,
286316
max_pending_messages: int = 1000,
287317
max_pending_messages_across_partitions: int = 50000,
288318
block_if_queue_full: bool = False,
289-
batching_enabled: bool = False,
319+
batching_enabled: bool = True,
290320
batching_max_messages: int = 1000,
291321
batching_max_allowed_size_in_bytes: int = 128*1024,
292322
batching_max_publish_delay_ms: int = 10,

src/consumer.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,16 @@ void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) {
142142
consumer.unsubscribeAsync(callback);
143143
}
144144

145+
void Consumer_seekAsync(Consumer& consumer, const MessageId& msgId, ResultCallback callback) {
146+
py::gil_scoped_release release;
147+
consumer.seekAsync(msgId, callback);
148+
}
149+
150+
void Consumer_seekAsync_timestamp(Consumer& consumer, uint64_t timestamp, ResultCallback callback) {
151+
py::gil_scoped_release release;
152+
consumer.seekAsync(timestamp, callback);
153+
}
154+
145155
void export_consumer(py::module_& m) {
146156
py::class_<Consumer>(m, "Consumer")
147157
.def(py::init<>())
@@ -173,5 +183,7 @@ void export_consumer(py::module_& m) {
173183
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
174184
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
175185
.def("close_async", &Consumer_closeAsync)
176-
.def("unsubscribe_async", &Consumer_unsubscribeAsync);
186+
.def("unsubscribe_async", &Consumer_unsubscribeAsync)
187+
.def("seek_async", &Consumer_seekAsync)
188+
.def("seek_async", &Consumer_seekAsync_timestamp);
177189
}

tests/asyncio_test.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#
2020

2121
import asyncio
22+
from typing import List
2223
import pulsar
2324
import time
2425
from pulsar.asyncio import (
@@ -99,9 +100,12 @@ async def test_close_producer(self):
99100
except PulsarException as e:
100101
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
101102

102-
async def _prepare_messages(self, producer: Producer):
103+
async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]:
104+
msg_ids = []
103105
for i in range(5):
104-
await producer.send(f'msg-{i}'.encode())
106+
msg_id = await producer.send(f'msg-{i}'.encode())
107+
msg_ids.append(msg_id)
108+
return msg_ids
105109

106110
async def test_consumer_cumulative_acknowledge(self):
107111
topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
@@ -185,6 +189,55 @@ async def test_unsubscribe(self):
185189
await consumer.unsubscribe()
186190
consumer = await self._client.subscribe(topic, sub)
187191

192+
async def test_seek_message_id(self):
193+
topic = f'asyncio-test-seek-message-id-{time.time()}'
194+
sub = 'sub'
195+
consumer = await self._client.subscribe(
196+
topic, sub, initial_position=pulsar.InitialPosition.Earliest
197+
)
198+
199+
producer = await self._client.create_producer(topic)
200+
msg_ids = await self._prepare_messages(producer)
201+
202+
for i in range(5):
203+
msg = await consumer.receive()
204+
self.assertEqual(msg.data(), f'msg-{i}'.encode())
205+
206+
await consumer.seek(msg_ids[2])
207+
208+
msg = await consumer.receive()
209+
self.assertEqual(msg.data(), b'msg-3')
210+
211+
async def test_seek_timestamp(self):
212+
topic = f'asyncio-test-seek-timestamp-{time.time()}'
213+
sub = 'sub'
214+
consumer = await self._client.subscribe(
215+
topic, sub, initial_position=pulsar.InitialPosition.Earliest
216+
)
217+
218+
producer = await self._client.create_producer(topic)
219+
220+
# Send first 3 messages
221+
for i in range(3):
222+
await producer.send(f'msg-{i}'.encode())
223+
224+
seek_time = int(time.time() * 1000)
225+
226+
# Send 2 more messages
227+
for i in range(3, 5):
228+
await producer.send(f'msg-{i}'.encode())
229+
230+
# Consume all messages first
231+
for i in range(5):
232+
msg = await consumer.receive()
233+
self.assertEqual(msg.data(), f'msg-{i}'.encode())
234+
235+
# Seek to the timestamp (should start from msg-3)
236+
await consumer.seek(seek_time)
237+
238+
msg = await consumer.receive()
239+
self.assertEqual(msg.data(), b'msg-3')
240+
188241

189242
if __name__ == '__main__':
190243
main()

0 commit comments

Comments
 (0)