@@ -344,13 +344,11 @@ async def subscribe(self, topic, subscription_name,
344344 batch_index_ack_enabled = False ,
345345 regex_subscription_mode : _pulsar .RegexSubscriptionMode = _pulsar .RegexSubscriptionMode .PersistentOnly ,
346346 dead_letter_policy : Union [None , pulsar .ConsumerDeadLetterPolicy ] = None ,) -> Consumer :
347- print ("subscribe called" )
348347 conf = _pulsar .ConsumerConfiguration ()
349348 conf .consumer_type (consumer_type )
350349 conf .regex_subscription_mode (regex_subscription_mode )
351350 conf .read_compacted (is_read_compacted )
352351
353- print ("core conf set" )
354352
355353 if message_listener :
356354 conf .message_listener (_listener_wrapper (message_listener , schema ))
@@ -386,23 +384,16 @@ async def subscribe(self, topic, subscription_name,
386384 if dead_letter_policy :
387385 conf .dead_letter_policy (dead_letter_policy .policy ())
388386
389- print ("opt conf set" )
390-
391387 future = asyncio .get_running_loop ().create_future ()
392388
393- print ("future created" )
394-
395389 c = Consumer (None )
396390 if isinstance (topic , str ):
397- print ("single" )
398391 self ._client .subscribe_async (topic , subscription_name , conf , functools .partial (_set_future , future ))
399392 c ._consumer = await future
400393 elif isinstance (topic , list ):
401- print ("multi" )
402394 self ._client .subscribe_topics_async (topic , subscription_name , conf , functools .partial (_set_future , future ))
403395 c ._consumer = await future
404396 elif isinstance (topic , pulsar ._retype ):
405- print ("regex" )
406397 self ._client .subscribe_pattern_async (topic , subscription_name , conf , functools .partial (_set_future , future ))
407398 c ._consumer = await future
408399 else :
@@ -412,7 +403,6 @@ async def subscribe(self, topic, subscription_name,
412403 c ._schema = schema
413404 c ._schema .attach_client (self ._client )
414405
415- print ("consumer created" )
416406 self ._consumers .append (c )
417407
418408 return c
0 commit comments