4848import _pulsar
4949
5050from _pulsar import Result , CompressionType , ConsumerType , InitialPosition , PartitionsRoutingMode , BatchingType , \
51- LoggerLevel , BatchReceivePolicy , KeySharedPolicy , KeySharedMode # noqa: F401
51+ LoggerLevel , BatchReceivePolicy , KeySharedPolicy , KeySharedMode , ProducerAccessMode # noqa: F401
5252
5353from pulsar .__about__ import __version__
5454
@@ -523,7 +523,8 @@ def create_producer(self, topic,
523523 properties = None ,
524524 batching_type = BatchingType .Default ,
525525 encryption_key = None ,
526- crypto_key_reader = None
526+ crypto_key_reader = None ,
527+ access_mode = ProducerAccessMode .Shared ,
527528 ):
528529 """
529530 Create a new producer on a given topic.
@@ -614,6 +615,17 @@ def create_producer(self, topic,
614615 crypto_key_reader: CryptoKeyReader, optional
615616 Symmetric encryption class implementation, configuring public key encryption messages for the producer
616617 and private key decryption messages for the consumer
618+ access_mode: ProducerAccessMode, optional
619+ Set the type of access mode that the producer requires on the topic.
620+
621+ Supported modes:
622+
623+ * Shared: By default multiple producers can publish on a topic.
624+ * Exclusive: Require exclusive access for producer.
625+ Fail immediately if there's already a producer connected.
626+ * WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
627+ * ExclusiveWithFencing: Acquire exclusive access for the producer.
628+ Any existing producer will be removed and invalidated immediately.
617629 """
618630 _check_type (str , topic , 'topic' )
619631 _check_type_or_none (str , producer_name , 'producer_name' )
@@ -634,6 +646,7 @@ def create_producer(self, topic,
634646 _check_type_or_none (str , encryption_key , 'encryption_key' )
635647 _check_type_or_none (CryptoKeyReader , crypto_key_reader , 'crypto_key_reader' )
636648 _check_type (bool , lazy_start_partitioned_producers , 'lazy_start_partitioned_producers' )
649+ _check_type (ProducerAccessMode , access_mode , 'access_mode' )
637650
638651 conf = _pulsar .ProducerConfiguration ()
639652 conf .send_timeout_millis (send_timeout_millis )
@@ -649,6 +662,7 @@ def create_producer(self, topic,
649662 conf .batching_type (batching_type )
650663 conf .chunking_enabled (chunking_enabled )
651664 conf .lazy_start_partitioned_producers (lazy_start_partitioned_producers )
665+ conf .access_mode (access_mode )
652666 if producer_name :
653667 conf .producer_name (producer_name )
654668 if initial_sequence_id :
0 commit comments