Skip to content

Commit 76c5bff

Browse files
committed
Improve asynchronous producer with more options for creation and send
1 parent 75495d3 commit 76c5bff

File tree

3 files changed

+240
-4
lines changed

3 files changed

+240
-4
lines changed

pulsar/asyncio.py

Lines changed: 195 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@
2525

2626
import asyncio
2727
import functools
28-
from typing import Any, List, Union
28+
from typing import Any, Callable, List, Union
2929

3030
import _pulsar
3131
from _pulsar import (
3232
InitialPosition,
33+
CompressionType,
34+
PartitionsRoutingMode,
35+
BatchingType,
36+
ProducerAccessMode,
3337
RegexSubscriptionMode,
3438
ConsumerCryptoFailureAction,
3539
)
@@ -84,7 +88,17 @@ def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) ->
8488
self._producer = producer
8589
self._schema = schema
8690

87-
async def send(self, content: Any) -> pulsar.MessageId:
91+
# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
92+
async def send(self, content: Any,
93+
properties: dict | None = None,
94+
partition_key: str | None = None,
95+
ordering_key: str | None = None,
96+
sequence_id: int | None = None,
97+
replication_clusters: List[str] | None = None,
98+
disable_replication: bool | None = None,
99+
event_timestamp: int | None = None,
100+
deliver_at: int | None = None,
101+
deliver_after: int | None = None) -> pulsar.MessageId:
88102
"""
89103
Send a message asynchronously.
90104
@@ -93,6 +107,28 @@ async def send(self, content: Any) -> pulsar.MessageId:
93107
content: Any
94108
The message payload, whose type should respect the schema defined in
95109
`Client.create_producer`.
110+
properties: dict | None
111+
A dict of application-defined string properties.
112+
partition_key: str | None
113+
Sets the partition key for the message routing. A hash of this key is
114+
used to determine the message's topic partition.
115+
ordering_key: str | None
116+
Sets the ordering key for the message routing.
117+
sequence_id: int | None
118+
Specify a custom sequence id for the message being published.
119+
replication_clusters: List[str] | None
120+
Override namespace replication clusters. Note that it is the caller's responsibility
121+
to provide valid cluster names and that all clusters have been previously configured
122+
as topics. Given an empty list, the message will replicate per the namespace
123+
configuration.
124+
disable_replication: bool | None
125+
Do not replicate this message.
126+
event_timestamp: int | None
127+
Timestamp in milliseconds of the event creation.
128+
deliver_at: int | None
129+
Specify the message should not be delivered earlier than the specified timestamp.
130+
deliver_after: int | None
131+
Specify a delay in timedelta for the delivery of the messages.
96132
97133
Returns
98134
-------
@@ -105,6 +141,27 @@ async def send(self, content: Any) -> pulsar.MessageId:
105141
"""
106142
builder = _pulsar.MessageBuilder()
107143
builder.content(self._schema.encode(content))
144+
145+
if properties is not None:
146+
for k, v in properties.items():
147+
builder.property(k, v)
148+
if partition_key is not None:
149+
builder.partition_key(partition_key)
150+
if ordering_key is not None:
151+
builder.ordering_key(ordering_key)
152+
if sequence_id is not None:
153+
builder.sequence_id(sequence_id)
154+
if replication_clusters is not None:
155+
builder.replication_clusters(replication_clusters)
156+
if disable_replication is not None:
157+
builder.disable_replication(disable_replication)
158+
if event_timestamp is not None:
159+
builder.event_timestamp(event_timestamp)
160+
if deliver_at is not None:
161+
builder.deliver_at(deliver_at)
162+
if deliver_after is not None:
163+
builder.deliver_after(deliver_after)
164+
108165
future = asyncio.get_running_loop().create_future()
109166
self._producer.send_async(builder.build(), functools.partial(_set_future, future))
110167
msg_id = await future
@@ -115,6 +172,18 @@ async def send(self, content: Any) -> pulsar.MessageId:
115172
msg_id.batch_index(),
116173
)
117174

