@@ -341,7 +341,7 @@ async def create_producer(self, topic: str,
341341 producer_name: str | None, default=None
342342 Specify a name for the producer. If not assigned, the system will
343343 generate a globally unique name which can be accessed with
344- `Producer.producer_name()`. When specifying a name, it is app to
344+ `Producer.producer_name()`. When specifying a name, it is up to
345345 the user to ensure that, for a given topic, the producer name is
346346 unique across all Pulsar's clusters.
347347 schema: pulsar.schema.Schema | None, default=None
@@ -458,8 +458,6 @@ async def subscribe(self, topic: Union[str, List[str]],
458458 consumer_type : pulsar .ConsumerType =
459459 pulsar .ConsumerType .Exclusive ,
460460 schema : pulsar .schema .Schema | None = None ,
461- message_listener : Callable [['Consumer' , pulsar .Message ],
462- None ] | None = None ,
463461 receiver_queue_size : int = 1000 ,
464462 max_total_receiver_queue_size_across_partitions : int =
465463 50000 ,
@@ -501,8 +499,6 @@ async def subscribe(self, topic: Union[str, List[str]],
501499 Select the subscription type to be used when subscribing to the topic.
502500 schema: pulsar.schema.Schema | None, default=None
503501 Define the schema of the data that will be received by this consumer.
504- message_listener: Callable[[Consumer, pulsar.Message], None] | None, default=None
505- Sets a message listener for the consumer.
506502 receiver_queue_size: int, default=1000
507503 Sets the size of the consumer receive queue.
508504 max_total_receiver_queue_size_across_partitions: int, default=50000
@@ -572,8 +568,6 @@ async def subscribe(self, topic: Union[str, List[str]],
572568 conf .consumer_type (consumer_type )
573569 conf .regex_subscription_mode (regex_subscription_mode )
574570 conf .read_compacted (is_read_compacted )
575- if message_listener :
576- conf .message_listener (_listener_wrapper (message_listener , schema ))
577571 conf .receiver_queue_size (receiver_queue_size )
578572 conf .max_total_receiver_queue_size_across_partitions (
579573 max_total_receiver_queue_size_across_partitions
@@ -656,12 +650,3 @@ def complete():
656650 else :
657651 future .set_exception (PulsarException (result ))
658652 future .get_loop ().call_soon_threadsafe (complete )
659-
660- def _listener_wrapper (listener , schema ):
661- def wrapper (consumer , msg ):
662- c = Consumer (consumer )
663- m = pulsar .Message ()
664- m ._message = msg
665- m ._schema = schema
666- listener (c , m )
667- return wrapper
0 commit comments