diff --git a/dataplug/cloudobject.py b/dataplug/cloudobject.py
index 8666bd1..d572b4b 100644
--- a/dataplug/cloudobject.py
+++ b/dataplug/cloudobject.py
@@ -4,7 +4,6 @@
 import inspect
 import logging
 import pickle
-import tempfile
 from collections import namedtuple
 from copy import deepcopy
 from functools import partial
@@ -58,7 +57,7 @@ def __init__(
         storage_config = storage_config or {}
         self._s3: S3Client = PickleableS3ClientProxy(**storage_config)
 
-        logger.info(f"Created reference for %s", self)
+        logger.info("Created reference for %s", self)
         logger.debug(f"{self._obj_path=},{self._meta_path=}")
 
     @property
@@ -225,7 +224,7 @@ def preprocess(self, parallel_config=None, extra_args=None, chunk_size=None, for
             meta_bucket_head = None
 
         if not meta_bucket_head:
-            logger.info("Creating meta bucket %s".format(self.meta_path.bucket))
+            logger.info("Creating meta bucket %s", self.meta_path.bucket)
             try:
                 self.storage.create_bucket(Bucket=self.meta_path.bucket)
             except botocore.exceptions.ClientError as error:
diff --git a/dataplug/fileobject.py b/dataplug/fileobject.py
new file mode 100644
index 0000000..1fd7c2c
--- /dev/null
+++ b/dataplug/fileobject.py
@@ -0,0 +1,372 @@
+from __future__ import annotations
+
+import copy
+import inspect
+import logging
+import pickle
+from collections import namedtuple
+from copy import deepcopy
+from functools import partial
+from types import SimpleNamespace
+from typing import TYPE_CHECKING
+
+import botocore.exceptions
+import smart_open
+import joblib
+
+from .entities import CloudDataFormat, CloudObjectSlice
+from .preprocessing.handler import joblib_handler
+
+from .storage.filesystem import FileSystemS3API, FilePath
+from .util import head_object, upload_file_with_progress
+
+if TYPE_CHECKING:
+    from typing import List, Tuple, Dict, Optional, Any
+
+logger = logging.getLogger(__name__)
+
+
+class CloudObject:
+    def __init__(
+        self,
+        data_format: CloudDataFormat,
+        object_path: FilePath,
+        meta_path: FilePath,
+        attrs_path: FilePath,
+        storage_config: Optional[Dict[str, Any]] = None
+    ):
+        # Storage headers of the data object
+        self._obj_headers: Optional[Dict[str, str]] = None
+        # Storage headers of the metadata object
+        self._meta_headers: Optional[Dict[str, str]] = None
+        # Storage headers of the attributes object
+        self._attrs_headers: Optional[Dict[str, str]] = None
+
+        # cls reference for the CloudDataType of this object
+        self._format_cls: CloudDataFormat = data_format
+
+        self._obj_path = object_path
+
+        # S3 Path for the metadata object. Located in bucket suffixed
+        # with .meta with the same key as original data object
+        self._meta_path = meta_path
+
+        # S3 Path for the attributes object. Located in bucket suffixed
+        # with .meta with key as original data object suffixed with .attrs
+        self._attrs_path = attrs_path
+        self._attrs: Optional[SimpleNamespace] = None
+
+        storage_config = storage_config or {}
+        self._s3 = FileSystemS3API()
+
+        logger.info("Created reference for %s", self)
+        logger.debug(f"{self._obj_path=},{self._meta_path=}")
+
+    @property
+    def path(self) -> FilePath:
+        return self._obj_path
+
+    @property
+    def meta_path(self) -> FilePath:
+        return self._meta_path
+
+    @property
+    def size(self) -> int:
+        if not self._obj_headers:
+            self.fetch()
+        return int(self._obj_headers["ContentLength"])
+
+    @property
+    def meta_size(self) -> int:
+        if self._meta_headers is None or "ContentLength" not in self._meta_headers:
+            raise AttributeError()
+        return int(self._meta_headers["ContentLength"])
+
+    @property
+    def storage(self) -> FileSystemS3API:
+        return self._s3
+
+    @property
+    def attributes(self) -> Any:
+        return self._attrs
+
+    @property
+    def open(self) -> smart_open.smart_open:
+        assert self.storage is not None
+        logger.debug("Creating new smart_open client for uri %s", self.path.as_uri())
+        client = copy.deepcopy(self.storage)
+        return partial(
+            smart_open.open, self.path.as_uri(), transport_params={"client": client}
+        )
+
+    @property
+    def open_metadata(self) -> smart_open.smart_open:
+        assert self.storage is not None
+        logger.debug("Creating new smart_open client for uri %s", self.meta_path.as_uri())
+        client = copy.deepcopy(self.storage)
+        return partial(
+            smart_open.open,
+            self.meta_path.as_uri(),
+            transport_params={"client": client},
+        )
+
+    # @classmethod
+    # def from_s3(
+    #     cls,
+    #     data_format: CloudDataFormat,
+    #     storage_uri: str,
+    #     fetch: Optional[bool] = True,
+    #     metadata_bucket: Optional[str] = None,
+    #     s3_config: Optional[Dict[str, Any]] = None,
+    # ) -> CloudObject:
+    #     obj_path = S3Path.from_uri(storage_uri)
+    #     if metadata_bucket is None:
+    #         metadata_bucket = obj_path.bucket + ".meta"
+    #     metadata_path = S3Path.from_bucket_key(metadata_bucket, obj_path.key)
+    #     attributes_path = S3Path.from_bucket_key(
+    #         metadata_bucket, obj_path.key + ".attrs"
+    #     )
+    #     co = cls(data_format, obj_path, metadata_path, attributes_path, s3_config)
+    #     if fetch:
+    #         co.fetch()
+    #     return co
+
+    @classmethod
+    def from_bucket_key(
+        cls,
+        data_format: CloudDataFormat,
+        bucket: str,
+        key: str,
+        fetch: Optional[bool] = True,
+        metadata_bucket: Optional[str] = None,
+        s3_config: Optional[Dict[str, Any]] = None,
+    ) -> CloudObject:
+        obj_path = FilePath.from_bucket_key(bucket, key)
+        if metadata_bucket is None:
+            metadata_bucket = bucket + ".meta"
+        meta_path = FilePath.from_bucket_key(metadata_bucket, key)
+        attributes_path = FilePath.from_bucket_key(metadata_bucket, key + ".attrs")
+
+        co = cls(data_format, obj_path, meta_path, attributes_path)
+        if fetch:
+            co.fetch()
+        return co
+
+    # @classmethod
+    # def new_from_file(
+    #     cls, data_format, file_path, cloud_path, s3_config=None, override=False
+    # ) -> "CloudObject":
+    #     obj_path = S3Path.from_uri(cloud_path)
+    #     metadata_path = S3Path.from_bucket_key(obj_path.bucket + ".meta", obj_path.key)
+    #     attributes_path = S3Path.from_bucket_key(
+    #         obj_path.bucket + ".meta", obj_path.key + ".attrs"
+    #     )
+    #     co_instance = cls(
+    #         data_format, obj_path, metadata_path, attributes_path, s3_config
+    #     )
+
+    #     if co_instance.exists():
+    #         if not override:
+    #             raise Exception("Object already exists")
+    #         else:
+    #             # Clean preprocessing metadata if object already exists
+    #             co_instance.clean()
+
+    #     upload_file_with_progress(
+    #         co_instance.storage,
+    #         co_instance.path.bucket,
+    #         co_instance.path.key,
+    #         file_path,
+    #     )
+    #     return co_instance
+
+    def exists(self) -> bool:
+        if not self._obj_headers:
+            try:
+                self.fetch()
+            except KeyError:
+                return False
+        return bool(self._obj_headers)
+
+    def is_preprocessed(self) -> bool:
+        try:
+            head_object(self.storage, bucket=self._meta_path.bucket, key=self._meta_path.key)
+            return True
+        except KeyError:
+            return False
+
+    def fetch(self):
+        if not self._obj_headers:
+            logger.info("Fetching object from Storage")
+            self._fetch_object()
+        if not self._meta_headers:
+            logger.info("Fetching metadata from Storage")
+            self._fetch_metadata()
+
+    def _fetch_object(self):
+        self._obj_headers, _ = head_object(self._s3, self._obj_path.bucket, self._obj_path.key)
+
+    def _fetch_metadata(self):
+        try:
+            res, _ = head_object(self._s3, self._meta_path.bucket, self._meta_path.key)
+            self._meta_headers = res
+            res, _ = head_object(self._s3, self._attrs_path.bucket, self._attrs_path.key)
+            self._attrs_headers = res
+            get_res = self.storage.get_object(Bucket=self._attrs_path.bucket, Key=self._attrs_path.key)
+            try:
+                attrs_dict = pickle.load(get_res["Body"])
+                # Get default attributes from the class,
+                # so we can have default attributes different from None set in the Class
+                base_attrs = deepcopy(self._format_cls.attrs_types)
+                # Replace attributes that have been set in the preprocessing stage
+                base_attrs.update(attrs_dict)
+                # Create namedtuple so that the attributes object is immutable
+                co_named_tuple = namedtuple(
+                    self._format_cls.co_class.__name__ + "Attributes", base_attrs.keys()
+                )
+                self._attrs = co_named_tuple(**base_attrs)
+            except Exception as e:
+                logger.error(e)
+                self._attrs = None
+        except KeyError as e:
+            self._meta_headers = None
+            self._attrs = None
+
+    def clean(self):
+        logger.info("Cleaning indexes and metadata for %s", self)
+        self._s3.delete_object(Bucket=self._meta_path.bucket, Key=self._meta_path.key)
+        self._meta_headers = None
+        self.storage.delete_object(Bucket=self._attrs_path.bucket, Key=self._attrs_path.key)
+        self._attrs_headers = None
+        self._attrs = {}
+
+    def preprocess(
+        self,
+        parallel_config=None,
+        extra_args=None,
+        chunk_size=None,
+        force=False,
+        debug=False,
+    ):
+        assert self.exists(), "Object not found in S3"
+        if self.is_preprocessed() and not force:
+            return
+
+        parallel_config = parallel_config or {}
+        extra_args = extra_args or {}
+
+        # Check if the metadata bucket exists, if not create it
+        try:
+            meta_bucket_head = self.storage.head_bucket(Bucket=self.meta_path.bucket)
+        except botocore.exceptions.ClientError as error:
+            if error.response["Error"]["Code"] != "404":
+                raise error
+            meta_bucket_head = None
+
+        if not meta_bucket_head:
+            logger.info("Creating meta bucket %s", self.meta_path.bucket)
+            try:
+                self.storage.create_bucket(Bucket=self.meta_path.bucket)
+            except botocore.exceptions.ClientError as error:
+                logger.error(
+                    "Metadata bucket %s not found -- Also failed to create it",
+                    self.meta_path.bucket,
+                )
+                raise error
+
+        preproc_signature = inspect.signature(
+            self._format_cls.preprocessing_function
+        ).parameters
+        # Check if parameter cloud_object is in the signature
+        if "cloud_object" not in preproc_signature:
+            raise Exception(
+                "Preprocessing function must have cloud_object as a parameter"
+            )
+
+        jobs = []
+        if chunk_size is None:
+            # Process the entire object as one batch job
+            preproc_args = {"cloud_object": self}
+            if "chunk_data" in preproc_signature:
+                # Placeholder, we will get the data inside the handler function, in case a remote joblib is used
+                # since a StreamingBody is not picklable
+                preproc_args["chunk_data"] = None
+                # get_res = self._s3.get_object(Bucket=self._obj_path.bucket, Key=self._obj_path.key)
+                # assert get_res["ResponseMetadata"]["HTTPStatusCode"] in (200, 206)
+                # preproc_args["chunk_data"] = get_res["Body"]
+            if "chunk_id" in preproc_signature:
+                preproc_args["chunk_id"] = 0
+            if "chunk_size" in preproc_signature:
+                preproc_args["chunk_size"] = self.size
+            if "num_chunks" in preproc_signature:
+                preproc_args["num_chunks"] = 1
+
+            # Add extra args if there are any other arguments in the signature
+            for arg in preproc_signature.keys():
+                if arg not in preproc_args and arg in extra_args:
+                    preproc_args[arg] = extra_args[arg]
+
+            jobs.append(preproc_args)
+            # preprocessing_metadata = self._format_cls.preprocessing_function(**preproc_args)
+        else:
+            assert chunk_size != 0 and chunk_size <= self.size, (
+                "Chunk size must be greater than 0 and less than or equal to object size"
+            )
+            # Partition the object in chunks and preprocess it in parallel
+            if not {"chunk_data", "chunk_id", "chunk_size", "num_chunks"}.issubset(
+                preproc_signature.keys()
+            ):
+                raise Exception(
+                    "Preprocessing function must have "
+                    "(chunk_data, chunk_id, chunk_size, num_chunks) as parameters"
+                )
+            num_chunks = self.size // chunk_size
+            for chunk_id in range(num_chunks):
+                preproc_args = {
+                    "cloud_object": self,
+                    "chunk_id": chunk_id,
+                    "chunk_size": chunk_size,
+                    "num_chunks": num_chunks,
+                }
+                # Add extra args if there are any other arguments in the signature
+                for arg in preproc_signature.keys():
+                    if arg not in preproc_args:
+                        preproc_args[arg] = extra_args[arg]
+                jobs.append(preproc_args)
+
+        if debug:
+            # Run in the main thread for debugging
+            for job in jobs:
+                joblib_handler((self._format_cls.preprocessing_function, job))
+            return
+
+        with joblib.parallel_config(**parallel_config):
+            jl = joblib.Parallel()
+            f = jl(
+                [
+                    joblib.delayed(joblib_handler)(
+                        (self._format_cls.preprocessing_function, job)
+                    )
+                    for job in jobs
+                ]
+            )
+            for res in f:
+                print(res)
+
+    def get_attribute(self, key: str) -> Any:
+        return getattr(self._attrs, key)
+
+    def partition(self, strategy, *args, **kwargs) -> List[CloudObjectSlice]:
+        assert self.is_preprocessed(), "Object must be preprocessed before partitioning"
+
+        slices = strategy(self, *args, **kwargs)
+        # Store a reference to this CloudObject instance in the slice
+        for s in slices:
+            s.cloud_object = self
+        return slices
+
+    def __getitem__(self, item):
+        return self._attrs.__getattribute__(item)
+
+    def __repr__(self):
+        return f"{self.__class__.__name__}<{self._format_cls.co_class.__name__}>({self.path.as_uri()})"
diff --git a/dataplug/storage/filesystem.py b/dataplug/storage/filesystem.py
index 3ff314b..c993000 100644
--- a/dataplug/storage/filesystem.py
+++ b/dataplug/storage/filesystem.py
@@ -22,10 +22,17 @@ def __del__(self):
         for path, fh in self.__file_handles.items():
             fh.close()
 
-    def abort_multipart_upload(self, Bucket: str, Key: str, UploadId: str, *args, **kwargs):
+    def _build_path(self, Bucket: str, Key: str) -> pathlib.Path:
+        return pathlib.Path(Bucket).joinpath(Key)
+
+    def abort_multipart_upload(
+        self, Bucket: str, Key: str, UploadId: str, *args, **kwargs
+    ):
         raise NotImplementedError()
 
-    def complete_multipart_upload(self, Bucket: str, Key: str, UploadId: str, *args, **kwargs):
+    def complete_multipart_upload(
+        self, Bucket: str, Key: str, UploadId: str, *args, **kwargs
+    ):
         raise NotImplementedError()
 
     def create_bucket(self, Bucket: str, *args, **kwargs):
@@ -52,7 +59,14 @@ def download_file(self, Bucket: str, Key: str, Filename: str, *args, **kwargs):
         with open(Filename, "wb") as f2:
             shutil.copyfileobj(f1, f2)
 
-    def download_fileobj(self, Bucket: str, Key: str, Fileobj: Union[IO[Any], StreamingBody], *args, **kwargs):
+    def download_fileobj(
+        self,
+        Bucket: str,
+        Key: str,
+        Fileobj: Union[IO[Any], StreamingBody],
+        *args,
+        **kwargs,
+    ):
         with self._open_as_file(Bucket, Key, "rb") as f:
             shutil.copyfileobj(f, Fileobj)
 
@@ -62,20 +76,24 @@ def get_object(self, Bucket: str, Key: str, *args, **kwargs):
         size = path.stat().st_size
         if "Range" in kwargs:
             range_0, range_1 = kwargs["Range"].replace("bytes=", "").split("-")
-            range_0, range_1 = int(range_0), int(range_1)
+            range_0, range_1 = int(range_0), int(range_1) + 1  # +1 to be inclusive, like S3 API
             with path.open("rb") as f:
                 # Read all chunk into memory, as we could seek after the intended range if we just open the file
                 f.seek(range_0)
                 chunk = f.read(range_1 - range_0)
             buff = io.BytesIO(chunk)
             return {
-                "Body": StreamingBody(raw_stream=buff, content_length=range_1 - range_0),
+                "Body": StreamingBody(
+                    raw_stream=buff, content_length=range_1 - range_0
+                ),
                 "ContentLength": range_1 - range_0,
                 "ResponseMetadata": {"HTTPStatusCode": 206},
             }
         else:
             return {
-                "Body": StreamingBody(raw_stream=path.open("rb"), content_length=size),
+                "Body": StreamingBody(
+                    raw_stream=path.open("rb"), content_length=size
+                ),
                 "ContentLength": size,
                 "ResponseMetadata": {"HTTPStatusCode": 200},
             }
@@ -117,7 +135,13 @@ def list_objects(self, Bucket: str, *args, **kwargs):
         if path.exists() and path.is_file():
             return {"Contents": [{"Key": path.relative_to(path.parent).as_posix()}]}
         if path.exists() and path.is_dir():
-            return {"Contents": [{"Key": p.relative_to(path).as_posix()} for p in path.glob("**/*") if p.is_file()]}
+            return {
+                "Contents": [
+                    {"Key": p.relative_to(path).as_posix()}
+                    for p in path.glob("**/*")
+                    if p.is_file()
+                ]
+            }
         else:
             # List parent if it is an incomplete path
             parent = path.parent
@@ -147,12 +171,85 @@ def put_object(self, Bucket: str, Key: str, *args, **kwargs):
         path.parent.mkdir(parents=True, exist_ok=True)
         path.touch()
 
-    def upload_file(self, Bucket: str, Key: str, Filename: str, *args, **kwargs) -> None:
+    def upload_file(
+        self, Bucket: str, Key: str, Filename: str, *args, **kwargs
+    ) -> None:
         with open(Filename, "rb") as f:
             self.put_object(Bucket, Key, Body=f, *args, **kwargs)
 
-    def upload_fileobj(self, Bucket: str, Key: str, Fileobj: Union[IO[Any], StreamingBody], *args, **kwargs) -> None:
+    def upload_fileobj(
+        self,
+        Bucket: str,
+        Key: str,
+        Fileobj: Union[IO[Any], StreamingBody],
+        *args,
+        **kwargs,
+    ) -> None:
         self.put_object(Bucket, Key, Body=Fileobj, *args, **kwargs)
 
-    def upload_part(self, Bucket: str, Key: str, PartNumber: int, UploadId: str, *args, **kwargs):
+    def upload_part(
+        self, Bucket: str, Key: str, PartNumber: int, UploadId: str, *args, **kwargs
+    ):
         raise NotImplementedError()
+
+
+class FilePath:
+    """Virtual bucket-key pair that links to a local FS path."""
+
+    def __init__(self, bucket_path, key_path):
+        self._bucket_path = bucket_path
+        self._key_path = key_path
+
+    @classmethod
+    def from_bucket_key(cls, bucket: str, key: str) -> "FilePath":
+        bucket_path = pathlib.PurePath(bucket)
+        key_path = pathlib.PurePath(key)
+        if key_path.is_absolute():
+            key_path = key_path.relative_to(bucket_path)
+        path = cls(bucket_path, key_path)
+        return path
+
+    @property
+    def bucket(self) -> str:
+        """
+        The virtual Bucket name
+        """
+        return self._bucket_path
+
+    @property
+    def key(self) -> str:
+        """
+        The virtual Key name
+        """
+        return self._key_path
+
+    @property
+    def full(self) -> str:
+        """
+        The full path name
+        """
+        return self._bucket_path / self._key_path
+
+    # @property
+    # def virtual_directory(self) -> str:
+    #     """
+    #     The parent virtual directory of a key
+    #     Example: foo/bar/baz -> foo/baz
+    #     """
+    #     vdir, _ = self.key.rsplit("/", 1)
+    #     return vdir
+
+    def as_uri(self) -> str:
+        """
+        Return the path as a URI.
+        """
+        return self.full.as_uri()
+
+    # def _absolute_path_validation(self):
+    #     if not self.is_absolute():
+    #         raise ValueError("relative path have no bucket, key specification")
+
+    def __repr__(self) -> str:
+        return "{}(bucket={},key={})".format(
+            self.__class__.__name__, self.bucket, self.key
+        )
diff --git a/dataplug/util.py b/dataplug/util.py
index f162240..864ca5b 100644
--- a/dataplug/util.py
+++ b/dataplug/util.py
@@ -47,7 +47,8 @@ def head_object(s3client, bucket, key):
     metadata = {}
    try:
         head_res = s3client.head_object(Bucket=bucket, Key=key)
-        del head_res["ResponseMetadata"]
+        if "ResponseMetadata" in head_res:
+            del head_res["ResponseMetadata"]
         response = head_res
         if "Metadata" in head_res:
             metadata.update(head_res["Metadata"])
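
A minimal usage sketch of the file-backed API introduced by this patch. The format object, bucket directory, and key below are hypothetical placeholders (any CloudDataFormat defined elsewhere in dataplug would do), the bucket is given as an absolute path so FilePath.as_uri() can build a file:// URI, and the data file is assumed to already exist under <bucket>/<key>:

    # Hypothetical example -- `fasta_format` stands in for a real CloudDataFormat
    # instance defined elsewhere; it is not part of this patch.
    from dataplug.fileobject import CloudObject
    from dataplug.storage.filesystem import FilePath

    # FilePath maps a virtual (bucket, key) pair onto <bucket>/<key> on local disk
    path = FilePath.from_bucket_key("/data/genomics", "samples/chr1.fasta")
    print(path.as_uri())  # file:///data/genomics/samples/chr1.fasta

    # from_bucket_key derives the metadata path (<bucket>.meta/<key>) and the
    # attributes path (<bucket>.meta/<key>.attrs), then heads the local files
    co = CloudObject.from_bucket_key(fasta_format, "/data/genomics", "samples/chr1.fasta")
    if not co.is_preprocessed():
        co.preprocess(debug=True)  # debug=True runs preprocessing in the main thread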