175+
async def flush(self) -> None:
    """
    Asynchronously flush every message currently buffered in the producer.

    The coroutine completes once the underlying producer invokes the
    flush callback.

    Raises
    ------
    PulsarException
    """
    loop = asyncio.get_running_loop()
    flushed = loop.create_future()
    # The callback resolves `flushed` with None; no payload is returned.
    on_flushed = functools.partial(_set_future, flushed, value=None)
    self._producer.flush_async(on_flushed)
    await flushed
186+
118187
async def close(self) -> None:
119188
"""
120189
Close the producer.
@@ -127,6 +196,25 @@ async def close(self) -> None:
127196
self._producer.close_async(functools.partial(_set_future, future, value=None))
128197
await future
129198

199+
def topic(self) -> str:
    """
    Return the topic which this producer is publishing to.

    The value is obtained directly from the wrapped producer object.
    """
    return self._producer.topic()
204+
205+
def producer_name(self) -> str:
    """
    Return the producer name, which could have been assigned by the
    system or specified by the client.

    The value is obtained directly from the wrapped producer object.
    """
    return self._producer.producer_name()
211+
212+
def last_sequence_id(self) -> int:
    """
    Return the last sequence id that was published by this producer.

    The value is obtained directly from the wrapped producer object.
    """
    return self._producer.last_sequence_id()
217+
130218
class Consumer:
131219
"""
132220
The Pulsar message consumer, used to subscribe to messages from a topic.
@@ -311,7 +399,28 @@ def __init__(self, service_url, **kwargs) -> None:
311399

312400
# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
313401
async def create_producer(self, topic: str,
402+
producer_name: str | None = None,
314403
schema: pulsar.schema.Schema | None = None,
404+
initial_sequence_id: int | None = None,
405+
send_timeout_millis: int = 30000,
406+
compression_type: CompressionType = CompressionType.NONE,
407+
max_pending_messages: int = 1000,
408+
max_pending_messages_across_partitions: int = 50000,
409+
block_if_queue_full: bool = False,
410+
batching_enabled: bool = True,
411+
batching_max_messages: int = 1000,
412+
batching_max_allowed_size_in_bytes: int = 128*1024,
413+
batching_max_publish_delay_ms: int = 10,
414+
chunking_enabled: bool = False,
415+
message_routing_mode: PartitionsRoutingMode =
416+
PartitionsRoutingMode.RoundRobinDistribution,
417+
lazy_start_partitioned_producers: bool = False,
418+
properties: dict | None = None,
419+
batching_type: BatchingType = BatchingType.Default,
420+
encryption_key: str | None = None,
421+
crypto_key_reader: pulsar.CryptoKeyReader | None = None,
422+
access_mode: ProducerAccessMode = ProducerAccessMode.Shared,
423+
message_router: Callable[[pulsar.Message, int], int] | None = None,
315424
) -> Producer:
316425
"""
317426
Create a new producer on a given topic
@@ -320,8 +429,60 @@ async def create_producer(self, topic: str,
320429
----------
321430
topic: str
322431
The topic name
432+
producer_name: str | None
433+
Specify a name for the producer. If not assigned, the system will generate a globally
434+
unique name which can be accessed with `Producer.producer_name()`. When specifying a
435+
name, it is up to the user to ensure that, for a given topic, the producer name is
436+
unique across all Pulsar's clusters.
323437
schema: pulsar.schema.Schema | None, default=None
324438
Define the schema of the data that will be published by this producer.
439+
initial_sequence_id: int | None, default=None
440+
Set the baseline for the sequence ids for messages published by
441+
the producer.
442+
send_timeout_millis: int, default=30000
443+
If a message is not acknowledged by the server before the
444+
send_timeout expires, an error will be reported.
445+
compression_type: CompressionType, default=CompressionType.NONE
446+
Set the compression type for the producer.
447+
max_pending_messages: int, default=1000
448+
Set the max size of the queue holding the messages pending to
449+
receive an acknowledgment from the broker.
450+
max_pending_messages_across_partitions: int, default=50000
451+
Set the max size of the queue holding the messages pending to
452+
receive an acknowledgment across partitions.
453+
block_if_queue_full: bool, default=False
454+
Set whether send operations should block when the outgoing
455+
message queue is full.
456+
batching_enabled: bool, default=True
457+
Enable automatic message batching. Note that, unlike the synchronous producer API in
458+
``pulsar.Client.create_producer``, batching is enabled by default for the asyncio
459+
producer.
460+
batching_max_messages: int, default=1000
461+
Maximum number of messages in a batch.
462+
batching_max_allowed_size_in_bytes: int, default=128*1024
463+
Maximum size in bytes of a batch.
464+
batching_max_publish_delay_ms: int, default=10
465+
The batch interval in milliseconds.
466+
chunking_enabled: bool, default=False
467+
Enable chunking of large messages.
468+
message_routing_mode: PartitionsRoutingMode,
469+
default=PartitionsRoutingMode.RoundRobinDistribution
470+
Set the message routing mode for the partitioned producer.
471+
lazy_start_partitioned_producers: bool, default=False
472+
Start partitioned producers lazily on demand.
473+
properties: dict | None, default=None
474+
Sets the properties for the producer.
475+
batching_type: BatchingType, default=BatchingType.Default
476+
Sets the batching type for the producer.
477+
encryption_key: str | None, default=None
478+
The key used for symmetric encryption.
479+
crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
480+
Symmetric encryption class implementation.
481+
access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared
482+
Set the type of access mode that the producer requires on the topic.
483+
message_router: Callable[[pulsar.Message, int], int] | None, default=None
484+
A custom message router function that takes a Message and the
485+
number of partitions and returns the partition index.
325486
326487
Returns
327488
-------
@@ -332,13 +493,45 @@ async def create_producer(self, topic: str,
332493
------
333494
PulsarException
334495
"""
496+
if batching_enabled and chunking_enabled:
497+
raise ValueError("Batching and chunking of messages can't be enabled together.")
498+
335499
if schema is None:
336500
schema = pulsar.schema.BytesSchema()
337501
schema.attach_client(self._client)
338502

