Skip to content

Commit 9432cb8

Browse files
Improve asynchronous producer with more options for creation and send (#280)
1 parent 961411f commit 9432cb8

File tree

3 files changed

+246
-4
lines changed

3 files changed

+246
-4
lines changed

pulsar/asyncio.py

Lines changed: 201 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,16 @@
2525

2626
import asyncio
2727
import functools
28-
from typing import Any, List, Union
28+
from datetime import timedelta
29+
from typing import Any, Callable, List, Union
2930

3031
import _pulsar
3132
from _pulsar import (
3233
InitialPosition,
34+
CompressionType,
35+
PartitionsRoutingMode,
36+
BatchingType,
37+
ProducerAccessMode,
3338
RegexSubscriptionMode,
3439
ConsumerCryptoFailureAction,
3540
)
@@ -84,7 +89,17 @@ def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) ->
8489
self._producer = producer
8590
self._schema = schema
8691

87-
async def send(self, content: Any) -> pulsar.MessageId:
92+
# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
93+
async def send(self, content: Any,
94+
properties: dict | None = None,
95+
partition_key: str | None = None,
96+
ordering_key: str | None = None,
97+
sequence_id: int | None = None,
98+
replication_clusters: List[str] | None = None,
99+
disable_replication: bool | None = None,
100+
event_timestamp: int | None = None,
101+
deliver_at: int | None = None,
102+
deliver_after: timedelta | None = None) -> pulsar.MessageId:
88103
"""
89104
Send a message asynchronously.
90105
@@ -93,6 +108,28 @@ async def send(self, content: Any) -> pulsar.MessageId:
93108
content: Any
94109
The message payload, whose type should respect the schema defined in
95110
`Client.create_producer`.
111+
properties: dict | None
112+
A dict of application-defined string properties.
113+
partition_key: str | None
114+
Sets the partition key for the message routing. A hash of this key is
115+
used to determine the message's topic partition.
116+
ordering_key: str | None
117+
Sets the ordering key for the message routing.
118+
sequence_id: int | None
119+
Specify a custom sequence id for the message being published.
120+
replication_clusters: List[str] | None
121+
Override namespace replication clusters. Note that it is the caller's responsibility
122+
to provide valid cluster names and that all clusters have been previously configured
123+
as topics. Given an empty list, the message will replicate per the namespace
124+
configuration.
125+
disable_replication: bool | None
126+
Do not replicate this message.
127+
event_timestamp: int | None
128+
Timestamp in milliseconds of the event creation.
129+
deliver_at: int | None
130+
Specify that the message should not be delivered earlier than the specified timestamp.
131+
deliver_after: timedelta | None
132+
Specify a delay, as a timedelta, before the message is delivered.
96133
97134
Returns
98135
-------
@@ -105,6 +142,27 @@ async def send(self, content: Any) -> pulsar.MessageId:
105142
"""
106143
builder = _pulsar.MessageBuilder()
107144
builder.content(self._schema.encode(content))
145+
146+
if properties is not None:
147+
for k, v in properties.items():
148+
builder.property(k, v)
149+
if partition_key is not None:
150+
builder.partition_key(partition_key)
151+
if ordering_key is not None:
152+
builder.ordering_key(ordering_key)
153+
if sequence_id is not None:
154+
builder.sequence_id(sequence_id)
155+
if replication_clusters is not None:
156+
builder.replication_clusters(replication_clusters)
157+
if disable_replication is not None:
158+
builder.disable_replication(disable_replication)
159+
if event_timestamp is not None:
160+
builder.event_timestamp(event_timestamp)
161+
if deliver_at is not None:
162+
builder.deliver_at(deliver_at)
163+
if deliver_after is not None:
164+
builder.deliver_after(deliver_after)
165+
108166
future = asyncio.get_running_loop().create_future()
109167
self._producer.send_async(builder.build(), functools.partial(_set_future, future))
110168
msg_id = await future
@@ -115,6 +173,18 @@ async def send(self, content: Any) -> pulsar.MessageId:
115173
msg_id.batch_index(),
116174
)
117175

176+
async def flush(self) -> None:
177+
"""
178+
Flush all the messages buffered in the producer asynchronously.
179+
180+
Raises
181+
------
182+
PulsarException
183+
"""
184+
future = asyncio.get_running_loop().create_future()
185+
self._producer.flush_async(functools.partial(_set_future, future, value=None))
186+
await future
187+
118188
async def close(self) -> None:
119189
"""
120190
Close the producer.
@@ -127,6 +197,30 @@ async def close(self) -> None:
127197
self._producer.close_async(functools.partial(_set_future, future, value=None))
128198
await future
129199

200+
def topic(self):
201+
"""
202+
Return the topic which the producer is publishing to.
203+
"""
204+
return self._producer.topic()
205+
206+
def producer_name(self):
207+
"""
208+
Return the producer name which could have been assigned by the
209+
system or specified by the client
210+
"""
211+
return self._producer.producer_name()
212+
213+
def last_sequence_id(self):
214+
"""
215+
Return the last sequence id that was published and acknowledged by this producer.
216+
217+
The sequence id can be either automatically assigned or custom set on the message.
218+
After recreating a producer with the same name, this will return the sequence id
219+
of the last message that was published in the previous session, or -1 if no
220+
message was ever published.
221+
"""
222+
return self._producer.last_sequence_id()
223+
130224
class Consumer:
131225
"""
132226
The Pulsar message consumer, used to subscribe to messages from a topic.
@@ -311,7 +405,28 @@ def __init__(self, service_url, **kwargs) -> None:
311405

