Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 144 additions & 21 deletions components/clp-py-utils/clp_py_utils/s3_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Dict, Final, List, Optional, Set, Tuple, Union

import boto3
import botocore
from botocore.config import Config
from job_orchestration.scheduler.job_config import S3InputConfig

Expand Down Expand Up @@ -254,36 +255,27 @@ def generate_s3_virtual_hosted_style_url(

def s3_get_object_metadata(s3_input_config: S3InputConfig) -> List[FileMetadata]:
    """
    Gets the metadata of all objects specified by the given input config.

    NOTE: We reuse FileMetadata to store the metadata of S3 objects where the object's key is stored
    as `path` in FileMetadata.

    :param s3_input_config:
    :return: A list of `FileMetadata` containing the object's metadata on success.
    :raise: Propagates `_create_s3_client`'s exceptions.
    :raise: Propagates `_s3_get_object_metadata_from_single_prefix`'s exceptions.
    :raise: Propagates `_s3_get_object_metadata_from_keys`'s exceptions.
    """
    s3_client = _create_s3_client(s3_input_config.region_code, s3_input_config.aws_authentication)

    # When no explicit keys are given, list everything under the configured prefix; otherwise
    # resolve only the requested keys (which must all live under that prefix).
    if s3_input_config.keys is None:
        return _s3_get_object_metadata_from_single_prefix(
            s3_client, s3_input_config.bucket, s3_input_config.key_prefix
        )

    return _s3_get_object_metadata_from_keys(
        s3_client, s3_input_config.bucket, s3_input_config.key_prefix, s3_input_config.keys
    )


def s3_put(s3_config: S3Config, src_file: Path, dest_path: str) -> None:
Expand Down Expand Up @@ -392,3 +384,134 @@ def _gen_deletion_config(objects_list: List[str]):
Bucket=s3_config.bucket,
Delete=_gen_deletion_config(objects_to_delete),
)


def _s3_get_object_metadata_from_single_prefix(
    s3_client: boto3.client, bucket: str, key_prefix: str
) -> List[FileMetadata]:
    """
    Gets the metadata of all objects under the <`bucket`>/<`key_prefix`>.

    :param s3_client:
    :param bucket:
    :param key_prefix:
    :return: A list of `FileMetadata` containing the object's metadata on success.
    :raise: Propagates `boto3.client`'s exceptions.
    :raise: Propagates `boto3.client.get_paginator`'s exceptions.
    :raise: Propagates `boto3.paginator`'s exceptions.
    """
    metadata_list: List[FileMetadata] = []
    pages = s3_client.get_paginator("list_objects_v2").paginate(
        Bucket=bucket, Prefix=key_prefix
    )
    for page in pages:
        for obj in page.get("Contents", []):
            object_key = obj["Key"]
            # Objects whose keys end with "/" are directory-like placeholders; skip them.
            if object_key.endswith("/"):
                continue
            metadata_list.append(FileMetadata(Path(object_key), obj["Size"]))

    return metadata_list


