@@ -99,7 +99,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
9999 def __del__ (self ):
100100 if not self ._closed :
101101 try :
102- logger .warning ("Topic reader was not closed properly. Consider using method close()." )
102+ logger .debug ("Topic reader was not closed properly. Consider using method close()." )
103103 task = self ._loop .create_task (self .close (flush = False ))
104104 topic_common .wrap_set_name_for_asyncio_task (task , task_name = "close reader" )
105105 except BaseException :
@@ -121,6 +121,7 @@ async def receive_batch(
121121
122122 use asyncio.wait_for for wait with timeout.
123123 """
124+ logger .debug ("receive_batch max_messages=%s" , max_messages )
124125 await self ._reconnector .wait_message ()
125126 return self ._reconnector .receive_batch_nowait (
126127 max_messages = max_messages ,
@@ -137,6 +138,7 @@ async def receive_batch_with_tx(
137138
138139 use asyncio.wait_for for wait with timeout.
139140 """
141+ logger .debug ("receive_batch_with_tx tx=%s max_messages=%s" , tx , max_messages )
140142 await self ._reconnector .wait_message ()
141143 return self ._reconnector .receive_batch_with_tx_nowait (
142144 tx = tx ,
@@ -149,6 +151,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:
149151
150152 use asyncio.wait_for for wait with timeout.
151153 """
154+ logger .debug ("receive_message" )
152155 await self ._reconnector .wait_message ()
153156 return self ._reconnector .receive_message_nowait ()
154157
@@ -159,6 +162,7 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
159162 For the method no way check the commit result
160163 (for example if lost connection - commits will not re-send and committed messages will receive again).
161164 """
165+ logger .debug ("commit message or batch" )
162166 if self ._settings .consumer is None :
163167 raise issues .Error ("Commit operations are not supported for topic reader without consumer." )
164168
@@ -177,6 +181,7 @@ async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, dat
177181 before receive commit ack. Message may be acked or not (if not - it will send in other read session,
178182 to this or other reader).
179183 """
184+ logger .debug ("commit_with_ack message or batch" )
180185 if self ._settings .consumer is None :
181186 raise issues .Error ("Commit operations are not supported for topic reader without consumer." )
182187
@@ -187,8 +192,10 @@ async def close(self, flush: bool = True):
187192 if self ._closed :
188193 raise TopicReaderClosedError ()
189194
195+ logger .debug ("Close topic reader" )
190196 self ._closed = True
191197 await self ._reconnector .close (flush )
198+ logger .debug ("Topic reader was closed" )
192199
193200 @property
194201 def read_session_id (self ) -> Optional [str ]:
@@ -214,11 +221,12 @@ def __init__(
214221 settings : topic_reader .PublicReaderSettings ,
215222 loop : Optional [asyncio .AbstractEventLoop ] = None ,
216223 ):
217- self ._id = self ._static_reader_reconnector_counter .inc_and_get ()
224+ self ._id = ReaderReconnector ._static_reader_reconnector_counter .inc_and_get ()
218225 self ._settings = settings
219226 self ._driver = driver
220227 self ._loop = loop if loop is not None else asyncio .get_running_loop ()
221228 self ._background_tasks = set ()
229+ logger .debug ("init reader reconnector id=%s" , self ._id )
222230
223231 self ._state_changed = asyncio .Event ()
224232 self ._stream_reader = None
@@ -231,13 +239,16 @@ async def _connection_loop(self):
231239 attempt = 0
232240 while True :
233241 try :
242+ logger .debug ("reader %s connect attempt %s" , self ._id , attempt )
234243 self ._stream_reader = await ReaderStream .create (self ._id , self ._driver , self ._settings )
244+ logger .debug ("reader %s connected stream %s" , self ._id , self ._stream_reader ._id )
235245 attempt = 0
236246 self ._state_changed .set ()
237247 await self ._stream_reader .wait_error ()
238248 except BaseException as err :
239249 retry_info = check_retriable_error (err , self ._settings ._retry_settings (), attempt )
240250 if not retry_info .is_retriable :
251+ logger .debug ("reader %s stop connection loop due to %s" , self ._id , err )
241252 self ._set_first_error (err )
242253 return
243254
@@ -358,6 +369,7 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
358369 return self ._stream_reader .commit (batch )
359370
360371 async def close (self , flush : bool ):
372+ logger .debug ("reader reconnector %s close" , self ._id )
361373 if self ._stream_reader :
362374 await self ._stream_reader .close (flush )
363375 for task in self ._background_tasks :
@@ -447,6 +459,8 @@ def __init__(
447459
448460 self ._settings = settings
449461
462+ logger .debug ("created ReaderStream id=%s reconnector=%s" , self ._id , self ._reader_reconnector_id )
463+
450464 @staticmethod
451465 async def create (
452466 reader_reconnector_id : int ,
@@ -464,6 +478,7 @@ async def create(
464478 get_token_function = creds .get_auth_token if creds else None ,
465479 )
466480 await reader ._start (stream , settings ._init_message ())
481+ logger .debug ("reader stream %s started session=%s" , reader ._id , reader ._session_id )
467482 return reader
468483
469484 async def _start (self , stream : IGrpcWrapperAsyncIO , init_message : StreamReadMessage .InitRequest ):
@@ -472,11 +487,13 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
472487
473488 self ._started = True
474489 self ._stream = stream
490+ logger .debug ("reader stream %s send init request" , self ._id )
475491
476492 stream .write (StreamReadMessage .FromClient (client_message = init_message ))
477493 init_response = await stream .receive () # type: StreamReadMessage.FromServer
478494 if isinstance (init_response .server_message , StreamReadMessage .InitResponse ):
479495 self ._session_id = init_response .server_message .session_id
496+ logger .debug ("reader stream %s initialized session=%s" , self ._id , self ._session_id )
480497 else :
481498 raise TopicReaderError ("Unexpected message after InitRequest: %s" , init_response )
482499
@@ -615,6 +632,7 @@ async def _handle_background_errors(self):
615632
616633 async def _read_messages_loop (self ):
617634 try :
635+ logger .debug ("reader stream %s start read loop" , self ._id )
618636 self ._stream .write (
619637 StreamReadMessage .FromClient (
620638 client_message = StreamReadMessage .ReadRequest (
@@ -628,6 +646,7 @@ async def _read_messages_loop(self):
628646 _process_response (message .server_status )
629647
630648 if isinstance (message .server_message , StreamReadMessage .ReadResponse ):
649+ logger .debug ("reader stream %s read %s bytes" , self ._id , message .server_message .bytes_size )
631650 self ._on_read_response (message .server_message )
632651
633652 elif isinstance (message .server_message , StreamReadMessage .CommitOffsetResponse ):
@@ -637,18 +656,33 @@ async def _read_messages_loop(self):
637656 message .server_message ,
638657 StreamReadMessage .StartPartitionSessionRequest ,
639658 ):
659+ logger .debug (
660+ "reader stream %s start partition %s" ,
661+ self ._id ,
662+ message .server_message .partition_session .partition_session_id ,
663+ )
640664 await self ._on_start_partition_session (message .server_message )
641665
642666 elif isinstance (
643667 message .server_message ,
644668 StreamReadMessage .StopPartitionSessionRequest ,
645669 ):
670+ logger .debug (
671+ "reader stream %s stop partition %s" ,
672+ self ._id ,
673+ message .server_message .partition_session_id ,
674+ )
646675 self ._on_partition_session_stop (message .server_message )
647676
648677 elif isinstance (
649678 message .server_message ,
650679 StreamReadMessage .EndPartitionSession ,
651680 ):
681+ logger .debug (
682+ "reader stream %s end partition %s" ,
683+ self ._id ,
684+ message .server_message .partition_session_id ,
685+ )
652686 self ._on_end_partition_session (message .server_message )
653687
654688 elif isinstance (message .server_message , UpdateTokenResponse ):
@@ -663,6 +697,7 @@ async def _read_messages_loop(self):
663697
664698 self ._state_changed .set ()
665699 except Exception as e :
700+ logger .debug ("reader stream %s error: %s" , self ._id , e )
666701 self ._set_first_error (e )
667702 return
668703
@@ -825,6 +860,7 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) ->
825860 async def _decode_batches_loop (self ):
826861 while True :
827862 batch = await self ._batches_to_decode .get ()
863+ logger .debug ("reader stream %s decode batch %s messages" , self ._id , len (batch .messages ))
828864 await self ._decode_batch_inplace (batch )
829865 self ._add_batch_to_queue (batch )
830866 self ._state_changed .set ()
@@ -833,9 +869,21 @@ def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
833869 part_sess_id = batch ._partition_session .id
834870 if part_sess_id in self ._message_batches :
835871 self ._message_batches [part_sess_id ]._extend (batch )
872+ logger .debug (
873+ "reader stream %s extend batch partition=%s size=%s" ,
874+ self ._id ,
875+ part_sess_id ,
876+ len (batch .messages ),
877+ )
836878 return
837879
838880 self ._message_batches [part_sess_id ] = batch
881+ logger .debug (
882+ "reader stream %s new batch partition=%s size=%s" ,
883+ self ._id ,
884+ part_sess_id ,
885+ len (batch .messages ),
886+ )
839887
840888 async def _decode_batch_inplace (self , batch ):
841889 if batch ._codec == Codec .CODEC_RAW :
@@ -882,6 +930,7 @@ async def close(self, flush: bool):
882930 return
883931
884932 self ._closed = True
933+ logger .debug ("reader stream %s close" , self ._id )
885934
886935 if flush :
887936 await self .flush ()
@@ -899,3 +948,5 @@ async def close(self, flush: bool):
899948
900949 if self ._background_tasks :
901950 await asyncio .wait (self ._background_tasks )
951+
952+ logger .debug ("reader stream %s was closed" , self ._id )
0 commit comments