312406
# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
313407
async def create_producer(self, topic: str,
408+
producer_name: str | None = None,
314409
schema: pulsar.schema.Schema | None = None,
410+
initial_sequence_id: int | None = None,
411+
send_timeout_millis: int = 30000,
412+
compression_type: CompressionType = CompressionType.NONE,
413+
max_pending_messages: int = 1000,
414+
max_pending_messages_across_partitions: int = 50000,
415+
block_if_queue_full: bool = False,
416+
batching_enabled: bool = True,
417+
batching_max_messages: int = 1000,
418+
batching_max_allowed_size_in_bytes: int = 128*1024,
419+
batching_max_publish_delay_ms: int = 10,
420+
chunking_enabled: bool = False,
421+
message_routing_mode: PartitionsRoutingMode =
422+
PartitionsRoutingMode.RoundRobinDistribution,
423+
lazy_start_partitioned_producers: bool = False,
424+
properties: dict | None = None,
425+
batching_type: BatchingType = BatchingType.Default,
426+
encryption_key: str | None = None,
427+
crypto_key_reader: pulsar.CryptoKeyReader | None = None,
428+
access_mode: ProducerAccessMode = ProducerAccessMode.Shared,
429+
message_router: Callable[[pulsar.Message, int], int] | None = None,
315430
) -> Producer:
316431
"""
317432
Create a new producer on a given topic
@@ -320,8 +435,60 @@ async def create_producer(self, topic: str,
320435
----------
321436
topic: str
322437
The topic name
438+
producer_name: str | None
439+
Specify a name for the producer. If not assigned, the system will generate a globally
440+
unique name which can be accessed with `Producer.producer_name()`. When specifying a
441+
name, it is up to the user to ensure that, for a given topic, the producer name is
442+
unique across all Pulsar clusters.
323443
schema: pulsar.schema.Schema | None, default=None
324444
Define the schema of the data that will be published by this producer.
445+
initial_sequence_id: int | None, default=None
446+
Set the baseline for the sequence ids for messages published by
447+
the producer.
448+
send_timeout_millis: int, default=30000
449+
If a message is not acknowledged by the server before the
450+
send_timeout expires, an error will be reported.
451+
compression_type: CompressionType, default=CompressionType.NONE
452+
Set the compression type for the producer.
453+
max_pending_messages: int, default=1000
454+
Set the max size of the queue holding the messages pending to
455+
receive an acknowledgment from the broker.
456+
max_pending_messages_across_partitions: int, default=50000
457+
Set the max size of the queue holding the messages pending to
458+
receive an acknowledgment across partitions.
459+
block_if_queue_full: bool, default=False
460+
Set whether send operations should block when the outgoing
461+
message queue is full.
462+
batching_enabled: bool, default=True
463+
Enable automatic message batching. Note that, unlike the synchronous producer API in
464+
``pulsar.Client.create_producer``, batching is enabled by default for the asyncio
465+
producer.
466+
batching_max_messages: int, default=1000
467+
Maximum number of messages in a batch.
468+
batching_max_allowed_size_in_bytes: int, default=128*1024
469+
Maximum size in bytes of a batch.
470+
batching_max_publish_delay_ms: int, default=10
471+
The batch interval in milliseconds.
472+
chunking_enabled: bool, default=False
473+
Enable chunking of large messages.
474+
message_routing_mode: PartitionsRoutingMode,
475+
default=PartitionsRoutingMode.RoundRobinDistribution
476+
Set the message routing mode for the partitioned producer.
477+
lazy_start_partitioned_producers: bool, default=False
478+
Start partitioned producers lazily on demand.
479+
properties: dict | None, default=None
480+
Sets the properties for the producer.
481+
batching_type: BatchingType, default=BatchingType.Default
482+
Sets the batching type for the producer.
483+
encryption_key: str | None, default=None
484+
The key used for symmetric encryption.
485+
crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
486+
Symmetric encryption class implementation.
487+
access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared
488+
Set the type of access mode that the producer requires on the topic.
489+
message_router: Callable[[pulsar.Message, int], int] | None, default=None
490+
A custom message router function that takes a Message and the
491+
number of partitions and returns the partition index.
325492
326493
Returns
327494
-------
@@ -332,13 +499,45 @@ async def create_producer(self, topic: str,
332499
------
333500
PulsarException
334501
"""
502+
if batching_enabled and chunking_enabled:
503+
raise ValueError("Batching and chunking of messages can't be enabled together.")
504+
335505
if schema is None:
336506
schema = pulsar.schema.BytesSchema()
337507
schema.attach_client(self._client)
338508