339503
future = asyncio.get_running_loop().create_future()
340504
conf = _pulsar.ProducerConfiguration()
505+
if producer_name is not None:
506+
conf.producer_name(producer_name)
341507
conf.schema(schema.schema_info())
508+
if initial_sequence_id is not None:
509+
conf.initial_sequence_id(initial_sequence_id)
510+
conf.send_timeout_millis(send_timeout_millis)
511+
conf.compression_type(compression_type)
512+
conf.max_pending_messages(max_pending_messages)
513+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
514+
conf.block_if_queue_full(block_if_queue_full)
515+
conf.batching_enabled(batching_enabled)
516+
conf.batching_max_messages(batching_max_messages)
517+
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
518+
conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
519+
conf.chunking_enabled(chunking_enabled)
520+
conf.partitions_routing_mode(message_routing_mode)
521+
conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
522+
if properties is not None:
523+
for k, v in properties.items():
524+
conf.property(k, v)
525+
conf.batching_type(batching_type)
526+
if encryption_key is not None:
527+
conf.encryption_key(encryption_key)
528+
if crypto_key_reader is not None:
529+
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
530+
conf.access_mode(access_mode)
531+
if message_router is not None:
532+
def default_router(msg: _pulsar.Message, num_partitions: int) -> int:
    """
    Adapter handed to the producer configuration that forwards every
    routing decision to the caller-supplied ``message_router``.

    Previously this function ignored ``message_router`` and always
    returned ``int(msg.partition_key()) % num_partitions``, so the custom
    router was never invoked and non-numeric keys raised ``ValueError``.

    Parameters
    ----------
    msg: _pulsar.Message
        The message being routed, as passed in by the native client.
    num_partitions: int
        The number of partitions of the topic.

    Returns
    -------
    int
        The partition index chosen by ``message_router``.
    """
    # NOTE(review): assumes `pulsar.Message._wrap` converts the native message
    # into the `pulsar.Message` the router is annotated to take, as the
    # synchronous client does for its router - confirm.
    return int(message_router(pulsar.Message._wrap(msg), num_partitions))
534+
conf.message_router(default_router)
342535

