Skip to content

Commit a4c228d

Browse files
committed
Files API client: recover on download failures (#844)
1 parent 8975d07 commit a4c228d

File tree

5 files changed

+582
-6
lines changed

5 files changed

+582
-6
lines changed

databricks/sdk/__init__.py

Lines changed: 7 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

databricks/sdk/_base_client.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import io
22
import logging
3+
from abc import ABC, abstractmethod
34
import urllib.parse
45
from datetime import timedelta
56
from types import TracebackType
@@ -284,9 +285,18 @@ def _record_request_log(self, response: requests.Response, raw: bool = False) ->
284285
return
285286
logger.debug(RoundTrip(response, self._debug_headers, self._debug_truncate_bytes, raw).generate())
286287

288+
class _RawResponse(ABC):
289+
@abstractmethod
290+
# follows Response signature: https://github.com/psf/requests/blob/main/src/requests/models.py#L799
291+
def iter_content(self, chunk_size: int = 1, decode_unicode: bool = False):
292+
pass
293+
294+
@abstractmethod
295+
def close(self):
296+
pass
287297

288298
class _StreamingResponse(BinaryIO):
289-
_response: requests.Response
299+
_response: _RawResponse
290300
_buffer: bytes
291301
_content: Union[Iterator[bytes], None]
292302
_chunk_size: Union[int, None]
@@ -298,7 +308,7 @@ def fileno(self) -> int:
298308
def flush(self) -> int:
299309
pass
300310

301-
def __init__(self, response: requests.Response, chunk_size: Union[int, None] = None):
311+
def __init__(self, response: _RawResponse, chunk_size: Union[int, None] = None):
302312
self._response = response
303313
self._buffer = b''
304314
self._content = None
@@ -308,7 +318,7 @@ def _open(self) -> None:
308318
if self._closed:
309319
raise ValueError("I/O operation on closed file")
310320
if not self._content:
311-
self._content = self._response.iter_content(chunk_size=self._chunk_size)
321+
self._content = self._response.iter_content(chunk_size=self._chunk_size, decode_unicode=False)
312322

313323
def __enter__(self) -> BinaryIO:
314324
self._open()

databricks/sdk/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ class Config:
9292
max_connections_per_pool: int = ConfigAttribute()
9393
databricks_environment: Optional[DatabricksEnvironment] = None
9494

95+
enable_experimental_files_api_client: bool = ConfigAttribute(env='DATABRICKS_ENABLE_EXPERIMENTAL_FILES_API_CLIENT')
96+
files_api_client_download_max_total_recovers = None
97+
files_api_client_download_max_total_recovers_without_progressing = 1
98+
9599
def __init__(
96100
self,
97101
*,

databricks/sdk/mixins/files.py

Lines changed: 181 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,25 @@
88
import sys
99
from abc import ABC, abstractmethod
1010
from collections import deque
11+
from collections.abc import Iterator
1112
from io import BytesIO
1213
from types import TracebackType
13-
from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Generator, Iterable,
14-
Iterator, Type, Union)
14+
from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Generator, Iterable, Optional, Type, Union)
1515
from urllib import parse
16+
from requests import RequestException
1617

18+
import logging
1719
from .._property import _cached_property
1820
from ..errors import NotFound
1921
from ..service import files
22+
from ..service._internal import _escape_multi_segment_path_parameter
23+
from ..service.files import DownloadResponse
24+
from .._base_client import _RawResponse, _StreamingResponse
2025

2126
if TYPE_CHECKING:
2227
from _typeshed import Self
2328

29+
_LOG = logging.getLogger(__name__)
2430

