2828from .._property import _cached_property
2929from ..config import Config
3030from ..errors import AlreadyExists , NotFound
31- from ..errors .customizer import _RetryAfterCustomizer
3231from ..errors .mapper import _error_mapper
3332from ..retries import retried
3433from ..service import files
@@ -577,6 +576,27 @@ def __repr__(self) -> str:
577576 return f"<_DbfsPath { self ._path } >"
578577
579578
class _RetryableException(Exception):
    """Base class for retryable exceptions in DBFS operations.

    Carries the response body text and the HTTP status code of the response
    that triggered the retry.
    """

    def __init__(self, message: str, http_status_code: int):
        """Create the exception.

        :param message: text describing the failure (``make_error`` passes the response body).
        :param http_status_code: HTTP status code of the failed response.
        """
        # Forward both values to Exception so that ``args`` is populated. With empty
        # ``args``, pickle/copy rebuild the instance by calling the class with no
        # arguments, which fails because both parameters are required.
        super().__init__(message, http_status_code)
        self.message = message
        self.http_status_code = http_status_code

    def __str__(self) -> str:
        return f"{self.message} (HTTP Status: {self.http_status_code})"

    @staticmethod
    def make_error(response: "requests.Response") -> "_RetryableException":
        """Map the response to a retryable exception.

        :param response: response object; only its ``text`` and ``status_code`` are read.
        :return: exception carrying the response text and status code.
        """
        return _RetryableException(
            message=response.text,
            http_status_code=response.status_code,
        )
598+
599+
580600class DbfsExt (files .DbfsAPI ):
581601 __doc__ = files .DbfsAPI .__doc__
582602
@@ -885,7 +905,7 @@ def perform():
885905 timeout = self ._config .multipart_upload_single_chunk_upload_timeout_seconds ,
886906 )
887907
888- upload_response = self ._retry_idempotent_operation (perform , rewind )
908+ upload_response = self ._retry_cloud_idempotent_operation (perform , rewind )
889909
890910 if upload_response .status_code in (200 , 201 ):
891911 # Chunk upload successful
@@ -1097,7 +1117,7 @@ def perform():
10971117 )
10981118
10991119 try :
1100- return self ._retry_idempotent_operation (perform )
1120+ return self ._retry_cloud_idempotent_operation (perform )
11011121 except RequestException :
11021122 _LOG .warning ("Failed to retrieve upload status" )
11031123 return None
@@ -1116,7 +1136,7 @@ def perform():
11161136 # a 503 or 500 response, then you need to resume the interrupted upload from where it left off.
11171137
11181138 # Let's follow that for all potentially retryable status codes.
1119- # Together with the catch block below we replicate the logic in _retry_idempotent_operation ().
1139+ # Together with the catch block below we replicate the logic in _retry_databricks_idempotent_operation ().
11201140 if upload_response .status_code in self ._RETRYABLE_STATUS_CODES :
11211141 if retry_count < self ._config .multipart_upload_max_retries :
11221142 retry_count += 1
@@ -1243,7 +1263,7 @@ def perform():
12431263 timeout = self ._config .multipart_upload_single_chunk_upload_timeout_seconds ,
12441264 )
12451265
1246- abort_response = self ._retry_idempotent_operation (perform )
1266+ abort_response = self ._retry_cloud_idempotent_operation (perform )
12471267
12481268 if abort_response .status_code not in (200 , 201 ):
12491269 raise ValueError (abort_response )
@@ -1265,7 +1285,7 @@ def perform():
12651285 timeout = self ._config .multipart_upload_single_chunk_upload_timeout_seconds ,
12661286 )
12671287
1268- abort_response = self ._retry_idempotent_operation (perform )
1288+ abort_response = self ._retry_cloud_idempotent_operation (perform )
12691289
12701290 if abort_response .status_code not in (200 , 201 ):
12711291 raise ValueError (abort_response )
@@ -1283,31 +1303,39 @@ def _create_cloud_provider_session(self):
12831303 session .mount ("http://" , http_adapter )
12841304 return session
12851305
1286- def _retry_idempotent_operation (
1306+ def _retry_cloud_idempotent_operation (
12871307 self , operation : Callable [[], requests .Response ], before_retry : Callable = None
12881308 ) -> requests .Response :
1289- """Perform given idempotent operation with necessary retries. Since operation is idempotent it's
1290- safe to retry it for response codes where server state might have changed.
1309+ """Perform given idempotent operation with necessary retries for requests to non Databricks APIs.
1310+ For cloud APIs, we will retry on network errors and on server response codes.
1311+ Since operation is idempotent it's safe to retry it for response codes where server state might have changed.
12911312 """
12921313
1293- def delegate ():
1314+ def delegate () -> requests . Response :
12941315 response = operation ()
12951316 if response .status_code in self ._RETRYABLE_STATUS_CODES :
1296- attrs = {}
1297- # this will assign "retry_after_secs" to the attrs, essentially making exception look retryable
1298- _RetryAfterCustomizer ().customize_error (response , attrs )
1299- raise _error_mapper (response , attrs )
1317+ raise _RetryableException .make_error (response )
13001318 else :
13011319 return response
13021320
1321+ def extended_is_retryable (e : BaseException ) -> Optional [str ]:
1322+ retry_reason_from_base = _BaseClient ._is_retryable (e )
1323+ if retry_reason_from_base is not None :
1324+ return retry_reason_from_base
1325+
1326+ if isinstance (e , _RetryableException ):
1327+ # this is a retriable exception, but not a network error
1328+ return f"retryable exception (status_code:{ e .http_status_code } )"
1329+ return None
1330+
13031331 # following _BaseClient timeout
13041332 retry_timeout_seconds = self ._config .retry_timeout_seconds or 300
13051333
13061334 return retried (
13071335 timeout = timedelta (seconds = retry_timeout_seconds ),
13081336 # also retry on network errors (connection error, connection timeout)
13091337 # where we believe request didn't reach the server
1310- is_retryable = _BaseClient . _is_retryable ,
1338+ is_retryable = extended_is_retryable ,
13111339 before_retry = before_retry ,
13121340 )(delegate )()
13131341
0 commit comments