343536
self._client.create_producer_async(
344537
topic, conf, functools.partial(_set_future, future)

src/producer.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,6 @@ void export_producer(py::module_& m) {
8282
"successfully persisted\n")
8383
.def("close", &Producer_close)
8484
.def("close_async", &Producer_closeAsync)
85-
.def("is_connected", &Producer::isConnected);
85+
.def("is_connected", &Producer::isConnected)
86+
.def("flush_async", &Producer::flushAsync);
8687
}

tests/asyncio_test.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,18 @@ async def asyncTearDown(self) -> None:
6060

6161
async def test_batch_end_to_end(self):
6262
topic = f'asyncio-test-batch-e2e-{time.time()}'
63-
producer = await self._client.create_producer(topic)
63+
producer = await self._client.create_producer(topic,
64+
producer_name="my-producer")
65+
self.assertEqual(producer.topic(), f'persistent://public/default/{topic}')
66+
self.assertEqual(producer.producer_name(), "my-producer")
6467
tasks = []
6568
for i in range(5):
6669
tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode())))
6770
msg_ids = await asyncio.gather(*tasks)
6871
self.assertEqual(len(msg_ids), 5)
72+
# pylint: disable=fixme
73+
# TODO: the result is wrong due to https://github.com/apache/pulsar-client-cpp/issues/531
74+
self.assertEqual(producer.last_sequence_id(), 8)
6975
ledger_id = msg_ids[0].ledger_id()
7076
entry_id = msg_ids[0].entry_id()
7177
# These messages should be in the same entry
@@ -90,6 +96,42 @@ async def test_batch_end_to_end(self):
9096
msg = await consumer.receive()
9197
self.assertEqual(msg.data(), b'final-message')
9298

99+
async def test_send_keyed_message(self):
    """
    Send one message carrying a partition key, an ordering key and a
    property, then check the consumer receives all of them unchanged.
    """
    topic_name = f'asyncio-test-send-keyed-message-{time.time()}'
    sender = await self._client.create_producer(topic_name)
    receiver = await self._client.subscribe(topic_name, 'sub')

    await sender.send(
        b'msg',
        partition_key='key0',
        ordering_key="key1",
        properties={'my-prop': 'my-value'},
    )

    received = await receiver.receive()
    self.assertEqual(received.data(), b'msg')
    self.assertEqual(received.partition_key(), 'key0')
    self.assertEqual(received.ordering_key(), 'key1')
    self.assertEqual(received.properties(), {'my-prop': 'my-value'})
111+
112+
async def test_flush(self):
    """
    Check that `Producer.flush` completes sends that are still batched.

    The producer is created with a batch limit of 3 messages and a 60000 ms
    publish delay; two sends are started and are expected to still be
    pending after a one-second wait, then to be done once `flush` returns.
    """
    topic_name = f'asyncio-test-flush-{time.time()}'
    sender = await self._client.create_producer(
        topic_name,
        batching_max_messages=3,
        batching_max_publish_delay_ms=60000,
    )
    send_tasks = [
        asyncio.create_task(sender.send(payload))
        for payload in (b'msg-0', b'msg-1')
    ]

    finished, unfinished = await asyncio.wait(
        send_tasks, timeout=1, return_when=asyncio.FIRST_COMPLETED
    )
    self.assertEqual(len(finished), 0)
    self.assertEqual(len(unfinished), 2)

    # flush will trigger sending the batched messages
    await sender.flush()
    for send_task in unfinished:
        self.assertTrue(send_task.done())

    first_id = send_tasks[0].result()
    second_id = send_tasks[1].result()
    # Both messages are expected in the same entry, at consecutive batch indexes.
    self.assertEqual(first_id.ledger_id(), second_id.ledger_id())
    self.assertEqual(first_id.entry_id(), second_id.entry_id())
    self.assertEqual(first_id.batch_index(), 0)
    self.assertEqual(second_id.batch_index(), 1)
134+
93135
async def test_create_producer_failure(self):
94136
try:
95137
await self._client.create_producer('tenant/ns/asyncio-test-send-failure')

0 commit comments

Comments
 (0)