diff --git a/components/clp-py-utils/clp_py_utils/s3_utils.py b/components/clp-py-utils/clp_py_utils/s3_utils.py
index 47d97d1884..dd19209a32 100644
--- a/components/clp-py-utils/clp_py_utils/s3_utils.py
+++ b/components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -1,9 +1,10 @@
 import os
 import re
 from pathlib import Path
-from typing import Dict, Final, List, Optional, Set, Tuple, Union
+from typing import Dict, Final, Generator, List, Optional, Set, Tuple, Union
 
 import boto3
+import botocore.exceptions
 from botocore.config import Config
 
 from job_orchestration.scheduler.job_config import S3InputConfig
@@ -254,36 +255,28 @@ def generate_s3_virtual_hosted_style_url(
 
 def s3_get_object_metadata(s3_input_config: S3InputConfig) -> List[FileMetadata]:
     """
-    Gets the metadata of all objects under the <bucket>/<key_prefix> specified by s3_input_config.
+    Gets the metadata of all objects specified by the given input config.
+
     NOTE: We reuse FileMetadata to store the metadata of S3 objects where the object's key is
     stored as `path` in FileMetadata.
 
     :param s3_input_config:
-    :return: List[FileMetadata] containing the object's metadata on success,
-    :raises: Propagates `boto3.client`'s exceptions.
-    :raises: Propagates `boto3.client.get_paginator`'s exceptions.
-    :raises: Propagates `boto3.paginator`'s exceptions.
+    :return: A list of `FileMetadata` containing the object's metadata on success.
+    :raise: Propagates `_create_s3_client`'s exceptions.
+    :raise: Propagates `_s3_get_object_metadata_from_single_prefix`'s exceptions.
+    :raise: Propagates `_s3_get_object_metadata_from_keys`'s exceptions.
     """
-    s3_client = _create_s3_client(s3_input_config.region_code, s3_input_config.aws_authentication)
-    file_metadata_list: List[FileMetadata] = list()
-    paginator = s3_client.get_paginator("list_objects_v2")
-    pages = paginator.paginate(Bucket=s3_input_config.bucket, Prefix=s3_input_config.key_prefix)
-    for page in pages:
-        contents = page.get("Contents", None)
-        if contents is None:
-            continue
-
-        for obj in contents:
-            object_key = obj["Key"]
-            if object_key.endswith("/"):
-                # Skip any object that resolves to a directory-like path
-                continue
-
-            file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"]))
+    s3_client = _create_s3_client(s3_input_config.region_code, s3_input_config.aws_authentication)
+    if s3_input_config.keys is None:
+        return _s3_get_object_metadata_from_single_prefix(
+            s3_client, s3_input_config.bucket, s3_input_config.key_prefix
+        )
 
-    return file_metadata_list
+    return _s3_get_object_metadata_from_keys(
+        s3_client, s3_input_config.bucket, s3_input_config.key_prefix, s3_input_config.keys
+    )
 
 
 def s3_put(s3_config: S3Config, src_file: Path, dest_path: str) -> None:
@@ -392,3 +385,150 @@ def _gen_deletion_config(objects_list: List[str]):
         Bucket=s3_config.bucket,
         Delete=_gen_deletion_config(objects_to_delete),
     )
