@@ -331,6 +331,19 @@ async def _send_message(
331331 service_name ,
332332 procedure_name ,
333333 )
334+ # Ensure the buffer isn't full before we enqueue
335+ await self ._space_available .wait ()
336+
337+ # Before we append, do an important check
338+ if self ._state in TerminalStates :
339+ # session is closing / closed, raise
340+ raise SessionClosedRiverServiceException (
341+ "river session is closed, dropping message" ,
342+ service_name ,
343+ procedure_name ,
344+ )
345+
346+ # Begin critical section: Avoid any await between here and _send_buffer.append
334347 msg = TransportMessage (
335348 streamId = stream_id ,
336349 id = nanoid .generate (),
@@ -348,18 +361,6 @@ async def _send_message(
348361 with use_span (span ):
349362 trace_propagator .inject (msg , None , trace_setter )
350363
351- # Ensure the buffer isn't full before we enqueue
352- await self ._space_available .wait ()
353-
354- # Before we append, do an important check
355- if self ._state in TerminalStates :
356- # session is closing / closed, raise
357- raise SessionClosedRiverServiceException (
358- "river session is closed, dropping message" ,
359- service_name ,
360- procedure_name ,
361- )
362-
363364 self ._send_buffer .append (msg )
364365
365366 # If the buffer is now full, reset the block
0 commit comments