@@ -656,41 +656,36 @@ async def send_rpc[R, A](
656656 )
657657 # Handle potential errors during communication
658658 try :
659+ async with asyncio .timeout (timeout .total_seconds ()):
660+ response = await output .get ()
661+ except asyncio .TimeoutError as e :
662+ await self .send_message (
663+ stream_id = stream_id ,
664+ control_flags = STREAM_CANCEL_BIT ,
665+ payload = {"type" : "CANCEL" },
666+ service_name = service_name ,
667+ procedure_name = procedure_name ,
668+ span = span ,
669+ )
670+ raise RiverException (ERROR_CODE_CANCEL , str (e )) from e
671+ except ChannelClosed as e :
672+ raise RiverServiceException (
673+ ERROR_CODE_STREAM_CLOSED ,
674+ "Stream closed before response" ,
675+ service_name ,
676+ procedure_name ,
677+ ) from e
678+ except RuntimeError as e :
679+ raise RiverException (ERROR_CODE_STREAM_CLOSED , str (e )) from e
680+ if not response .get ("ok" , False ):
659681 try :
660- async with asyncio .timeout (timeout .total_seconds ()):
661- response = await output .get ()
662- except asyncio .TimeoutError as e :
663- await self .send_message (
664- stream_id = stream_id ,
665- control_flags = STREAM_CANCEL_BIT ,
666- payload = {"type" : "CANCEL" },
667- service_name = service_name ,
668- procedure_name = procedure_name ,
669- span = span ,
670- )
671- raise RiverException (ERROR_CODE_CANCEL , str (e )) from e
672- except ChannelClosed as e :
673- raise RiverServiceException (
674- ERROR_CODE_STREAM_CLOSED ,
675- "Stream closed before response" ,
676- service_name ,
677- procedure_name ,
678- ) from e
679- except RuntimeError as e :
680- raise RiverException (ERROR_CODE_STREAM_CLOSED , str (e )) from e
681- if not response .get ("ok" , False ):
682- try :
683- error = error_deserializer (response ["payload" ])
684- except Exception as e :
685- raise RiverException ("error_deserializer" , str (e )) from e
686- raise exception_from_message (error .code )(
687- error .code , error .message , service_name , procedure_name
688- )
689- return response_deserializer (response ["payload" ])
690- except RiverException as e :
691- raise e
692- except Exception as e :
693- raise e
682+ error = error_deserializer (response ["payload" ])
683+ except Exception as e :
684+ raise RiverException ("error_deserializer" , str (e )) from e
685+ raise exception_from_message (error .code )(
686+ error .code , error .message , service_name , procedure_name
687+ )
688+ return response_deserializer (response ["payload" ])
694689
695690 async def send_upload [I , R , A ](
696691 self ,
@@ -751,31 +746,26 @@ async def send_upload[I, R, A](
751746 # Handle potential errors during communication
752747 # TODO: throw a error when the transport is hard closed
753748 try :
749+ response = await output .get ()
750+ except ChannelClosed as e :
751+ raise RiverServiceException (
752+ ERROR_CODE_STREAM_CLOSED ,
753+ "Stream closed before response" ,
754+ service_name ,
755+ procedure_name ,
756+ ) from e
757+ except RuntimeError as e :
758+ raise RiverException (ERROR_CODE_STREAM_CLOSED , str (e )) from e
759+ if not response .get ("ok" , False ):
754760 try :
755- response = await output .get ()
756- except ChannelClosed as e :
757- raise RiverServiceException (
758- ERROR_CODE_STREAM_CLOSED ,
759- "Stream closed before response" ,
760- service_name ,
761- procedure_name ,
762- ) from e
763- except RuntimeError as e :
764- raise RiverException (ERROR_CODE_STREAM_CLOSED , str (e )) from e
765- if not response .get ("ok" , False ):
766- try :
767- error = error_deserializer (response ["payload" ])
768- except Exception as e :
769- raise RiverException ("error_deserializer" , str (e )) from e
770- raise exception_from_message (error .code )(
771- error .code , error .message , service_name , procedure_name
772- )
761+ error = error_deserializer (response ["payload" ])
762+ except Exception as e :
763+ raise RiverException ("error_deserializer" , str (e )) from e
764+ raise exception_from_message (error .code )(
765+ error .code , error .message , service_name , procedure_name
766+ )
773767
774- return response_deserializer (response ["payload" ])
775- except RiverException as e :
776- raise e
777- except Exception as e :
778- raise e
768+ return response_deserializer (response ["payload" ])
779769
780770 async def send_subscription [R , E , A ](
781771 self ,
0 commit comments