@@ -678,7 +678,13 @@ def _recv_cb(self, stream_id=0):
678678 # Concurrency
679679 #
680680 # Ignore this read if some other thread has recently read data from
681- # from the requested stream
681+ # from the requested stream.
682+ #
683+ # The lock here looks broad, but is need to ensure correct behavior
684+ # when there are multiple readers of the same stream. It re-acquired
685+ # in the calls to self._single_read.
686+ #
687+ # i/o occurs while the lock is held; waiting threads will see a delay.
682688 with self ._read_lock :
683689 log .debug ('recv for stream %d with %s already present' ,
684690 stream_id ,
@@ -687,36 +693,36 @@ def _recv_cb(self, stream_id=0):
687693 self .recent_recv_streams .discard (stream_id )
688694 return
689695
690- # TODO: Re-evaluate this.
691- self ._single_read ()
692- count = 9
693- retry_wait = 0.05 # can improve responsiveness to delay the retry
694-
695- while count and self ._sock is not None and self ._sock .can_read :
696- # If the connection has been closed, bail out, but retry
697- # on transient errors.
698- try :
699- self ._single_read ()
700- except ConnectionResetError :
701- break
702- except ssl .SSLError as e : # pragma: no cover
703- # these are transient errors that can occur while reading from
704- # ssl connections.
705- if e .args [0 ] in TRANSIENT_SSL_ERRORS :
706- continue
707- else :
708- raise
709- except socket .error as e : # pragma: no cover
710- if e .errno in (errno .EINTR , errno .EAGAIN ):
711- # if 'interrupted' or 'try again', continue
712- time .sleep (retry_wait )
713- continue
714- elif e .errno == errno .ECONNRESET :
696+ # TODO: Re-evaluate this.
697+ self ._single_read ()
698+ count = 9
699+ retry_wait = 0.05 # can improve responsiveness to delay the retry
700+
701+ while count and self ._sock is not None and self ._sock .can_read :
702+ # If the connection has been closed, bail out, but retry
703+ # on transient errors.
704+ try :
705+ self ._single_read ()
706+ except ConnectionResetError :
715707 break
716- else :
717- raise
708+ except ssl .SSLError as e : # pragma: no cover
709+ # these are transient errors that can occur while reading
710+ # from ssl connections.
711+ if e .args [0 ] in TRANSIENT_SSL_ERRORS :
712+ continue
713+ else :
714+ raise
715+ except socket .error as e : # pragma: no cover
716+ if e .errno in (errno .EINTR , errno .EAGAIN ):
717+ # if 'interrupted' or 'try again', continue
718+ time .sleep (retry_wait )
719+ continue
720+ elif e .errno == errno .ECONNRESET :
721+ break
722+ else :
723+ raise
718724
719- count -= 1
725+ count -= 1
720726
721727 def _send_rst_frame (self , stream_id , error_code ):
722728 """
@@ -734,7 +740,7 @@ def _send_rst_frame(self, stream_id, error_code):
734740
735741 # Concurrency
736742 #
737- # Hold _lock; the stram storage is being update . No i/o occurs, any
743+ # Hold _lock; the stream storage is being updated . No i/o occurs, any
738744 # delay is proportional to the number of waiting threads.
739745 with self ._lock :
740746 try :
0 commit comments