@@ -121,6 +121,7 @@ async def receive_batch(
121121
122122 use asyncio.wait_for for wait with timeout.
123123 """
124+ logger .debug ("receive_batch max_messages=%s" , max_messages )
124125 await self ._reconnector .wait_message ()
125126 return self ._reconnector .receive_batch_nowait (
126127 max_messages = max_messages ,
@@ -137,6 +138,7 @@ async def receive_batch_with_tx(
137138
138139 use asyncio.wait_for for wait with timeout.
139140 """
141+ logger .debug ("receive_batch_with_tx tx=%s max_messages=%s" , tx , max_messages )
140142 await self ._reconnector .wait_message ()
141143 return self ._reconnector .receive_batch_with_tx_nowait (
142144 tx = tx ,
@@ -149,6 +151,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:
149151
150152 use asyncio.wait_for for wait with timeout.
151153 """
154+ logger .debug ("receive_message" )
152155 await self ._reconnector .wait_message ()
153156 return self ._reconnector .receive_message_nowait ()
154157
@@ -159,6 +162,7 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
159162 For the method no way check the commit result
160163 (for example if lost connection - commits will not re-send and committed messages will receive again).
161164 """
165+ logger .debug ("commit message or batch" )
162166 if self ._settings .consumer is None :
163167 raise issues .Error ("Commit operations are not supported for topic reader without consumer." )
164168
@@ -177,6 +181,7 @@ async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, dat
177181 before receive commit ack. Message may be acked or not (if not - it will send in other read session,
178182 to this or other reader).
179183 """
184+ logger .debug ("commit_with_ack message or batch" )
180185 if self ._settings .consumer is None :
181186 raise issues .Error ("Commit operations are not supported for topic reader without consumer." )
182187
@@ -187,8 +192,10 @@ async def close(self, flush: bool = True):
187192 if self ._closed :
188193 raise TopicReaderClosedError ()
189194
195+ logger .debug ("Close topic reader" )
190196 self ._closed = True
191197 await self ._reconnector .close (flush )
198+ logger .debug ("Topic reader was closed" )
192199
193200 @property
194201 def read_session_id (self ) -> Optional [str ]:
@@ -219,6 +226,7 @@ def __init__(
219226 self ._driver = driver
220227 self ._loop = loop if loop is not None else asyncio .get_running_loop ()
221228 self ._background_tasks = set ()
229+ logger .debug ("init reader reconnector id=%s" , self ._id )
222230
223231 self ._state_changed = asyncio .Event ()
224232 self ._stream_reader = None
@@ -231,13 +239,18 @@ async def _connection_loop(self):
231239 attempt = 0
232240 while True :
233241 try :
242+ logger .debug ("reader %s connect attempt %s" , self ._id , attempt )
234243 self ._stream_reader = await ReaderStream .create (self ._id , self ._driver , self ._settings )
244+ logger .debug (
245+ "reader %s connected stream %s" , self ._id , self ._stream_reader ._id
246+ )
235247 attempt = 0
236248 self ._state_changed .set ()
237249 await self ._stream_reader .wait_error ()
238250 except BaseException as err :
239251 retry_info = check_retriable_error (err , self ._settings ._retry_settings (), attempt )
240252 if not retry_info .is_retriable :
253+ logger .debug ("reader %s stop connection loop due to %s" , self ._id , err )
241254 self ._set_first_error (err )
242255 return
243256
@@ -358,6 +371,7 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
358371 return self ._stream_reader .commit (batch )
359372
360373 async def close (self , flush : bool ):
374+ logger .debug ("reader reconnector %s close" , self ._id )
361375 if self ._stream_reader :
362376 await self ._stream_reader .close (flush )
363377 for task in self ._background_tasks :
@@ -447,6 +461,10 @@ def __init__(
447461
448462 self ._settings = settings
449463
464+ logger .debug (
465+ "created ReaderStream id=%s reconnector=%s" , self ._id , self ._reader_reconnector_id
466+ )
467+
450468 @staticmethod
451469 async def create (
452470 reader_reconnector_id : int ,
@@ -464,6 +482,9 @@ async def create(
464482 get_token_function = creds .get_auth_token if creds else None ,
465483 )
466484 await reader ._start (stream , settings ._init_message ())
485+ logger .debug (
486+ "reader stream %s started session=%s" , reader ._id , reader ._session_id
487+ )
467488 return reader
468489
469490 async def _start (self , stream : IGrpcWrapperAsyncIO , init_message : StreamReadMessage .InitRequest ):
@@ -472,11 +493,17 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
472493
473494 self ._started = True
474495 self ._stream = stream
496+ logger .debug (
497+ "reader stream %s send init request" , self ._id
498+ )
475499
476500 stream .write (StreamReadMessage .FromClient (client_message = init_message ))
477501 init_response = await stream .receive () # type: StreamReadMessage.FromServer
478502 if isinstance (init_response .server_message , StreamReadMessage .InitResponse ):
479503 self ._session_id = init_response .server_message .session_id
504+ logger .debug (
505+ "reader stream %s initialized session=%s" , self ._id , self ._session_id
506+ )
480507 else :
481508 raise TopicReaderError ("Unexpected message after InitRequest: %s" , init_response )
482509
@@ -615,6 +642,7 @@ async def _handle_background_errors(self):
615642
616643 async def _read_messages_loop (self ):
617644 try :
645+ logger .debug ("reader stream %s start read loop" , self ._id )
618646 self ._stream .write (
619647 StreamReadMessage .FromClient (
620648 client_message = StreamReadMessage .ReadRequest (
@@ -628,6 +656,9 @@ async def _read_messages_loop(self):
628656 _process_response (message .server_status )
629657
630658 if isinstance (message .server_message , StreamReadMessage .ReadResponse ):
659+ logger .debug (
660+ "reader stream %s read %s bytes" , self ._id , message .server_message .bytes_size
661+ )
631662 self ._on_read_response (message .server_message )
632663
633664 elif isinstance (message .server_message , StreamReadMessage .CommitOffsetResponse ):
@@ -637,18 +668,33 @@ async def _read_messages_loop(self):
637668 message .server_message ,
638669 StreamReadMessage .StartPartitionSessionRequest ,
639670 ):
671+ logger .debug (
672+ "reader stream %s start partition %s" ,
673+ self ._id ,
674+ message .server_message .partition_session .partition_session_id ,
675+ )
640676 await self ._on_start_partition_session (message .server_message )
641677
642678 elif isinstance (
643679 message .server_message ,
644680 StreamReadMessage .StopPartitionSessionRequest ,
645681 ):
682+ logger .debug (
683+ "reader stream %s stop partition %s" ,
684+ self ._id ,
685+ message .server_message .partition_session_id ,
686+ )
646687 self ._on_partition_session_stop (message .server_message )
647688
648689 elif isinstance (
649690 message .server_message ,
650691 StreamReadMessage .EndPartitionSession ,
651692 ):
693+ logger .debug (
694+ "reader stream %s end partition %s" ,
695+ self ._id ,
696+ message .server_message .partition_session_id ,
697+ )
652698 self ._on_end_partition_session (message .server_message )
653699
654700 elif isinstance (message .server_message , UpdateTokenResponse ):
@@ -663,6 +709,7 @@ async def _read_messages_loop(self):
663709
664710 self ._state_changed .set ()
665711 except Exception as e :
712+ logger .debug ("reader stream %s error: %s" , self ._id , e )
666713 self ._set_first_error (e )
667714 return
668715
@@ -825,6 +872,9 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) ->
825872 async def _decode_batches_loop (self ):
826873 while True :
827874 batch = await self ._batches_to_decode .get ()
875+ logger .debug (
876+ "reader stream %s decode batch %s messages" , self ._id , len (batch .messages )
877+ )
828878 await self ._decode_batch_inplace (batch )
829879 self ._add_batch_to_queue (batch )
830880 self ._state_changed .set ()
@@ -833,9 +883,21 @@ def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
833883 part_sess_id = batch ._partition_session .id
834884 if part_sess_id in self ._message_batches :
835885 self ._message_batches [part_sess_id ]._extend (batch )
886+ logger .debug (
887+ "reader stream %s extend batch partition=%s size=%s" ,
888+ self ._id ,
889+ part_sess_id ,
890+ len (batch .messages ),
891+ )
836892 return
837893
838894 self ._message_batches [part_sess_id ] = batch
895+ logger .debug (
896+ "reader stream %s new batch partition=%s size=%s" ,
897+ self ._id ,
898+ part_sess_id ,
899+ len (batch .messages ),
900+ )
839901
840902 async def _decode_batch_inplace (self , batch ):
841903 if batch ._codec == Codec .CODEC_RAW :
@@ -882,6 +944,7 @@ async def close(self, flush: bool):
882944 return
883945
884946 self ._closed = True
947+ logger .debug ("reader stream %s close" , self ._id )
885948
886949 if flush :
887950 await self .flush ()
@@ -899,3 +962,5 @@ async def close(self, flush: bool):
899962
900963 if self ._background_tasks :
901964 await asyncio .wait (self ._background_tasks )
965+
966+ logger .debug ("reader stream %s was closed" , self ._id )
0 commit comments