def _s3_get_object_metadata_from_keys(
    s3_client: boto3.client, bucket: str, key_prefix: str, keys: List[str]
) -> List[FileMetadata]:
    """
    Gets the metadata of all objects specified in `keys` under the <`bucket`>.

    :param s3_client:
    :param bucket:
    :param key_prefix: Common prefix that every key in `keys` must share.
    :param keys:
    :return: A list of `FileMetadata` containing the object's metadata on success.
    :raises: ValueError if `keys` is an empty list.
    :raises: ValueError if any key in `keys` doesn't start with `key_prefix`.
    :raises: ValueError if duplicate keys are found in `keys`.
    :raises: ValueError if any key in `keys` doesn't exist in the bucket.
    :raises: Propagates `_s3_get_object_metadata_from_key`'s exceptions.
    :raises: Propagates `boto3.client.get_paginator`'s exceptions.
    :raises: Propagates `boto3.paginator`'s exceptions.
    """
    # Key validation
    if len(keys) == 0:
        raise ValueError("The list of keys is empty.")

    # Sort a copy rather than `keys.sort()` so the caller's list isn't mutated as a side effect.
    sorted_keys = sorted(keys)
    for idx, key in enumerate(sorted_keys):
        if not key.startswith(key_prefix):
            raise ValueError(f"Key `{key}` doesn't start with the specified prefix `{key_prefix}`.")
        # Sorting places duplicates adjacently, so one neighbour comparison suffices.
        if idx > 0 and key == sorted_keys[idx - 1]:
            raise ValueError(f"Duplicate key found: `{key}`.")

    key_iterator = iter(sorted_keys)
    # Resolve the first key with a direct HEAD request; it also anchors `StartAfter` below.
    first_key = next(key_iterator)
    file_metadata_list: List[FileMetadata] = [
        _s3_get_object_metadata_from_key(s3_client, bucket, first_key)
    ]

    next_key = next(key_iterator, None)
    if next_key is None:
        return file_metadata_list

    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=key_prefix, StartAfter=first_key):
        contents = page.get("Contents", None)
        if contents is None:
            continue

        for obj in contents:
            object_key = obj["Key"]
            if object_key.endswith("/"):
                # Skip any object that resolves to a directory-like path
                continue

            # The listing is in lexicographic order, matching `sorted_keys`, so both sequences
            # can be walked in lockstep. We need both < and > checks since they are handled
            # differently: a listed key smaller than the awaited one simply wasn't requested,
            # while a larger one means S3 skipped past the awaited key, i.e. it doesn't exist.
            if object_key < next_key:
                continue
            if object_key > next_key:
                raise ValueError(f"Key `{next_key}` doesn't exist in the bucket `{bucket}`.")

            file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"]))
            next_key = next(key_iterator, None)
            if next_key is None:
                # Early exit since all keys have been found.
                return file_metadata_list

    # If control flow reaches here, it means there are still keys left to find.
    absent_keys = []
    while next_key is not None:
        absent_keys.append(next_key)
        next_key = next(key_iterator, None)
    serialized_absent_keys = "\n".join(absent_keys)
    raise ValueError(
        f"Cannot find following keys in the bucket `{bucket}`:\n{serialized_absent_keys}"
    )


def _s3_get_object_metadata_from_key(
    s3_client: boto3.client, bucket: str, key: str
) -> FileMetadata:
    """
    Gets the metadata of an object specified by the `key` under the <`bucket`>.

    :param s3_client:
    :param bucket:
    :param key:
    :return: A `FileMetadata` containing the object's metadata on success.
    :raises: ValueError if the object doesn't exist or fails to read the metadata.
    :raises: Propagates `boto3.client.head_object`'s exceptions.
    """
    try:
        return FileMetadata(
            Path(key), s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
        )
    except botocore.exceptions.ClientError as e:
        # Chain the original ClientError (`from e`) so tracebacks keep the AWS error details.
        raise ValueError(
            f"Failed to read metadata of the key `{key}` from the bucket `{bucket}`"
            f" with the error: {e}."
        ) from e
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,17 @@ class FsInputConfig(BaseModel):

class S3InputConfig(S3Config):
    type: Literal[InputType.S3.value] = InputType.S3.value
    # Explicit object keys to ingest; when None, all objects under the prefix are used.
    keys: Optional[List[str]] = None
    dataset: Optional[str] = None
    timestamp_key: Optional[str] = None

    @field_validator("keys")
    @classmethod
    def validate_keys(cls, value: Optional[List[str]]) -> Optional[List[str]]:
        """Rejects an explicitly-empty key list; `None` (no keys given) is allowed."""
        if value is not None and len(value) == 0:
            raise ValueError("Keys cannot be an empty list")
        return value
Comment on lines +37 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Annotate the validator return type to keep Ruff happy

Ruff’s ANN206 rule flags this classmethod because it lacks a return-type annotation. Adding -> Optional[List[str]] keeps lint clean and documents intent.

-    def validate_keys(cls, value):
+    def validate_keys(cls, value: Optional[List[str]]) -> Optional[List[str]]:
🧰 Tools
🪛 Ruff (0.14.0)

39-39: Missing return type annotation for classmethod validate_keys

(ANN206)


41-41: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In components/job-orchestration/job_orchestration/scheduler/job_config.py around
lines 37 to 42, the @field_validator classmethod validate_keys lacks a
return-type annotation which trips Ruff ANN206; add an explicit return type like
-> Optional[List[str]] to the method signature and, if not already imported,
import Optional and List from typing (or use from typing import Optional, List)
so the validator is properly annotated and the linter is satisfied.



class OutputConfig(BaseModel):
tags: Optional[List[str]] = None
Expand Down
Loading