@@ -69,7 +69,6 @@ class Session:
6969
7070 # book keeping
7171 _seq_manager : SeqManager
72- _msg_lock : asyncio .Lock
7372 _buffer : MessageBuffer
7473 _task_manager : BackgroundTaskManager
7574
@@ -105,7 +104,6 @@ def __init__(
105104
106105 # book keeping
107106 self ._seq_manager = SeqManager ()
108- self ._msg_lock = asyncio .Lock ()
109107 self ._buffer = MessageBuffer (self ._transport_options .buffer_size )
110108 self ._task_manager = BackgroundTaskManager ()
111109
@@ -229,6 +227,8 @@ async def send_message(
229227 # if the session is not active, we should not do anything
230228 if self ._state != SessionState .ACTIVE :
231229 return
230+ await self ._buffer .has_capacity ()
231+ # Start of critical section. No await between here and buffer.put()!
232232 msg = TransportMessage (
233233 streamId = stream_id ,
234234 id = nanoid .generate (),
@@ -245,23 +245,19 @@ async def send_message(
245245 with use_span (span ):
246246 trace_propagator .inject (msg , None , trace_setter )
247247 try :
248- # We need this lock to ensure the buffer order and message sending order
249- # are the same.
250- async with self ._msg_lock :
251- await self ._buffer .has_capacity ()
252- try :
253- self ._buffer .put (msg )
254- except MessageBufferClosedError :
255- # The session is closed and is no longer accepting new messages.
248+ try :
249+ self ._buffer .put (msg )
250+ except MessageBufferClosedError :
251+ # The session is closed and is no longer accepting new messages.
252+ return
253+ async with self ._ws_lock :
254+ if not self ._ws_wrapper .is_open ():
255+ # If the websocket is closed, we should not send the message
256+ # and wait for the retry from the buffer.
256257 return
257- async with self ._ws_lock :
258- if not self ._ws_wrapper .is_open ():
259- # If the websocket is closed, we should not send the message
260- # and wait for the retry from the buffer.
261- return
262- await send_transport_message (
263- msg , self ._ws_wrapper .ws , self ._begin_close_session_countdown
264- )
258+ await send_transport_message (
259+ msg , self ._ws_wrapper .ws , self ._begin_close_session_countdown
260+ )
265261 except WebsocketClosedException as e :
266262 logger .debug (
267263 "Connection closed while sending message %r, waiting for "
0 commit comments