@@ -315,28 +315,7 @@ def __init__(self, service_url, **kwargs) -> None:
315315
316316 # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
317317 async def create_producer (self , topic : str ,
318- producer_name : str | None = None ,
319318 schema : pulsar .schema .Schema | None = None ,
320- initial_sequence_id : int | None = None ,
321- send_timeout_millis : int = 30000 ,
322- compression_type : CompressionType = CompressionType .NONE ,
323- max_pending_messages : int = 1000 ,
324- max_pending_messages_across_partitions : int = 50000 ,
325- block_if_queue_full : bool = False ,
326- batching_enabled : bool = True ,
327- batching_max_messages : int = 1000 ,
328- batching_max_allowed_size_in_bytes : int = 128 * 1024 ,
329- batching_max_publish_delay_ms : int = 10 ,
330- chunking_enabled : bool = False ,
331- message_routing_mode : PartitionsRoutingMode =
332- PartitionsRoutingMode .RoundRobinDistribution ,
333- lazy_start_partitioned_producers : bool = False ,
334- properties : dict | None = None ,
335- batching_type : BatchingType = BatchingType .Default ,
336- encryption_key : str | None = None ,
337- crypto_key_reader : pulsar .CryptoKeyReader | None = None ,
338- access_mode : ProducerAccessMode = ProducerAccessMode .Shared ,
339- message_router : Callable [[pulsar .Message , int ], int ] | None = None ,
340319 ) -> Producer :
341320 """
342321 Create a new producer on a given topic
@@ -345,61 +324,8 @@ async def create_producer(self, topic: str,
345324 ----------
346325 topic: str
347326 The topic name
348- producer_name: str | None, default=None
349- Specify a name for the producer. If not assigned, the system will
350- generate a globally unique name which can be accessed with
351- `Producer.producer_name()`. When specifying a name, it is up to
352- the user to ensure that, for a given topic, the producer name is
353- unique across all Pulsar's clusters.
354327 schema: pulsar.schema.Schema | None, default=None
355328 Define the schema of the data that will be published by this producer.
356- initial_sequence_id: int | None, default=None
357- Set the baseline for the sequence ids for messages published by
358- the producer.
359- send_timeout_millis: int, default=30000
360- If a message is not acknowledged by the server before the
361- send_timeout expires, an error will be reported.
362- compression_type: CompressionType, default=CompressionType.NONE
363- Set the compression type for the producer.
364- max_pending_messages: int, default=1000
365- Set the max size of the queue holding the messages pending to
366- receive an acknowledgment from the broker.
367- max_pending_messages_across_partitions: int, default=50000
368- Set the max size of the queue holding the messages pending to
369- receive an acknowledgment across partitions.
370- block_if_queue_full: bool, default=False
371- Set whether send operations should block when the outgoing
372- message queue is full.
373- batching_enabled: bool, default=True
374- Enable automatic message batching. Note that, unlike the synchronous
375- producer API in ``pulsar.__init__``, batching is enabled by default
376- for the asyncio producer.
377- batching_max_messages: int, default=1000
378- Maximum number of messages in a batch.
379- batching_max_allowed_size_in_bytes: int, default=128*1024
380- Maximum size in bytes of a batch.
381- batching_max_publish_delay_ms: int, default=10
382- The batch interval in milliseconds.
383- chunking_enabled: bool, default=False
384- Enable chunking of large messages.
385- message_routing_mode: PartitionsRoutingMode,
386- default=PartitionsRoutingMode.RoundRobinDistribution
387- Set the message routing mode for the partitioned producer.
388- lazy_start_partitioned_producers: bool, default=False
389- Start partitioned producers lazily on demand.
390- properties: dict | None, default=None
391- Sets the properties for the producer.
392- batching_type: BatchingType, default=BatchingType.Default
393- Sets the batching type for the producer.
394- encryption_key: str | None, default=None
395- The key used for symmetric encryption.
396- crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
397- Symmetric encryption class implementation.
398- access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared
399- Set the type of access mode that the producer requires on the topic.
400- message_router: Callable[[pulsar.Message, int], int] | None, default=None
401- A custom message router function that takes a Message and the
402- number of partitions and returns the partition index.
403329
404330 Returns
405331 -------
@@ -412,52 +338,15 @@ async def create_producer(self, topic: str,
412338 """
413339 if schema is None :
414340 schema = pulsar .schema .BytesSchema ()
341+ schema .attach_client (self ._client )
415342
416343 future = asyncio .get_running_loop ().create_future ()
417344 conf = _pulsar .ProducerConfiguration ()
418- conf .send_timeout_millis (send_timeout_millis )
419- conf .compression_type (compression_type )
420- conf .max_pending_messages (max_pending_messages )
421- conf .max_pending_messages_across_partitions (max_pending_messages_across_partitions )
422- conf .block_if_queue_full (block_if_queue_full )
423- conf .batching_enabled (batching_enabled )
424- conf .batching_max_messages (batching_max_messages )
425- conf .batching_max_allowed_size_in_bytes (batching_max_allowed_size_in_bytes )
426- conf .batching_max_publish_delay_ms (batching_max_publish_delay_ms )
427- conf .partitions_routing_mode (message_routing_mode )
428- conf .batching_type (batching_type )
429- conf .chunking_enabled (chunking_enabled )
430- conf .lazy_start_partitioned_producers (lazy_start_partitioned_producers )
431- conf .access_mode (access_mode )
432- if message_router is not None :
433- def underlying_router (msg , num_partitions ):
434- return int (message_router (pulsar .Message ._wrap (msg ),
435- num_partitions ))
436- conf .message_router (underlying_router )
437-
438- if producer_name :
439- conf .producer_name (producer_name )
440- if initial_sequence_id is not None :
441- conf .initial_sequence_id (initial_sequence_id )
442- if properties :
443- for k , v in properties .items ():
444- conf .property (k , v )
445-
446345 conf .schema (schema .schema_info ())
447- if encryption_key :
448- conf .encryption_key (encryption_key )
449- if crypto_key_reader :
450- conf .crypto_key_reader (crypto_key_reader .cryptoKeyReader )
451-
452- if batching_enabled and chunking_enabled :
453- raise ValueError (
454- "Batching and chunking of messages can't be enabled together."
455- )
456346
457347 self ._client .create_producer_async (
458348 topic , conf , functools .partial (_set_future , future )
459349 )
460- schema .attach_client (self ._client )
461350 return Producer (await future , schema )
462351
463352 # pylint: disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments
0 commit comments