+
+
+def _s3_get_object_metadata_from_single_prefix(
+    s3_client: boto3.client, bucket: str, key_prefix: str
+) -> List[FileMetadata]:
+    """
+    Gets the metadata of all objects under the <`bucket`>/<`key_prefix`>.
+
+    :param s3_client:
+    :param bucket:
+    :param key_prefix:
+    :return: A list of `FileMetadata` containing the object's metadata on success.
+    :raise: Propagates `_iter_s3_objects`'s exceptions.
+ """ + file_metadata_list: List[FileMetadata] = list() + for object_key, object_size in _iter_s3_objects(s3_client, bucket, key_prefix): + file_metadata_list.append(FileMetadata(Path(object_key), object_size)) + + return file_metadata_list + + +def _s3_get_object_metadata_from_keys( + s3_client: boto3.client, bucket: str, key_prefix: str, keys: List[str] +) -> List[FileMetadata]: + """ + Gets the metadata of all objects specified in `keys` under the <`bucket`>. + + :param s3_client: + :param bucket: + :param key_prefix: + :param keys: + :return: A list of `FileMetadata` containing the object's metadata on success. + :raise: ValueError if `keys` is an empty list. + :raise: ValueError if any key in `keys` doesn't start with `key_prefix`. + :raise: ValueError if duplicate keys are found in `keys`. + :raise: ValueError if any key in `keys` ends with `/`. + :raise: ValueError if any key in `keys` doesn't exist in the bucket. + :raise: Propagates `_s3_get_object_metadata_from_key`'s exceptions. + :raise: Propagates `_iter_s3_objects`'s exceptions. 
+ """ + # Key validation + if len(keys) == 0: + raise ValueError("The list of keys is empty.") + + keys = sorted(keys) + for idx, key in enumerate(keys): + if not key.startswith(key_prefix): + raise ValueError(f"Key `{key}` doesn't start with the specified prefix `{key_prefix}`.") + if idx > 0 and key == keys[idx - 1]: + raise ValueError(f"Duplicate key found: `{key}`.") + if key.endswith("/"): + raise ValueError(f"Key `{key}` is invalid: S3 object keys must not end with `/`.") + + key_iterator = iter(keys) + first_key = next(key_iterator) + file_metadata_list: List[FileMetadata] = [] + file_metadata_list.append(_s3_get_object_metadata_from_key(s3_client, bucket, first_key)) + + next_key = next(key_iterator, None) + if next_key is None: + return file_metadata_list + + for object_key, object_size in _iter_s3_objects(s3_client, bucket, key_prefix, first_key): + # We need to do both < and > checks since they are handled differently. Ideally, we can do + # it with a single comparison. However, Python doesn't support three-way comparison. + if object_key < next_key: + continue + if object_key > next_key: + raise ValueError(f"Key `{next_key}` doesn't exist in the bucket `{bucket}`.") + + file_metadata_list.append(FileMetadata(Path(object_key), object_size)) + next_key = next(key_iterator, None) + if next_key is None: + # Early exit since all keys have been found. + return file_metadata_list + + # If control flow reaches here, it means there are still keys left to find. + absent_keys = [] + while next_key is not None: + absent_keys.append(next_key) + next_key = next(key_iterator, None) + serialized_absent_keys = "\n".join(absent_keys) + raise ValueError( + f"Cannot find following keys in the bucket `{bucket}`:\n{serialized_absent_keys}" + ) + + +def _s3_get_object_metadata_from_key( + s3_client: boto3.client, bucket: str, key: str +) -> FileMetadata: + """ + Gets the metadata of an object specified by the `key` under the <`bucket`>. 
+
+    :param s3_client:
+    :param bucket:
+    :param key:
+    :return: A `FileMetadata` containing the object's metadata on success.
+    :raise: ValueError if the object doesn't exist or fails to read the metadata.
+    :raise: Propagates `boto3.client.head_object`'s exceptions.
+    """
+    try:
+        return FileMetadata(
+            Path(key), s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
+        )
+    except botocore.exceptions.ClientError as e:
+        raise ValueError(
+            f"Failed to read metadata of the key `{key}` from the bucket `{bucket}`"
+            f" with the error: {e}."
+        ) from e
+
+
+def _iter_s3_objects(
+    s3_client: boto3.client, bucket: str, key_prefix: str, start_from: Optional[str] = None
+) -> Generator[Tuple[str, int], None, None]:
+    """
+    Iterates over objects in an S3 bucket under the specified prefix, optionally starting after a
+    given key.
+
+    NOTE: Any object key that resolves to a directory-like path (i.e., ends with `/`) will be
+    skipped.
+
+    :param s3_client:
+    :param bucket:
+    :param key_prefix:
+    :param start_from: Optional key to start listing after.
+    :yield: The next object in the iteration, presented as a tuple that contains:
+        - The key of the object.
+        - The size of the object.
+    :raise: Propagates `boto3.client.get_paginator`'s exceptions.
+    :raise: Propagates `boto3.paginator`'s exceptions.
+ """ + paginator = s3_client.get_paginator("list_objects_v2") + paginator_args = {"Bucket": bucket, "Prefix": key_prefix} + if start_from is not None: + paginator_args["StartAfter"] = start_from + pages = paginator.paginate(**paginator_args) + for page in pages: + contents = page.get("Contents", None) + if contents is None: + continue + for obj in contents: + object_key = obj["Key"] + if object_key.endswith("/"): + # Skip any object that resolves to a directory-like path + continue + object_size = obj["Size"] + yield object_key, object_size diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py index 9b941bec73..70288711e9 100644 --- a/components/job-orchestration/job_orchestration/scheduler/job_config.py +++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py @@ -30,9 +30,17 @@ class FsInputConfig(BaseModel): class S3InputConfig(S3Config): type: Literal[InputType.S3.value] = InputType.S3.value + keys: Optional[List[str]] = None dataset: Optional[str] = None timestamp_key: Optional[str] = None + @field_validator("keys") + @classmethod + def validate_keys(cls, value): + if value is not None and len(value) == 0: + raise ValueError("Keys cannot be an empty list") + return value + class OutputConfig(BaseModel): tags: Optional[List[str]] = None