339509
future = asyncio.get_running_loop().create_future()
340510
conf = _pulsar.ProducerConfiguration()
511+
if producer_name is not None:
512+
conf.producer_name(producer_name)
341513
conf.schema(schema.schema_info())
514+
if initial_sequence_id is not None:
515+
conf.initial_sequence_id(initial_sequence_id)
516+
conf.send_timeout_millis(send_timeout_millis)
517+
conf.compression_type(compression_type)
518+
conf.max_pending_messages(max_pending_messages)
519+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
520+
conf.block_if_queue_full(block_if_queue_full)
521+
conf.batching_enabled(batching_enabled)
522+
conf.batching_max_messages(batching_max_messages)
523+
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
524+
conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
525+
conf.chunking_enabled(chunking_enabled)
526+
conf.partitions_routing_mode(message_routing_mode)
527+
conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
528+
if properties is not None:
529+
for k, v in properties.items():
530+
conf.property(k, v)
531+
conf.batching_type(batching_type)
532+
if encryption_key is not None:
533+
conf.encryption_key(encryption_key)
534+
if crypto_key_reader is not None:
535+
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
536+
conf.access_mode(access_mode)
537+
if message_router is not None:
538+
def underlying_router(msg: _pulsar.Message, num_partitions: int) -> int:
539+
return message_router(pulsar.Message._wrap(msg), num_partitions)
540+
conf.message_router(underlying_router)
342541

343542
self._client.create_producer_async(
344543
topic, conf, functools.partial(_set_future, future)

src/producer.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,5 +82,6 @@ void export_producer(py::module_& m) {
8282
"successfully persisted\n")
8383
.def("close", &Producer_close)
8484
.def("close_async", &Producer_closeAsync)
85-
.def("is_connected", &Producer::isConnected);
85+
.def("is_connected", &Producer::isConnected)
86+
.def("flush_async", &Producer::flushAsync);
8687
}

tests/asyncio_test.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,18 @@ async def asyncTearDown(self) -> None:
6060

6161
async def test_batch_end_to_end(self):
6262
topic = f'asyncio-test-batch-e2e-{time.time()}'
63-
producer = await self._client.create_producer(topic)
63+
producer = await self._client.create_producer(topic,
64+
producer_name="my-producer")
65+
self.assertEqual(producer.topic(), f'persistent://public/default/{topic}')
66+
self.assertEqual(producer.producer_name(), "my-producer")
6467
tasks = []
6568
for i in range(5):
6669
tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode())))
6770
msg_ids = await asyncio.gather(*tasks)
6871
self.assertEqual(len(msg_ids), 5)
72+
# pylint: disable=fixme
73+
# TODO: the result is wrong due to https://github.com/apache/pulsar-client-cpp/issues/531
74+
self.assertEqual(producer.last_sequence_id(), 8)
6975
ledger_id = msg_ids[0].ledger_id()
7076
entry_id = msg_ids[0].entry_id()
7177
# These messages should be in the same entry
@@ -90,6 +96,42 @@ async def test_batch_end_to_end(self):
9096
msg = await consumer.receive()
9197
self.assertEqual(msg.data(), b'final-message')
9298

99+
async def test_send_keyed_message(self):
100+
topic = f'asyncio-test-send-keyed-message-{time.time()}'
101+
producer = await self._client.create_producer(topic)
102+
consumer = await self._client.subscribe(topic, 'sub')
103+
await producer.send(b'msg', partition_key='key0',
104+
ordering_key="key1", properties={'my-prop': 'my-value'})
105+
106+
msg = await consumer.receive()
107+
self.assertEqual(msg.data(), b'msg')
108+
self.assertEqual(msg.partition_key(), 'key0')
109+
self.assertEqual(msg.ordering_key(), 'key1')
110+
self.assertEqual(msg.properties(), {'my-prop': 'my-value'})
111+
112+
async def test_flush(self):
113+
topic = f'asyncio-test-flush-{time.time()}'
114+
producer = await self._client.create_producer(topic, batching_max_messages=3,
115+
batching_max_publish_delay_ms=60000)
116+
tasks = []
117+
tasks.append(asyncio.create_task(producer.send(b'msg-0')))
118+
tasks.append(asyncio.create_task(producer.send(b'msg-1')))
119+
120+
done, pending = await asyncio.wait(tasks, timeout=1, return_when=asyncio.FIRST_COMPLETED)
121+
self.assertEqual(len(done), 0)
122+
self.assertEqual(len(pending), 2)
123+
124+
# flush will trigger sending the batched messages
125+
await producer.flush()
126+
for task in pending:
127+
self.assertTrue(task.done())
128+
msg_id0 = tasks[0].result()
129+
msg_id1 = tasks[1].result()
130+
self.assertEqual(msg_id0.ledger_id(), msg_id1.ledger_id())
131+
self.assertEqual(msg_id0.entry_id(), msg_id1.entry_id())
132+
self.assertEqual(msg_id0.batch_index(), 0)
133+
self.assertEqual(msg_id1.batch_index(), 1)
134+
93135
async def test_create_producer_failure(self):
94136
try:
95137
await self._client.create_producer('tenant/ns/asyncio-test-send-failure')

0 commit comments

Comments
 (0)