@@ -75,6 +75,7 @@ async def __with_retries(self, func: Callable, max_attempts: int):
7575 backoff_seconds = (self ._options .recovery_backoff_ms / 1000 ) if self ._options .recovery else 0
7676
7777 for attempt in range (max_attempts ):
78+ logger .info (f"Attempting retry { attempt } out of { max_attempts } " )
7879 try :
7980 try :
8081 await asyncio .wait_for (func (), timeout = timeout_seconds )
@@ -149,6 +150,7 @@ async def __create_stream(self):
149150 async def _initialize (self ):
150151 # Asynchronous initialization method for ZerobusStream. Must be called before using the stream.
151152 try :
153+ logger .info ("Starting initializing stream" )
152154 max_attempts = self ._options .recovery_retries if self ._options .recovery else 1
153155 await self .__with_retries (self .__create_stream , max_attempts )
154156 await self .__set_state (StreamState .OPENED )
@@ -271,22 +273,29 @@ async def __handle_stream_failed(
271273 err_msg = str (exception ) if exception is not None else "Stream closed unexpectedly!"
272274 self .__stream_failure_info .log_failure (failure_type )
273275
274- if (self .__state == StreamState .OPENED or self .__state == StreamState .FLUSHING ) and not isinstance (
275- exception , NonRetriableException
276- ):
276+ should_recover = (
277+ (self .__state == StreamState .OPENED or self .__state == StreamState .FLUSHING )
278+ and not isinstance (exception , NonRetriableException )
279+ and self ._options .recovery
280+ )
281+
282+ if should_recover :
277283 # Set the state to recovering
278284 # This is to prevent the stream from being closed multiple times
279285 self .__state = StreamState .RECOVERING
280286 recovered = await self .__recover_stream ()
281287 if recovered :
282288 # Stream recovered successfully
283289 return
290+ # Recovery failed
291+ logger .error (f"Stream failed permanently after failed recovery attempt: { err_msg } " )
292+ else :
293+ # Non-recoverable error
294+ logger .error (f"Stream closed due to a non-recoverable error: { err_msg } " )
284295
285296 # Close the stream for new events
286297 await self .__set_state (StreamState .FAILED )
287298 await self .__close (hard_failure = True , err_msg = err_msg )
288-
289- logger .error (f"Stream closed due to an error: { err_msg } " )
290299 finally :
291300 self .__error_handling_in_progress = False
292301
@@ -312,6 +321,7 @@ async def __sender(self):
312321
313322 try :
314323 # 1. CREATE STREAM
324+ logger .info ("Sending CreateIngestStreamRequest to gRPC stream" )
315325 create_stream_request = zerobus_service_pb2 .CreateIngestStreamRequest (
316326 table_name = self ._table_properties .table_name .encode ("utf-8" ),
317327 record_type = self ._options .record_type .value ,
@@ -324,6 +334,7 @@ async def __sender(self):
324334 )
325335
326336 yield zerobus_service_pb2 .EphemeralStreamRequest (create_stream = create_stream_request )
337+ logger .info ("Waiting for CreateIngestStreamResponse" )
327338 await self .__wait_for_stream_to_finish_initialization ()
328339 stream_id = self .stream_id
329340
@@ -414,7 +425,12 @@ async def __sender(self):
414425 except asyncio .CancelledError as e :
415426 exception = e
416427 except grpc .RpcError as e :
417- exception = log_and_get_exception (e )
428+ # Check if this is a CANCELLED error due to intentional stream closure
429+ if self .__state == StreamState .CLOSED and e .code () == grpc .StatusCode .CANCELLED :
430+ # Stream was cancelled during close() - don't log as error
431+ exception = ZerobusException (f"Error happened in sending records: { e } " )
432+ else :
433+ exception = log_and_get_exception (e )
418434 except Exception as e :
419435 logger .error (f"Error happened in sending records: { str (e )} " )
420436 exception = ZerobusException (f"Error happened in sending records: { str (e )} " )
@@ -482,7 +498,12 @@ async def __receiver(self):
482498 except asyncio .CancelledError as e :
483499 exception = e
484500 except grpc .RpcError as e :
485- exception = log_and_get_exception (e )
501+ # Check if this is a CANCELLED error due to intentional stream closure
502+ if self .__state == StreamState .CLOSED and e .code () == grpc .StatusCode .CANCELLED :
503+ # Stream was cancelled during close() - don't log as error
504+ exception = ZerobusException (f"Error happened in receiving records: { e } " )
505+ else :
506+ exception = log_and_get_exception (e )
486507 except Exception as e :
487508 logger .error (f"Error happened in receiving records: { str (e )} " )
488509 exception = ZerobusException (f"Error happened in receiving records: { str (e )} " )
0 commit comments