@@ -953,17 +953,105 @@ async def _serve(
953953 idx += 1
954954 try :
955955 try :
956- await _handle_messages_from_ws (
957- transport_id = transport_id ,
958- get_state = get_state ,
959- get_ws = get_ws ,
960- transition_connecting = transition_connecting ,
961- close_session = close_session ,
962- assert_incoming_seq_bookkeeping = assert_incoming_seq_bookkeeping ,
963- reset_session_close_countdown = reset_session_close_countdown ,
964- get_stream = get_stream ,
965- close_stream = close_stream ,
956+ logging . debug ( "_handle_messages_from_ws started" )
957+ while (
958+ ws := get_ws ()
959+ ) is None or get_state () == SessionState . CONNECTING :
960+ logging . debug ( "_handle_messages_from_ws spinning while connecting" )
961+ await asyncio . sleep ( 1 )
962+ logger . debug (
963+ "%s start handling messages from ws %s" ,
964+ "client" ,
965+ ws . id ,
966966 )
967+ try :
968+ # We should not process messages if the websocket is closed.
969+ while ws := get_ws ():
970+ # decode=False: Avoiding an unnecessary round-trip through str
971+ # Ideally this should be type-ascripted to : bytes, but there
972+ # is no @overrides in `websockets` to hint this.
973+ message = await ws .recv (decode = False )
974+ try :
975+ msg = parse_transport_msg (message )
976+ logger .debug (
977+ "[%s] got a message %r" ,
978+ transport_id ,
979+ msg ,
980+ )
981+
982+ if msg .controlFlags & STREAM_OPEN_BIT != 0 :
983+ raise InvalidMessageException (
984+ "Client should not receive stream open bit"
985+ )
986+
987+ match assert_incoming_seq_bookkeeping (
988+ msg .from_ ,
989+ msg .seq ,
990+ msg .ack ,
991+ ):
992+ case _IgnoreMessage ():
993+ logger .debug (
994+ "Ignoring transport message" ,
995+ exc_info = True ,
996+ )
997+ continue
998+ case True :
999+ pass
1000+ case other :
1001+ assert_never (other )
1002+
1003+ reset_session_close_countdown ()
1004+
1005+ # Shortcut to avoid processing ack packets
1006+ if msg .controlFlags & ACK_BIT != 0 :
1007+ continue
1008+
1009+ stream = get_stream (msg .streamId )
1010+
1011+ if not stream :
1012+ logger .warning (
1013+ "no stream for %s, ignoring message" ,
1014+ msg .streamId ,
1015+ )
1016+ continue
1017+
1018+ if (
1019+ msg .controlFlags & STREAM_CLOSED_BIT != 0
1020+ and msg .payload .get ("type" , None ) == "CLOSE"
1021+ ):
1022+ # close message is not sent to the stream
1023+ pass
1024+ else :
1025+ try :
1026+ await stream .put (msg .payload )
1027+ except ChannelClosed :
1028+ # The client is no longer interested in this stream,
1029+ # just drop the message.
1030+ pass
1031+ except RuntimeError as e :
1032+ raise InvalidMessageException (e ) from e
1033+
1034+ if msg .controlFlags & STREAM_CLOSED_BIT != 0 :
1035+ if stream :
1036+ stream .close ()
1037+ close_stream (msg .streamId )
1038+ except OutOfOrderMessageException :
1039+ logger .exception ("Out of order message, closing connection" )
1040+ await close_session ()
1041+ return
1042+ except InvalidMessageException :
1043+ logger .exception (
1044+ "Got invalid transport message, closing session" ,
1045+ )
1046+ await close_session ()
1047+ return
1048+ except ConnectionClosedOK :
1049+ # Exited normally
1050+ transition_connecting ()
1051+ except ConnectionClosed as e :
1052+ transition_connecting ()
1053+ raise e
1054+ logging .debug ("_handle_messages_from_ws exiting" )
9671055 except ConnectionClosed :
9681056 # Set ourselves to closed as soon as we get the signal
9691057 await transition_closed ()
@@ -988,99 +1076,3 @@ async def _serve(
9881076 )
9891077 raise unhandled
9901078 logging .debug (f"_serve exiting normally after { idx } loops" )
991-
992-
993- async def _handle_messages_from_ws (
994- transport_id : str ,
995- get_state : Callable [[], SessionState ],
996- get_ws : Callable [[], ClientConnection | None ],
997- transition_connecting : Callable [[], Awaitable [None ]],
998- close_session : Callable [[], Awaitable [None ]],
999- assert_incoming_seq_bookkeeping : Callable [
1000- [str , int , int ], Literal [True ] | _IgnoreMessage
1001- ], # noqa: E501
1002- reset_session_close_countdown : Callable [[], None ],
1003- get_stream : Callable [[str ], Channel [Any ] | None ],
1004- close_stream : Callable [[str ], None ],
1005- ) -> None :
1006- logging .debug ("_handle_messages_from_ws started" )
1007- while (ws := get_ws ()) is None or get_state () == SessionState .CONNECTING :
1008- logging .debug ("_handle_messages_from_ws spinning while connecting" )
1009- await asyncio .sleep (1 )
1010- logger .debug (
1011- "%s start handling messages from ws %s" ,
1012- "client" ,
1013- ws .id ,
1014- )
1015- try :
1016- # We should not process messages if the websocket is closed.
1017- while ws := get_ws ():
1018- # decode=False: Avoiding an unnecessary round-trip through str
1019- # Ideally this should be type-ascripted to : bytes, but there is no
1020- # @overrides in `websockets` to hint this.
1021- message = await ws .recv (decode = False )
1022- try :
1023- msg = parse_transport_msg (message )
1024- logger .debug ("[%s] got a message %r" , transport_id , msg )
1025-
1026- if msg .controlFlags & STREAM_OPEN_BIT != 0 :
1027- raise InvalidMessageException (
1028- "Client should not receive stream open bit"
1029- )
1030-
1031- match assert_incoming_seq_bookkeeping (msg .from_ , msg .seq , msg .ack ):
1032- case _IgnoreMessage ():
1033- logger .debug ("Ignoring transport message" , exc_info = True )
1034- continue
1035- case True :
1036- pass
1037- case other :
1038- assert_never (other )
1039-
1040- reset_session_close_countdown ()
1041-
1042- # Shortcut to avoid processing ack packets
1043- if msg .controlFlags & ACK_BIT != 0 :
1044- continue
1045-
1046- stream = get_stream (msg .streamId )
1047-
1048- if not stream :
1049- logger .warning ("no stream for %s, ignoring message" , msg .streamId )
1050- continue
1051-
1052- if (
1053- msg .controlFlags & STREAM_CLOSED_BIT != 0
1054- and msg .payload .get ("type" , None ) == "CLOSE"
1055- ):
1056- # close message is not sent to the stream
1057- pass
1058- else :
1059- try :
1060- await stream .put (msg .payload )
1061- except ChannelClosed :
1062- # The client is no longer interested in this stream,
1063- # just drop the message.
1064- pass
1065- except RuntimeError as e :
1066- raise InvalidMessageException (e ) from e
1067-
1068- if msg .controlFlags & STREAM_CLOSED_BIT != 0 :
1069- if stream :
1070- stream .close ()
1071- close_stream (msg .streamId )
1072- except OutOfOrderMessageException :
1073- logger .exception ("Out of order message, closing connection" )
1074- await close_session ()
1075- return
1076- except InvalidMessageException :
1077- logger .exception ("Got invalid transport message, closing session" )
1078- await close_session ()
1079- return
1080- except ConnectionClosedOK :
1081- # Exited normally
1082- transition_connecting ()
1083- except ConnectionClosed as e :
1084- transition_connecting ()
1085- raise e
1086- logging .debug ("_handle_messages_from_ws exiting" )
0 commit comments