Skip to content

Commit 66cda91

Browse files
committed
[SC-179831] Preliminary support for seekable DLs
1 parent d3b85cb commit 66cda91

File tree

3 files changed

+273
-5
lines changed

3 files changed

+273
-5
lines changed

.codegen/__init__.py.tmpl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import databricks.sdk.dbutils as dbutils
33
from databricks.sdk.credentials_provider import CredentialsStrategy
44

55
from databricks.sdk.mixins.files import DbfsExt
6+
from databricks.sdk.mixins.files import FilesExt
67
from databricks.sdk.mixins.compute import ClustersExt
78
from databricks.sdk.mixins.workspace import WorkspaceExt
89
from databricks.sdk.mixins.open_ai_client import ServingEndpointsExt
@@ -18,7 +19,7 @@ from typing import Optional
1819
"google_credentials" "google_service_account" }}
1920

2021
{{- define "api" -}}
21-
{{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsAPI" "ServingEndpointsExt" -}}
22+
{{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "FilesAPI" "FilesExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsAPI" "ServingEndpointsExt" -}}
2223
{{- $genApi := concat .PascalName "API" -}}
2324
{{- getOrDefault $mixins $genApi $genApi -}}
2425
{{- end -}}

databricks/sdk/__init__.py

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

databricks/sdk/mixins/files.py

Lines changed: 268 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import sys
99
from abc import ABC, abstractmethod
1010
from collections import deque
11-
from io import BytesIO
11+
from datetime import datetime, timezone
12+
from enum import Enum
13+
from io import BytesIO, IOBase, BufferedIOBase, UnsupportedOperation
1214
from types import TracebackType
1315
from typing import (TYPE_CHECKING, AnyStr, BinaryIO, Generator, Iterable,
1416
Iterator, Type, Union)
@@ -17,6 +19,11 @@
1719
from .._property import _cached_property
1820
from ..errors import NotFound
1921
from ..service import files
22+
from ..service._internal import _escape_multi_segment_path_parameter
23+
from ..service.files import DownloadResponse
24+
25+
_FILES_MIXIN_DEBUG_ENABLED = True
26+
_FILES_MIXIN_ENABLE_UNSUPPORTED_FEATURES = True
2027

2128
if TYPE_CHECKING:
2229
from _typeshed import Self
@@ -636,3 +643,263 @@ def delete(self, path: str, *, recursive=False):
636643
if p.is_dir and not recursive:
637644
raise IOError('deleting directories requires recursive flag')
638645
p.delete(recursive=recursive)
646+
647+
648+
class FilesExt(files.FilesAPI):
649+
"""Extends the FilesAPI with support for complex multipart upload/download operations & more robust file I/O"""
650+
__doc__ = files.FilesAPI.__doc__
651+
652+
class _FileTransferBackend(Enum):
653+
DB_FILES_API = 1
654+
PRESIGNED_URLS = 2
655+
656+
def __init__(self, api_client):
657+
super().__init__(api_client)
658+
659+
def download(self, file_path: str, *, start_byte_offset: Optional[int] = None, if_unmodified_since_timestamp: Optional[datetime] = None) -> DownloadResponse:
660+
"""Download a file.
661+
662+
Downloads a file of up to 5 GiB. The file contents are the response body. This is a standard HTTP file
663+
download, not a JSON RPC.
664+
665+
:param file_path: str
666+
The absolute path of the file.
667+
668+
:returns: :class:`DownloadResponse`
669+
"""
670+
671+
headers = {'Accept': 'application/octet-stream', }
672+
673+
if start_byte_offset:
674+
headers['Range'] = f'bytes={start_byte_offset}-'
675+
676+
if if_unmodified_since_timestamp:
677+
headers['If-Unmodified-Since'] = if_unmodified_since_timestamp.strftime("%a, %d %b %Y %H:%M:%S GMT")
678+
679+
response_headers = ['content-length', 'content-type', 'last-modified', ]
680+
res = self._api.do('GET',
681+
f'/api/2.0/fs/files{_escape_multi_segment_path_parameter(file_path)}',
682+
headers=headers,
683+
response_headers=response_headers,
684+
raw=True)
685+
686+
return DownloadResponse.from_dict(res)
687+
688+
689+
class PresignedUrl:
690+
"""Represents all information needed to execute a presigned URL request"""
691+
692+
def __init__(self, method: str, url: str, headers: List[Dict[str, str]],
693+
headers_populated_by_client: List[str]):
694+
self.method = method
695+
self.url = url
696+
self.headers_populated_by_client = set(headers_populated_by_client)
697+
self.headers = {h["name"]: h["value"] for h in headers}
698+
699+
def all_client_headers_populated(self, user_headers: List[str]):
700+
return self.headers_populated_by_client.issubset(user_headers)
701+
702+
703+
class MultipartUploadCreatePartUrlsResponse:
704+
"""Represents the response of a request for presigned URLs for uploading parts of a file in a multipart upload session."""
705+
706+
def __init__(self, upload_part_urls: List[PresignedUrl], next_page_token: str):
707+
self.upload_part_urls = upload_part_urls
708+
self.next_page_token = next_page_token
709+
710+
711+
class MultipartUploadCreate:
712+
"""Represents the response to an initiated multipart upload session."""
713+
714+
def __init__(self, session_token: str, part_size: int):
715+
self.session_token = session_token
716+
self.part_size = part_size
717+
718+
719+
class SeekableDownloadBinaryIO(BufferedIOBase):
720+
"""Presents a remote filesystem object as a seekable BinaryIO stream """
721+
# TODO: The initial version will ONLY support resumption; it will NOT support truncation / end points
722+
# TODO: This currently is only handling situations where the underlying stream is closed at the start of a request. It should add:
723+
# - Automatic closure & reopening of a stream that throws an exception during normal operations
724+
# - Throwing an exception if we're unable to open a stream (i.e. non-200 response)
725+
def __init__(self, file_path: str, api: FilesExt):
726+
727+
# This is actively under development and should not be considered production ready or API stable.
728+
if not _FILES_MIXIN_ENABLE_UNSUPPORTED_FEATURES:
729+
raise NotImplementedError("SeekableDownloadBinaryIO is not yet supported")
730+
731+
self._file_path: str = file_path
732+
733+
self._api = api
734+
self._initial_request_metadata: DownloadResponse = self._api.download(self._file_path)
735+
self._dl_session_initiated = datetime.now(timezone.utc)
736+
self._current_pos_of_underlying_stream: int = 0
737+
self._start_pos_of_underlying_stream: int = 0
738+
self._underlying_stream: BinaryIO = self._initial_request_metadata.contents
739+
self._overall_file_size: int = self._initial_request_metadata.content_length
740+
self._most_recent_dl_resp = None
741+
self._closed: bool = False
742+
743+
def _replace_underlying_stream(self, __offset):
744+
"""Close the existing underlying stream and open a new one at the specified file offset"""
745+
old_stream = self._underlying_stream
746+
self._underlying_stream = self._api.download(self._file_path, start_byte_offset=__offset, if_unmodified_since_timestamp=self._dl_session_initiated).contents
747+
printd("Closed older stream")
748+
old_stream.close()
749+
printd("Set underlying stream")
750+
751+
def _underlying_stream_is_open(self):
752+
"""Convenience method indicating that the underlying stream is open. TODO: This also assumes that the stream does not auto-close at EOF. Might need to revisit that"""
753+
return self._underlying_stream is not None and not self._underlying_stream.closed
754+
755+
def _ensure_open_stream(self):
756+
"""Calling this will ensure that the underlying stream is open, smoothing over issues like socket timeouts to create the illusion of one indefinitely readable file stream"""
757+
if not self._underlying_stream_is_open():
758+
self._replace_underlying_stream(self.tell())
759+
760+
def detach(self):
761+
raise UnsupportedOperation("Detaching from the buffer is not supported")
762+
763+
def read(self, __size = -1, /):
764+
# Read and return up to size bytes. If omitted, None, or Negative, data is read until EOF is reached
765+
# Empty bytes object returned if stream is EOF
766+
self._ensure_open_stream()
767+
out = self._underlying_stream.read(__size)
768+
self._current_pos_of_underlying_stream += len(out)
769+
return out
770+
771+
def read1(self, __size = ...):
772+
# Read and return up to size bytes, with at most one read() system call
773+
self._ensure_open_stream()
774+
out = self._underlying_stream.read1(__size)
775+
self._current_pos_of_underlying_stream += len(out)
776+
return out
777+
778+
def readinto(self, __buffer):
779+
# Read up to len(buffer) bytes into buffer and return number of bytes read
780+
self._ensure_open_stream()
781+
out = self._underlying_stream.readinto(__buffer)
782+
self._current_pos_of_underlying_stream += len(out)
783+
return out
784+
785+
def readinto1(self, __buffer):
786+
# Read up to len(buffer) bytes into buffer with at most one read() system call
787+
self._ensure_open_stream()
788+
out = self._underlying_stream.readinto1(__buffer)
789+
self._current_pos_of_underlying_stream += len(out)
790+
return out
791+
792+
def write(self, __buffer):
793+
raise UnsupportedOperation("SeekableDownloadBinaryIO is used exclusively for read operations")
794+
795+
def close(self):
796+
"""Close the underlying stream & mark the SeekableBinaryIO stream as closed as well"""
797+
try:
798+
self._underlying_stream.close()
799+
self._closed = True
800+
except:
801+
self._underlying_stream = None
802+
self._closed = True
803+
804+
def closed(self):
805+
"""Reflects whether we permit additional operations on this stream"""
806+
return self._closed
807+
808+
def fileno(self):
809+
raise UnsupportedOperation("fileno() is not supported on this stream")
810+
811+
def flush(self):
812+
return
813+
814+
def isatty(self):
815+
return False
816+
817+
def readable(self):
818+
return True
819+
820+
def readline(self, __size = -1):
821+
self._ensure_open_stream()
822+
out = self._underlying_stream.readline(__size)
823+
self._current_pos_of_underlying_stream += len(out)
824+
return out
825+
826+
def readlines(self, __hint = -1):
827+
self._ensure_open_stream()
828+
out = self._underlying_stream.readlines(__hint)
829+
self._current_pos_of_underlying_stream += len(out)
830+
return out
831+
832+
def seek(self, __offset, __whence = os.SEEK_SET, /):
833+
"""
834+
Change the stream position to the given byte offset, which may necessitate closing the existing client connection and opening a new one.
835+
836+
:param __offset: Change the position to the byte offset, relative to the whence reference point
837+
:param __whence:
838+
- os.SEEK_SET / 0: Start of the file, offset must be 0 or positive
839+
- os.SEEK_CUR / 1: Current position, offset may be pos/neg/0
840+
- os.SEEK_END / 2: End of the file, offset must be 0 or negative
841+
:return: absolute position of the stream (in the overall file)
842+
"""
843+
844+
if self._underlying_stream.seekable() and __offset > 0:
845+
return self._start_pos_of_underlying_stream + self._underlying_stream.seek(__offset, __whence)
846+
847+
if(__whence == os.SEEK_SET):
848+
if(__offset < 0):
849+
raise ValueError("Seek position must be 0 or positive")
850+
851+
printd("Closing underlying stream, START")
852+
self._underlying_stream.close()
853+
854+
# TODO: Request new stream starting from byte __offset
855+
printd(f"Setting up new underlying stream, START, {__offset}")
856+
self._replace_underlying_stream(__offset)
857+
self._start_pos_of_underlying_stream = __offset
858+
return __offset
859+
860+
if(__whence == os.SEEK_CUR):
861+
if(__offset == 0):
862+
return self._underlying_stream.tell()
863+
self._underlying_stream.close()
864+
printd("Closing underlying stream, CUR")
865+
866+
# TODO: Request new stream starting from byte __offset
867+
new_offset = self._start_pos_of_underlying_stream + __offset
868+
printd(f"Setting up new underlying stream, CUR, {new_offset}")
869+
self._replace_underlying_stream(new_offset)
870+
self._start_pos_of_underlying_stream = new_offset
871+
return new_offset
872+
873+
if(__whence == os.SEEK_END):
874+
if(__offset > 0):
875+
raise ValueError("Seek position must be 0 or negative")
876+
877+
self._underlying_stream.close()
878+
printd("Closing underlying stream, END")
879+
new_offset = self._initial_request_metadata.content_length + __offset
880+
self._replace_underlying_stream(new_offset)
881+
self._start_pos_of_underlying_stream = new_offset
882+
return new_offset
883+
884+
885+
def seekable(self):
886+
return True
887+
888+
def tell(self):
889+
return self._current_pos_of_underlying_stream + self._start_pos_of_underlying_stream
890+
891+
def truncate(self, __size = None):
892+
raise UnsupportedOperation("Truncation is not supported on this stream")
893+
894+
def writable(self):
895+
return False
896+
897+
def writelines(self, __lines):
898+
raise UnsupportedOperation("Writing lines is not supported on this stream")
899+
900+
def __del__(self):
901+
self.close()
902+
903+
def printd(s):
904+
if _FILES_MIXIN_DEBUG_ENABLED:
905+
print(s)

0 commit comments

Comments
 (0)