Skip to content

Commit d4aab5c

Browse files
committed
Added tests for async client
1 parent 58b5eb5 commit d4aab5c

File tree

3 files changed

+222
-14
lines changed

3 files changed

+222
-14
lines changed

pulsar/asyncio.py

Lines changed: 155 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@
2323

2424
import asyncio
2525
import functools
26-
from typing import Any, Iterable, Optional
26+
from typing import Any, Iterable, Optional, Union
2727

2828
import _pulsar
2929

3030
import pulsar
31+
from pulsar import _listener_wrapper
3132

3233

3334
class PulsarException(BaseException):
@@ -212,12 +213,16 @@ async def close(self):
212213
self._consumer.close_async(functools.partial(_set_future, future, value=None))
213214
await future
214215

215-
async def seek(self, position: tuple[int, int, int, int]):
216+
async def seek(self, position: tuple[int, int, int, int] | pulsar.MessageId):
216217
"""
217218
Reset the subscription associated with this consumer to a specific message id or publish timestamp. The message id can either be a specific message or represent the first or last messages in the topic. ...
218219
"""
219-
partition, ledger_id, entry_id, batch_index = position
220-
message_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
220+
if isinstance(position, tuple):
221+
partition, ledger_id, entry_id, batch_index = position
222+
message_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
223+
else:
224+
message_id = position
225+
221226
future = asyncio.get_running_loop().create_future()
222227
self._consumer.seek_async(message_id, functools.partial(_set_future, future))
223228
await future
@@ -249,21 +254,40 @@ def redeliver_unacknowledged_messages(self):
249254
self._consumer.redeliver_unacknowledged_messages()
250255

251256
@property
252-
def last_message_id(self) -> int:
253-
return self._consumer.last_message_id
257+
def last_message_id(self) -> pulsar.MessageId:
258+
"""
259+
MessageId of the last consumed message
260+
"""
261+
return self._consumer.get_last_message_id()
254262

255263
@property
256264
def is_connected(self) -> bool:
265+
"""
266+
True if the consumer is connected to a broker
267+
"""
257268
return self._consumer.is_connected()
258269

259270
@property
260271
def subscription_name(self) -> str:
272+
"""
273+
Name of the current subscription
274+
"""
261275
return self._consumer.subscription_name()
262276

263277
@property
264278
def topic(self) -> str:
279+
"""
280+
Topic(s) of consumer
281+
"""
265282
return self._consumer.topic()
266283

284+
@property
285+
def consumer_name(self) -> str:
286+
"""
287+
Name of consumer
288+
"""
289+
return self._consumer.consumer_name()
290+
267291