2531
class _DbfsIO(BinaryIO):
2632
MAX_CHUNK_SIZE = 1024 * 1024
@@ -636,3 +642,176 @@ def delete(self, path: str, *, recursive=False):
636642
if p.is_dir and not recursive:
637643
raise IOError('deleting directories requires recursive flag')
638644
p.delete(recursive=recursive)
645+
646+
647+
class FilesExt(files.FilesAPI):
    __doc__ = files.FilesAPI.__doc__

    def __init__(self, api_client, config: Config):
        super().__init__(api_client)
        # Copy so later mutations of the caller's Config don't affect this client.
        self._config = config.copy()

    def download(self, file_path: str) -> DownloadResponse:
        """Download a file.

        Downloads a file of any size. The file contents are the response body.
        This is a standard HTTP file download, not a JSON RPC.

        It is strongly recommended, for fault tolerance reasons,
        to iteratively consume from the stream with a maximum read(size)
        defined instead of using indefinite-size reads.

        :param file_path: str
          The remote path of the file, e.g. /Volumes/path/to/your/file

        :returns: :class:`DownloadResponse`
        """
        initial_response: DownloadResponse = self._download_raw_stream(file_path=file_path,
                                                                       start_byte_offset=0,
                                                                       if_unmodified_since_timestamp=None)

        # Replace the raw response inside the streaming wrapper with a resilient
        # one that transparently re-issues ranged downloads on network failures.
        wrapped_response = self._wrap_stream(file_path, initial_response)
        initial_response.contents._response = wrapped_response
        return initial_response

    def _download_raw_stream(self,
                             file_path: str,
                             start_byte_offset: int,
                             if_unmodified_since_timestamp: Optional[str] = None) -> DownloadResponse:
        """Issue a (possibly ranged) download request and return the raw streaming response.

        :param file_path: remote path of the file
        :param start_byte_offset: absolute 0-based offset to resume from; 0 downloads
          from the beginning of the file
        :param if_unmodified_since_timestamp: guard so that a resumed download fails if
          the remote file changed since the original request; required when
          ``start_byte_offset`` is non-zero

        :returns: :class:`DownloadResponse` whose ``contents`` is a ``_StreamingResponse``
        :raises ValueError: if a non-zero offset is requested without a timestamp guard
        :raises TypeError: if the API client returned contents of an unexpected type
        """
        headers = {'Accept': 'application/octet-stream', }

        if start_byte_offset and not if_unmodified_since_timestamp:
            # ValueError (a subclass of Exception) instead of a bare Exception:
            # this is an argument-validation failure.
            raise ValueError("if_unmodified_since_timestamp is required if start_byte_offset is specified")

        if start_byte_offset:
            headers['Range'] = f'bytes={start_byte_offset}-'

        if if_unmodified_since_timestamp:
            headers['If-Unmodified-Since'] = if_unmodified_since_timestamp

        response_headers = ['content-length', 'content-type', 'last-modified', ]
        res = self._api.do('GET',
                           f'/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}',
                           headers=headers,
                           response_headers=response_headers,
                           raw=True)

        result = DownloadResponse.from_dict(res)
        if not isinstance(result.contents, _StreamingResponse):
            raise TypeError("Internal error: response contents is of unexpected type: " +
                            type(result.contents).__name__)

        return result

    def _wrap_stream(self, file_path: str, download_response: DownloadResponse):
        """Wrap the raw response of ``download_response`` in a recovering response.

        The returned object starts at offset 0 and uses ``last_modified`` as the
        guard for resumed range requests.
        """
        underlying_response = _ResilientIterator._extract_raw_response(download_response)
        return _ResilientResponse(self,
                                  file_path,
                                  download_response.last_modified,
                                  offset=0,
                                  underlying_response=underlying_response)
