11from decimal import Decimal
2+ import errno
23import logging
34import math
45import time
1516
1617from databricks .sql .thrift_api .TCLIService import TCLIService , ttypes
1718from databricks .sql import *
19+ from databricks .sql .thrift_api .TCLIService .TCLIService import (
20+ Client as TCLIServiceClient ,
21+ )
1822from databricks .sql .utils import (
1923 ArrowQueue ,
2024 ExecuteResponse ,
3943 "_retry_delay_max" : (float , 60 , 5 , 3600 ),
4044 "_retry_stop_after_attempts_count" : (int , 30 , 1 , 60 ),
4145 "_retry_stop_after_attempts_duration" : (float , 900 , 1 , 86400 ),
46+ "_retry_delay_default" : (float , 5 , 1 , 60 ),
4247}
4348
4449
@@ -71,6 +76,8 @@ def __init__(
7176 # _retry_delay_min (default: 1)
7277 # _retry_delay_max (default: 60)
7378 # {min,max} pre-retry delay bounds
79+ # _retry_delay_default (default: 5)
80+ # Only used when GetOperationStatus fails due to a TCP/OS Error.
7481 # _retry_stop_after_attempts_count (default: 30)
7582 # total max attempts during retry sequence
7683 # _retry_stop_after_attempts_duration (default: 900)
@@ -158,7 +165,7 @@ def _initialize_retry_args(self, kwargs):
158165 "retry parameter: {} given_or_default {}" .format (key , given_or_default )
159166 )
160167 if bound != given_or_default :
161- logger .warn (
168+ logger .warning (
162169 "Override out of policy retry parameter: "
163170 + "{} given {}, restricted to {}" .format (
164171 key , given_or_default , bound
@@ -243,7 +250,9 @@ def _handle_request_error(self, error_info, attempt, elapsed):
243250 # FUTURE: Consider moving to https://github.com/litl/backoff or
244251 # https://github.com/jd/tenacity for retry logic.
245252 def make_request (self , method , request ):
246- """Execute given request, attempting retries when receiving HTTP 429/503.
253+ """Execute given request, attempting retries when
254+ 1. Receiving HTTP 429/503 from server
255+ 2. OSError is raised during a GetOperationStatus
247256
248257 For delay between attempts, honor the given Retry-After header, but with bounds.
249258 Use lower bound of exponential-backoff based on _retry_delay_min,
@@ -260,17 +269,21 @@ def make_request(self, method, request):
260269 def get_elapsed ():
261270 return time .time () - t0
262271
272+ def bound_retry_delay (attempt , proposed_delay ):
273+ """bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]"""
274+ delay = int (proposed_delay )
275+ delay = max (delay , self ._retry_delay_min * math .pow (1.5 , attempt - 1 ))
276+ delay = min (delay , self ._retry_delay_max )
277+ return delay
278+
263279 def extract_retry_delay (attempt ):
264280 # encapsulate retry checks, returns None || delay-in-secs
265281 # Retry IFF 429/503 code + Retry-After header set
266282 http_code = getattr (self ._transport , "code" , None )
267283 retry_after = getattr (self ._transport , "headers" , {}).get ("Retry-After" )
268284 if http_code in [429 , 503 ] and retry_after :
269285 # bound delay (seconds) by [min_delay*1.5^(attempt-1), max_delay]
270- delay = int (retry_after )
271- delay = max (delay , self ._retry_delay_min * math .pow (1.5 , attempt - 1 ))
272- delay = min (delay , self ._retry_delay_max )
273- return delay
286+ return bound_retry_delay (attempt , int (retry_after ))
274287 return None
275288
276289 def attempt_request (attempt ):
@@ -279,24 +292,57 @@ def attempt_request(attempt):
279292 # - non-None method_return -> success, return and be done
280293 # - non-None retry_delay -> sleep delay before retry
281294 # - error, error_message always set when available
295+
296+ error , error_message , retry_delay = None , None , None
282297 try :
283298 logger .debug ("Sending request: {}" .format (request ))
284299 response = method (request )
285300 logger .debug ("Received response: {}" .format (response ))
286301 return response
287- except Exception as error :
302+ except OSError as err :
303+ error = err
304+ error_message = str (err )
305+
306+ gos_name = TCLIServiceClient .GetOperationStatus .__name__
307+ if method .__name__ == gos_name :
308+ retry_delay = bound_retry_delay (attempt , self ._retry_delay_default )
309+
310+ # fmt: off
311+ # The built-in errno package encapsulates OSError codes, which are OS-specific.
312+ # log.info for errors we believe are not unusual or unexpected. log.warning for
313+ # others like EEXIST, EBADF, ERANGE which are not expected in this context.
314+ #
315+ # I manually tested this retry behaviour using mitmweb and confirmed that
316+ # GetOperationStatus requests are retried when I forced network connection
317+ # interruptions / timeouts / reconnects. See #24 for more info.
318+ # | Debian | Darwin |
319+ info_errs = [ # |--------|--------|
320+ errno .ESHUTDOWN , # | 32 | 32 |
321+ errno .EAFNOSUPPORT , # | 97 | 47 |
322+ errno .ECONNRESET , # | 104 | 54 |
323+ errno .ETIMEDOUT , # | 110 | 60 |
324+ ]
325+
326+ # fmt: on
327+ log_string = f"{ gos_name } failed with code { err .errno } and will attempt to retry"
328+ if err .errno in info_errs :
329+ logger .info (log_string )
330+ else :
331+ logger .warning (log_string )
332+ except Exception as err :
333+ error = err
288334 retry_delay = extract_retry_delay (attempt )
289335 error_message = ThriftBackend ._extract_error_message_from_headers (
290336 getattr (self ._transport , "headers" , {})
291337 )
292- return RequestErrorInfo (
293- error = error ,
294- error_message = error_message ,
295- retry_delay = retry_delay ,
296- http_code = getattr (self ._transport , "code" , None ),
297- method = method .__name__ ,
298- request = request ,
299- )
338+ return RequestErrorInfo (
339+ error = error ,
340+ error_message = error_message ,
341+ retry_delay = retry_delay ,
342+ http_code = getattr (self ._transport , "code" , None ),
343+ method = method .__name__ ,
344+ request = request ,
345+ )
300346
301347 # The real work:
302348 # - for each available attempt:
0 commit comments