@@ -1030,20 +1030,24 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
1030
1030
self .requests -= 1
1031
1031
self .size_cond .notify ()
1032
1032
1033
- # Assume all non dns/tcp/timeout errors mean the server rejected the connection due to overload.
1034
- # if not errorDuringDnsTcp and not timeoutError:
1035
- # error._add_error_label("SystemOverloadedError")
1036
-
1037
- def _handle_connection_error (self , error : Exception , phase : str ) -> None :
1033
+ def _handle_connection_error (self , error : BaseException , phase : str ) -> None :
1038
1034
# Handle system overload condition for non-sdam pools.
1039
- # Look for an AutoReconnect error raise from a ConnectionResetError with
1040
- # errno == errno.ECONNRESET. If found, set backoff and add error labels.
1035
+ # Look for an AutoReconnect error raised from a ConnectionResetError with
1036
+ # errno == errno.ECONNRESET or raised from an OSError that we've created due to
1037
+ # a closed connection.
1038
+ # If found, set backoff and add error labels.
1041
1039
if self .is_sdam or type (error ) != AutoReconnect or not len (error .errors ):
1042
1040
return
1043
- if not isinstance (error .errors [0 ], ConnectionResetError ):
1044
- return
1045
- if error .errors [0 ].errno != errno .ECONNRESET :
1046
- return
1041
+ if hasattr (error .errors , "values" ):
1042
+ root_err = next (iter (error .errors .values ()))
1043
+ else :
1044
+ root_err = error .errors [0 ]
1045
+ if isinstance (root_err , ConnectionResetError ):
1046
+ if root_err .errno != errno .ECONNRESET :
1047
+ return
1048
+ elif isinstance (root_err , OSError ):
1049
+ if str (root_err ) != "connection closed" :
1050
+ return
1047
1051
self ._backoff += 1
1048
1052
error ._add_error_label ("SystemOverloadedError" )
1049
1053
error ._add_error_label ("RetryableError" )
0 commit comments