268292
class Client:
269293
"""
@@ -277,17 +301,104 @@ def __init__(self, service_url, **kwargs) -> None:
277301
assert service_url.startswith('pulsar://'), "The service url must start with 'pulsar://'"
278302
self._client = pulsar.Client(service_url, **kwargs)._client
279303

280-
async def subscribe(self, topics: str, subscription_name: str, consumer_type: _pulsar.ConsumerType,
281-
schema: Optional[_pulsar.SchemaInfo] = _pulsar.SchemaInfo(_pulsar.SchemaType.BYTES, "bytes", "")) -> Consumer:
304+
async def subscribe(self, topic, subscription_name,
305+
consumer_type: _pulsar.ConsumerType = _pulsar.ConsumerType.Exclusive,
306+
schema=pulsar.schema.BytesSchema(),
307+
message_listener=None,
308+
receiver_queue_size=1000,
309+
max_total_receiver_queue_size_across_partitions=50000,
310+
consumer_name=None,
311+
unacked_messages_timeout_ms=None,
312+
broker_consumer_stats_cache_time_ms=30000,
313+
negative_ack_redelivery_delay_ms=60000,
314+
is_read_compacted=False,
315+
properties=None,
316+
pattern_auto_discovery_period=60,
317+
initial_position: _pulsar.InitialPosition = _pulsar.InitialPosition.Latest,
318+
crypto_key_reader: Union[None, _pulsar.CryptoKeyReader] = None,
319+
replicate_subscription_state_enabled=False,
320+
max_pending_chunked_message=10,
321+
auto_ack_oldest_chunked_message_on_queue_full=False,
322+
start_message_id_inclusive=False,
323+
batch_receive_policy=None,
324+
key_shared_policy=None,
325+
batch_index_ack_enabled=False,
326+
regex_subscription_mode: _pulsar.RegexSubscriptionMode = _pulsar.RegexSubscriptionMode.PersistentOnly,
327+
dead_letter_policy: Union[None, pulsar.ConsumerDeadLetterPolicy] = None,) -> Consumer:
282328
conf = _pulsar.ConsumerConfiguration()
283329
conf.consumer_type(consumer_type)
284-
conf.schema(schema)
330+
conf.regex_subscription_mode(regex_subscription_mode)
331+
conf.read_compacted(is_read_compacted)
332+
if message_listener:
333+
conf.message_listener(_listener_wrapper(message_listener, schema))
334+
conf.receiver_queue_size(receiver_queue_size)
335+
conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions)
336+
if consumer_name:
337+
conf.consumer_name(consumer_name)
338+
if unacked_messages_timeout_ms:
339+
conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
340+
341+
conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
342+
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
343+
if properties:
344+
for k, v in properties.items():
345+
conf.property(k, v)
346+
conf.subscription_initial_position(initial_position)
347+
348+
conf.schema(schema.schema_info())
349+
350+
if crypto_key_reader:
351+
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
352+
353+
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
354+
conf.max_pending_chunked_message(max_pending_chunked_message)
355+
conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
356+
conf.start_message_id_inclusive(start_message_id_inclusive)
357+
if batch_receive_policy:
358+
conf.batch_receive_policy(batch_receive_policy.policy())
359+
360+
if key_shared_policy:
361+
conf.key_shared_policy(key_shared_policy.policy())
362+
conf.batch_index_ack_enabled(batch_index_ack_enabled)
363+
if dead_letter_policy:
364+
conf.dead_letter_policy(dead_letter_policy.policy())
285365

286366
future = asyncio.get_running_loop().create_future()
287-
self._client.subscribe_async(topics, subscription_name, conf, functools.partial(_set_future, future))
367+
368+
if isinstance(topic, str):
369+
self._client.subscribe_async(topic, subscription_name, conf, functools.partial(_set_future, future))
370+
elif isinstance(topic, list):
371+
self._client.subscribe_topics_async(topic, subscription_name, conf, functools.partial(_set_future, future))
372+
elif isinstance(topic, pulsar._retype):
373+
self._client.subscribe_pattern_async(topic, subscription_name, conf, functools.partial(_set_future, future))
374+
else:
375+
raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)")
376+
288377
return Consumer(await future)
289378

290-
async def create_producer(self, topic: str) -> Producer:
379+
async def create_producer(self, topic,
380+
producer_name=None,
381+
schema=pulsar.schema.BytesSchema(),
382+
initial_sequence_id=None,
383+
send_timeout_millis=30000,
384+
compression_type: _pulsar.CompressionType = _pulsar.CompressionType.NONE,
385+
max_pending_messages=1000,
386+
max_pending_messages_across_partitions=50000,
387+
block_if_queue_full=False,
388+
batching_enabled=False,
389+
batching_max_messages=1000,
390+
batching_max_allowed_size_in_bytes=128*1024,
391+
batching_max_publish_delay_ms=10,
392+
chunking_enabled=False,
393+
message_routing_mode: _pulsar.PartitionsRoutingMode = _pulsar.PartitionsRoutingMode.RoundRobinDistribution,
394+
lazy_start_partitioned_producers=False,
395+
properties=None,
396+
batching_type: _pulsar.BatchingType = _pulsar.BatchingType.Default,
397+
encryption_key=None,
398+
crypto_key_reader: Union[None, _pulsar.CryptoKeyReader] = None,
399+
access_mode: _pulsar.ProducerAccessMode = _pulsar.ProducerAccessMode.Shared,
400+
401+
) -> Producer:
291402
"""
292403
Create a new producer on a given topic
293404
@@ -306,8 +417,39 @@ async def create_producer(self, topic: str) -> Producer:
306417
PulsarException
307418
"""
308419
future = asyncio.get_running_loop().create_future()
420+
309421
conf = _pulsar.ProducerConfiguration()
310-
# TODO: add more configs
422+
conf.send_timeout_millis(send_timeout_millis)
423+
conf.compression_type(compression_type)
424+
conf.max_pending_messages(max_pending_messages)
425+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
426+
conf.block_if_queue_full(block_if_queue_full)
427+
conf.batching_enabled(batching_enabled)
428+
conf.batching_max_messages(batching_max_messages)
429+
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
430+
conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
431+
conf.partitions_routing_mode(message_routing_mode)
432+
conf.batching_type(batching_type)
433+
conf.chunking_enabled(chunking_enabled)
434+
conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
435+
conf.access_mode(access_mode)
436+
if producer_name:
437+
conf.producer_name(producer_name)
438+
if initial_sequence_id:
439+
conf.initial_sequence_id(initial_sequence_id)
440+
if properties:
441+
for k, v in properties.items():
442+
conf.property(k, v)
443+
444+
conf.schema(schema.schema_info())
445+
if encryption_key:
446+
conf.encryption_key(encryption_key)
447+
if crypto_key_reader:
448+
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
449+
450+
if batching_enabled and chunking_enabled:
451+
raise ValueError("Batching and chunking can´t be enabled at the same time")
452+
311453
self._client.create_producer_async(topic, conf, functools.partial(_set_future, future))
312454
return Producer(await future)
313455

@@ -340,3 +482,4 @@ def complete():
340482
future.set_exception(PulsarException(result))
341483

342484
future.get_loop().call_soon_threadsafe(complete)
485+

src/client.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <pybind11/functional.h>
2222
#include <pybind11/pybind11.h>
2323
#include <pybind11/stl.h>
24+
#include <cmath>
2425

2526
namespace py = pybind11;
2627

@@ -53,13 +54,27 @@ Consumer Client_subscribe_topics(Client& client, const std::vector<std::string>&
5354
[&](SubscribeCallback callback) { client.subscribeAsync(topics, subscriptionName, conf, callback); });
5455
}
5556

57+
void Client_subscribe_topicsAsync(Client& client, const std::vector<std::string>& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback){
58+
client.subscribeAsync(topics, subscriptionName, conf, [callback](Result result, pulsar::Consumer consumer){
59+
py::gil_scoped_acquire acquire;
60+
callback(result, consumer);
61+
});
62+
}
63+
5664
Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern,
5765
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
5866
return waitForAsyncValue<Consumer>([&](SubscribeCallback callback) {
5967
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
6068
});
6169
}
6270

71+
void Client_subscribe_patternAsync(Client& client, const std::string& topic_pattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback){
72+
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, [callback](Result result, Consumer consumer){
73+
py::gil_scoped_acquire acquire;
74+
callback(result, consumer);
75+
});
76+
}
77+
6378
Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId,
6479
const ReaderConfiguration& conf) {
6580
return waitForAsyncValue<Reader>(
@@ -94,7 +109,9 @@ void export_client(py::module_& m) {
94109
.def("subscribe", &Client_subscribe)
95110
.def("subscribe_async", &Client_subscribeAsync)
96111
.def("subscribe_topics", &Client_subscribe_topics)
112+
.def("subscribe_topics_async", &Client_subscribe_topicsAsync)
97113
.def("subscribe_pattern", &Client_subscribe_pattern)
114+
.def("subscribe_pattern_async", &Client_subscribe_patternAsync)
98115
.def("create_reader", &Client_createReader)
99116
.def("get_topic_partitions", &Client_getTopicPartitions)
100117
.def("get_schema_info", &Client_getSchemaInfo)

tests/asyncio_test.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
#
2020

2121
import asyncio
22+
from typing import Iterable
2223

2324
from _pulsar import ConsumerType
2425

2526
import pulsar
2627
from pulsar.asyncio import (
2728
Client,
2829
PulsarException,
30+
Consumer
2931
)
3032
from unittest import (
3133
main,
@@ -59,8 +61,6 @@ async def test_batch_send(self):
5961
print(f'{i} was sent to {msg_id}')
6062
self.assertIsInstance(msg_id, pulsar.MessageId)
6163
self.assertEqual(msg_ids[i].ledger_id(), ledger_id)
62-
self.assertEqual(msg_ids[i].entry_id(), entry_id)
63-
self.assertEqual(msg_ids[i].batch_index(), i)
6464

6565
async def test_create_producer_failure(self):
6666
try:
@@ -87,5 +87,53 @@ async def test_close_producer(self):
8787
except PulsarException as e:
8888
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
8989

90+
async def test_subscribe(self):
91+
consumer = await self._client.subscribe('awaitio-test-close-producer', 'test-subscription')
92+
self.assertIsInstance(consumer, Consumer)
93+
94+
async def test_read_and_ack(self):
95+
test_producer = await self._client.create_producer("awaitio-test-consumer-ack")
96+
consumer = await self._client.subscribe('awaitio-test-consumer-ack', 'test-subscription')
97+
98+
await test_producer.send(b"test123")
99+
msg = await consumer.receive()
100+
101+
self.assertEqual(msg.data(), b"test123")
102+
103+
await consumer.acknowledge(msg)
104+
105+
async def test_batch_read_and_ack(self):
106+
test_producer = await self._client.create_producer("awaitio-test-consumer-ack-batch")
107+
consumer = await self._client.subscribe('awaitio-test-consumer-ack-batch', 'test-subscription')
108+
109+
await test_producer.send(b"test123")
110+
msgs = await consumer.batch_receive()
111+
112+
last = None
113+
for msg in msgs:
114+
last = msg
115+
116+
await consumer.acknowledge_cumulative(last)
117+
118+
self.assertIsInstance(msgs, Iterable)
119+
for msg in msgs:
120+
self.assertEqual(b"test123", msg.data())
121+
122+
async def test_consumer_close(self):
123+
consumer = await self._client.subscribe('awaitio-test-consumer-close', 'test-subscription')
124+
await consumer.close()
125+
126+
self.assertFalse(consumer.is_connected)
127+
128+
async def test_consumer_seek(self):
129+
consumer = await self._client.subscribe('awaitio-test-consumer-close', 'test-subscription')
130+
await consumer.seek(consumer.last_message_id)
131+
132+
async def test_consumer_unsubscribe(self):
133+
consumer = await self._client.subscribe('awaitio-test-consumer-close', 'test-subscription')
134+
await consumer.unsubscribe()
135+
136+
137+
90138
if __name__ == '__main__':
91139
main()

0 commit comments

Comments
 (0)