Skip to content

Commit 1f98f44

Browse files
committed
test start_message_id_inclusive
1 parent 3a6aa86 commit 1f98f44

File tree

2 files changed

+37
-24
lines changed

2 files changed

+37
-24
lines changed

pulsar/asyncio.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ def __init__(self, service_url, **kwargs) -> None:
306306
"""
307307
self._client: _pulsar.Client = pulsar.Client(service_url, **kwargs)._client
308308

309-
# pylint: disable=too-many-arguments,too-many-locals
309+
# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
310310
async def create_producer(self, topic: str,
311311
producer_name: str | None = None,
312312
schema: pulsar.schema.Schema | None = None,
@@ -452,13 +452,14 @@ def underlying_router(msg, num_partitions):
452452
)
453453
return Producer(await future)
454454

455-
# pylint: disable=too-many-arguments,too-many-locals,too-many-branches
455+
# pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments
456456
async def subscribe(self, topic: Union[str, List[str]],
457457
subscription_name: str,
458458
consumer_type: pulsar.ConsumerType =
459459
pulsar.ConsumerType.Exclusive,
460460
schema: pulsar.schema.Schema | None = None,
461-
message_listener: Callable[['Consumer', pulsar.Message], None] | None = None,
461+
message_listener: Callable[['Consumer', pulsar.Message],
462+
None] | None = None,
462463
receiver_queue_size: int = 1000,
463464
max_total_receiver_queue_size_across_partitions: int =
464465
50000,

tests/asyncio_test.py

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,35 @@
1818
# under the License.
1919
#
2020

21+
"""
22+
Unit tests for asyncio Pulsar client API.
23+
"""
24+
25+
# pylint: disable=missing-function-docstring
26+
2127
import asyncio
22-
from typing import List
23-
import pulsar
2428
import time
25-
from pulsar.asyncio import (
29+
from typing import List
30+
from unittest import (
31+
main,
32+
IsolatedAsyncioTestCase,
33+
)
34+
35+
import pulsar # pylint: disable=import-error
36+
from pulsar.asyncio import ( # pylint: disable=import-error
2637
Client,
2738
Consumer,
2839
Producer,
2940
PulsarException,
3041
)
31-
from unittest import (
32-
main,
33-
IsolatedAsyncioTestCase,
34-
)
3542

36-
service_url = 'pulsar://localhost:6650'
43+
SERVICE_URL = 'pulsar://localhost:6650'
3744

3845
class AsyncioTest(IsolatedAsyncioTestCase):
46+
"""Test cases for asyncio Pulsar client."""
3947

4048
async def asyncSetUp(self) -> None:
41-
self._client = Client(service_url,
49+
self._client = Client(SERVICE_URL,
4250
operation_timeout_seconds=5)
4351

4452
async def asyncTearDown(self) -> None:
@@ -103,8 +111,7 @@ async def test_close_producer(self):
103111
async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]:
104112
msg_ids = []
105113
for i in range(5):
106-
msg_id = await producer.send(f'msg-{i}'.encode())
107-
msg_ids.append(msg_id)
114+
msg_ids.append(await producer.send(f'msg-{i}'.encode()))
108115
return msg_ids
109116

110117
async def test_consumer_cumulative_acknowledge(self):
@@ -127,7 +134,7 @@ async def test_consumer_cumulative_acknowledge(self):
127134
async def test_consumer_individual_acknowledge(self):
128135
topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
129136
sub = 'sub'
130-
consumer = await self._client.subscribe(topic, sub,
137+
consumer = await self._client.subscribe(topic, sub,
131138
consumer_type=pulsar.ConsumerType.Shared)
132139
producer = await self._client.create_producer(topic)
133140
await self._prepare_messages(producer)
@@ -141,7 +148,7 @@ async def test_consumer_individual_acknowledge(self):
141148
await consumer.acknowledge(msgs[4])
142149
await consumer.close()
143150

144-
consumer = await self._client.subscribe(topic, sub,
151+
consumer = await self._client.subscribe(topic, sub,
145152
consumer_type=pulsar.ConsumerType.Shared)
146153
msg = await consumer.receive()
147154
self.assertEqual(msg.data(), b'msg-1')
@@ -194,21 +201,26 @@ async def test_unsubscribe(self):
194201
async def test_seek_message_id(self):
195202
topic = f'asyncio-test-seek-message-id-{time.time()}'
196203
sub = 'sub'
197-
consumer = await self._client.subscribe(
198-
topic, sub, initial_position=pulsar.InitialPosition.Earliest
199-
)
200204

201205
producer = await self._client.create_producer(topic)
202206
msg_ids = await self._prepare_messages(producer)
203207

204-
for i in range(5):
205-
msg = await consumer.receive()
206-
self.assertEqual(msg.data(), f'msg-{i}'.encode())
207-
208+
consumer = await self._client.subscribe(
209+
topic, sub, initial_position=pulsar.InitialPosition.Earliest
210+
)
208211
await consumer.seek(msg_ids[2])
209-
210212
msg = await consumer.receive()
211213
self.assertEqual(msg.data(), b'msg-3')
214+
await consumer.close()
215+
216+
consumer = await self._client.subscribe(
217+
topic, sub, initial_position=pulsar.InitialPosition.Earliest,
218+
start_message_id_inclusive=True
219+
)
220+
await consumer.seek(msg_ids[2])
221+
msg = await consumer.receive()
222+
self.assertEqual(msg.data(), b'msg-2')
223+
await consumer.close()
212224

213225
async def test_seek_timestamp(self):
214226
topic = f'asyncio-test-seek-timestamp-{time.time()}'

0 commit comments

Comments
 (0)