@@ -384,17 +384,21 @@ async def _connection_loop(self):
384384 await stream_writer .close ()
385385 for task in tasks :
386386 task .cancel ()
387- await asyncio .wait (tasks )
387+ if tasks :
388+ await asyncio .wait (tasks )
388389
389390 async def _encode_loop (self ):
390- while True :
391- messages = await self ._messages_for_encode .get ()
392- while not self ._messages_for_encode .empty ():
393- messages .extend (self ._messages_for_encode .get_nowait ())
391+ try :
392+ while True :
393+ messages = await self ._messages_for_encode .get ()
394+ while not self ._messages_for_encode .empty ():
395+ messages .extend (self ._messages_for_encode .get_nowait ())
394396
395- batch_codec = await self ._codec_selector (messages )
396- await self ._encode_data_inplace (batch_codec , messages )
397- self ._add_messages_to_send_queue (messages )
397+ batch_codec = await self ._codec_selector (messages )
398+ await self ._encode_data_inplace (batch_codec , messages )
399+ self ._add_messages_to_send_queue (messages )
400+ except BaseException as err :
401+ self ._stop (err )
398402
399403 async def _encode_data_inplace (self , codec : PublicCodec , messages : List [InternalMessage ]):
400404 if codec == PublicCodec .RAW :
@@ -531,10 +535,9 @@ async def _send_loop(self, writer: "WriterAsyncIOStream"):
531535 writer .write ([m ])
532536 except Exception as e :
533537 self ._stop (e )
534- finally :
535- pass
538+ raise
536539
537- def _stop (self , reason : Exception ):
540+ def _stop (self , reason : BaseException ):
538541 if reason is None :
539542 raise Exception ("writer stop reason can not be None" )
540543
@@ -554,7 +557,7 @@ async def flush(self):
554557 return
555558
556559 # wait last message
557- await asyncio .wait (( self ._messages_future [ - 1 ],) )
560+ await asyncio .wait (self ._messages_future )
558561
559562
560563class WriterAsyncIOStream :
@@ -647,7 +650,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes
647650 @staticmethod
648651 def _ensure_ok (message : WriterMessagesFromServerToClient ):
649652 if not message .status .is_success ():
650- raise TopicWriterError ("status error from server in writer: %s" , message .status )
653+ raise TopicWriterError (f "status error from server in writer: { message .status } " )
651654
652655 def write (self , messages : List [InternalMessage ]):
653656 if self ._closed :
0 commit comments