2525
2626import asyncio
2727import functools
28- from typing import Any , List , Union
28+ from typing import Any , Callable , List , Union
2929
3030import _pulsar
3131from _pulsar import (
3232 InitialPosition ,
33+ CompressionType ,
34+ PartitionsRoutingMode ,
35+ BatchingType ,
36+ ProducerAccessMode ,
3337 RegexSubscriptionMode ,
3438 ConsumerCryptoFailureAction ,
3539)
@@ -84,7 +88,17 @@ def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) ->
8488 self ._producer = producer
8589 self ._schema = schema
8690
87- async def send (self , content : Any ) -> pulsar .MessageId :
91+ # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
92+ async def send (self , content : Any ,
93+ properties : dict | None = None ,
94+ partition_key : str | None = None ,
95+ ordering_key : str | None = None ,
96+ sequence_id : int | None = None ,
97+ replication_clusters : List [str ] | None = None ,
98+ disable_replication : bool | None = None ,
99+ event_timestamp : int | None = None ,
100+ deliver_at : int | None = None ,
101+ deliver_after : int | None = None ) -> pulsar .MessageId :
88102 """
89103 Send a message asynchronously.
90104
@@ -93,6 +107,28 @@ async def send(self, content: Any) -> pulsar.MessageId:
93107 content: Any
94108 The message payload, whose type should respect the schema defined in
95109 `Client.create_producer`.
110+ properties: dict | None
111+ A dict of application0-defined string properties.
112+ partition_key: str | None
113+ Sets the partition key for the message routing. A hash of this key is
114+ used to determine the message's topic partition.
115+ ordering_key: str | None
116+ Sets the ordering key for the message routing.
117+ sequence_id: int | None
118+ Specify a custom sequence id for the message being published.
119+ replication_clusters: List[str] | None
120+ Override namespace replication clusters. Note that it is the caller's responsibility
121+ to provide valid cluster names and that all clusters have been previously configured
122+ as topics. Given an empty list, the message will replicate per the namespace
123+ configuration.
124+ disable_replication: bool | None
125+ Do not replicate this message.
126+ event_timestamp: int | None
127+ Timestamp in millis of the timestamp of event creation
128+ deliver_at: int | None
129+ Specify the message should not be delivered earlier than the specified timestamp.
130+ deliver_after: int | None
131+ Specify a delay in timedelta for the delivery of the messages.
96132
97133 Returns
98134 -------
@@ -105,6 +141,27 @@ async def send(self, content: Any) -> pulsar.MessageId:
105141 """
106142 builder = _pulsar .MessageBuilder ()
107143 builder .content (self ._schema .encode (content ))
144+
145+ if properties is not None :
146+ for k , v in properties .items ():
147+ builder .property (k , v )
148+ if partition_key is not None :
149+ builder .partition_key (partition_key )
150+ if ordering_key is not None :
151+ builder .ordering_key (ordering_key )
152+ if sequence_id is not None :
153+ builder .sequence_id (sequence_id )
154+ if replication_clusters is not None :
155+ builder .replication_clusters (replication_clusters )
156+ if disable_replication is not None :
157+ builder .disable_replication (disable_replication )
158+ if event_timestamp is not None :
159+ builder .event_timestamp (event_timestamp )
160+ if deliver_at is not None :
161+ builder .deliver_at (deliver_at )
162+ if deliver_after is not None :
163+ builder .deliver_after (deliver_after )
164+
108165 future = asyncio .get_running_loop ().create_future ()
109166 self ._producer .send_async (builder .build (), functools .partial (_set_future , future ))
110167 msg_id = await future
@@ -115,6 +172,18 @@ async def send(self, content: Any) -> pulsar.MessageId:
115172 msg_id .batch_index (),
116173 )
117174
175+ async def flush (self ) -> None :
176+ """
177+ Flush all the messages buffered in the producer asynchronously.
178+
179+ Raises
180+ ------
181+ PulsarException
182+ """
183+ future = asyncio .get_running_loop ().create_future ()
184+ self ._producer .flush_async (functools .partial (_set_future , future , value = None ))
185+ await future
186+
118187 async def close (self ) -> None :
119188 """
120189 Close the producer.
@@ -127,6 +196,25 @@ async def close(self) -> None:
127196 self ._producer .close_async (functools .partial (_set_future , future , value = None ))
128197 await future
129198
199+ def topic (self ):
200+ """
201+ Return the topic which producer is publishing to
202+ """
203+ return self ._producer .topic ()
204+
205+ def producer_name (self ):
206+ """
207+ Return the producer name which could have been assigned by the
208+ system or specified by the client
209+ """
210+ return self ._producer .producer_name ()
211+
212+ def last_sequence_id (self ):
213+ """
214+ Return the last sequence id that was published by this producer.
215+ """
216+ return self ._producer .last_sequence_id ()
217+
130218class Consumer :
131219 """
132220 The Pulsar message consumer, used to subscribe to messages from a topic.
@@ -311,7 +399,28 @@ def __init__(self, service_url, **kwargs) -> None:
311399
312400 # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
313401 async def create_producer (self , topic : str ,
402+ producer_name : str | None = None ,
314403 schema : pulsar .schema .Schema | None = None ,
404+ initial_sequence_id : int | None = None ,
405+ send_timeout_millis : int = 30000 ,
406+ compression_type : CompressionType = CompressionType .NONE ,
407+ max_pending_messages : int = 1000 ,
408+ max_pending_messages_across_partitions : int = 50000 ,
409+ block_if_queue_full : bool = False ,
410+ batching_enabled : bool = True ,
411+ batching_max_messages : int = 1000 ,
412+ batching_max_allowed_size_in_bytes : int = 128 * 1024 ,
413+ batching_max_publish_delay_ms : int = 10 ,
414+ chunking_enabled : bool = False ,
415+ message_routing_mode : PartitionsRoutingMode =
416+ PartitionsRoutingMode .RoundRobinDistribution ,
417+ lazy_start_partitioned_producers : bool = False ,
418+ properties : dict | None = None ,
419+ batching_type : BatchingType = BatchingType .Default ,
420+ encryption_key : str | None = None ,
421+ crypto_key_reader : pulsar .CryptoKeyReader | None = None ,
422+ access_mode : ProducerAccessMode = ProducerAccessMode .Shared ,
423+ message_router : Callable [[pulsar .Message , int ], int ] | None = None ,
315424 ) -> Producer :
316425 """
317426 Create a new producer on a given topic
@@ -320,8 +429,60 @@ async def create_producer(self, topic: str,
320429 ----------
321430 topic: str
322431 The topic name
432+ producer_name: str | None
433+ Specify a name for the producer. If not assigned, the system will generate a globally
434+ unique name which can be accessed with `Producer.producer_name()`. When specifying a
435+ name, it is up to the user to ensure that, for a given topic, the producer name is
436+ unique across all Pulsar's clusters.
323437 schema: pulsar.schema.Schema | None, default=None
324438 Define the schema of the data that will be published by this producer.
439+ initial_sequence_id: int | None, default=None
440+ Set the baseline for the sequence ids for messages published by
441+ the producer.
442+ send_timeout_millis: int, default=30000
443+ If a message is not acknowledged by the server before the
444+ send_timeout expires, an error will be reported.
445+ compression_type: CompressionType, default=CompressionType.NONE
446+ Set the compression type for the producer.
447+ max_pending_messages: int, default=1000
448+ Set the max size of the queue holding the messages pending to
449+ receive an acknowledgment from the broker.
450+ max_pending_messages_across_partitions: int, default=50000
451+ Set the max size of the queue holding the messages pending to
452+ receive an acknowledgment across partitions.
453+ block_if_queue_full: bool, default=False
454+ Set whether send operations should block when the outgoing
455+ message queue is full.
456+ batching_enabled: bool, default=True
457+ Enable automatic message batching. Note that, unlike the synchronous producer API in
458+ ``pulsar.Client.create_producer``, batching is enabled by default for the asyncio
459+ producer.
460+ batching_max_messages: int, default=1000
461+ Maximum number of messages in a batch.
462+ batching_max_allowed_size_in_bytes: int, default=128*1024
463+ Maximum size in bytes of a batch.
464+ batching_max_publish_delay_ms: int, default=10
465+ The batch interval in milliseconds.
466+ chunking_enabled: bool, default=False
467+ Enable chunking of large messages.
468+ message_routing_mode: PartitionsRoutingMode,
469+ default=PartitionsRoutingMode.RoundRobinDistribution
470+ Set the message routing mode for the partitioned producer.
471+ lazy_start_partitioned_producers: bool, default=False
472+ Start partitioned producers lazily on demand.
473+ properties: dict | None, default=None
474+ Sets the properties for the producer.
475+ batching_type: BatchingType, default=BatchingType.Default
476+ Sets the batching type for the producer.
477+ encryption_key: str | None, default=None
478+ The key used for symmetric encryption.
479+ crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
480+ Symmetric encryption class implementation.
481+ access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared
482+ Set the type of access mode that the producer requires on the topic.
483+ message_router: Callable[[pulsar.Message, int], int] | None, default=None
484+ A custom message router function that takes a Message and the
485+ number of partitions and returns the partition index.
325486
326487 Returns
327488 -------
@@ -332,13 +493,45 @@ async def create_producer(self, topic: str,
332493 ------
333494 PulsarException
334495 """
496+ if batching_enabled and chunking_enabled :
497+ raise ValueError ("Batching and chunking of messages can't be enabled together." )
498+
335499 if schema is None :
336500 schema = pulsar .schema .BytesSchema ()
337501 schema .attach_client (self ._client )
338502
339503 future = asyncio .get_running_loop ().create_future ()
340504 conf = _pulsar .ProducerConfiguration ()
505+ if producer_name is not None :
506+ conf .producer_name (producer_name )
341507 conf .schema (schema .schema_info ())
508+ if initial_sequence_id is not None :
509+ conf .initial_sequence_id (initial_sequence_id )
510+ conf .send_timeout_millis (send_timeout_millis )
511+ conf .compression_type (compression_type )
512+ conf .max_pending_messages (max_pending_messages )
513+ conf .max_pending_messages_across_partitions (max_pending_messages_across_partitions )
514+ conf .block_if_queue_full (block_if_queue_full )
515+ conf .batching_enabled (batching_enabled )
516+ conf .batching_max_messages (batching_max_messages )
517+ conf .batching_max_allowed_size_in_bytes (batching_max_allowed_size_in_bytes )
518+ conf .batching_max_publish_delay_ms (batching_max_publish_delay_ms )
519+ conf .chunking_enabled (chunking_enabled )
520+ conf .partitions_routing_mode (message_routing_mode )
521+ conf .lazy_start_partitioned_producers (lazy_start_partitioned_producers )
522+ if properties is not None :
523+ for k , v in properties .items ():
524+ conf .property (k , v )
525+ conf .batching_type (batching_type )
526+ if encryption_key is not None :
527+ conf .encryption_key (encryption_key )
528+ if crypto_key_reader is not None :
529+ conf .crypto_key_reader (crypto_key_reader .cryptoKeyReader )
530+ conf .access_mode (access_mode )
531+ if message_router is not None :
532+ def default_router (msg : _pulsar .Message , num_partitions : int ) -> int :
533+ return int (msg .partition_key ()) % num_partitions
534+ conf .message_router (default_router )
342535
343536 self ._client .create_producer_async (
344537 topic , conf , functools .partial (_set_future , future )
0 commit comments