710+
711+
712+
class _ResilientResponse(_RawResponse):
    # _StreamingResponse uses two methods of the underlying response:
    # - _response.iter_content(chunk_size=self._chunk_size)
    # - _response.close
    # we need to provide them and nothing else

    def __init__(self, api: FilesExt, file_path: str, file_last_modified: str, offset: int,
                 underlying_response: _RawResponse):
        self.api = api
        self.file_path = file_path
        self.underlying_response = underlying_response
        self.offset = offset
        self.file_last_modified = file_last_modified
        # Created lazily by the first iter_content() call. Initialized here so
        # that close() is safe even if iter_content() was never called (the
        # original code raised AttributeError in that case).
        self.iterator = None

    def iter_content(self, chunk_size=1, decode_unicode=False):
        """Return a recovering iterator over the remaining file contents.

        :raises ValueError: text decoding is not supported for binary downloads
        """
        if decode_unicode:
            raise ValueError('Decode unicode is not supported')

        iterator = self.underlying_response.iter_content(chunk_size=chunk_size, decode_unicode=False)
        self.iterator = _ResilientIterator(iterator, self.file_path, self.file_last_modified,
                                           self.offset, self.api, chunk_size)
        return self.iterator

    def close(self):
        # Guard against close() before iter_content(): nothing to close yet.
        if self.iterator is not None:
            self.iterator.close()
737+
738+
739+
class _ResilientIterator(Iterator):
    # This class tracks current offset (returned to the client code)
    # and recovers from failures by requesting download from the current offset.

    @staticmethod
    def _extract_raw_response(download_response: DownloadResponse) -> _RawResponse:
        streaming_response: _StreamingResponse = download_response.contents  # this is an instance of _StreamingResponse
        return streaming_response._response

    def __init__(self, underlying_iterator, file_path: str, file_last_modified: str, offset: int,
                 api: FilesExt, chunk_size: int):
        self._underlying_iterator = underlying_iterator
        self._api = api
        self._file_path = file_path

        # Absolute current offset (0-based), i.e. number of bytes from the beginning of the file
        # that were so far returned to the caller code.
        self._offset = offset
        self._file_last_modified = file_last_modified
        self._chunk_size = chunk_size

        self._total_recovers_count: int = 0
        self._recovers_without_progressing_count: int = 0
        self._closed: bool = False

    def _should_recover(self) -> bool:
        """Check both recover budgets; a limit of None means unlimited."""
        if self._total_recovers_count == self._api._config.files_api_client_download_max_total_recovers:
            _LOG.debug("Total recovers limit exceeded")
            return False
        max_without_progress = self._api._config.files_api_client_download_max_total_recovers_without_progressing
        if max_without_progress is not None and \
                self._recovers_without_progressing_count >= max_without_progress:
            _LOG.debug("No progression recovers limit exceeded")
            return False
        return True

    def _recover(self) -> bool:
        """Try to re-establish the download stream from the current offset.

        :returns: True if a fresh underlying stream was opened; False if recovery
          was suppressed by the budgets or failed, in which case the caller must
          re-raise the original exception.
        """
        if not self._should_recover():
            return False  # recover suppressed, rethrow original exception

        self._total_recovers_count += 1
        self._recovers_without_progressing_count += 1

        try:
            self._underlying_iterator.close()

            _LOG.debug("Trying to recover from offset %d", self._offset)

            # following call includes all the required network retries
            download_response = self._api._download_raw_stream(self._file_path, self._offset,
                                                               self._file_last_modified)
            underlying_response = _ResilientIterator._extract_raw_response(download_response)
            self._underlying_iterator = underlying_response.iter_content(chunk_size=self._chunk_size,
                                                                         decode_unicode=False)
            _LOG.debug("Recover succeeded")
            return True
        except Exception:
            # Narrowed from a bare `except:` so that KeyboardInterrupt/SystemExit
            # raised during recovery propagate instead of being swallowed.
            return False  # recover failed, rethrow original exception

    def __next__(self):
        if self._closed:
            # following _BaseClient
            raise ValueError("I/O operation on closed file")

        while True:
            try:
                returned_bytes = next(self._underlying_iterator)
                self._offset += len(returned_bytes)
                # Progress was made: reset the no-progress recover budget.
                self._recovers_without_progressing_count = 0
                return returned_bytes

            except StopIteration:
                raise

            # https://requests.readthedocs.io/en/latest/user/quickstart/#errors-and-exceptions
            except RequestException:
                if not self._recover():
                    raise

    def close(self):
        self._underlying_iterator.close()
        self._closed = True

0 commit comments

Comments
 (0)