Skip to content

Commit e6b4a20

Browse files
feat(job-orchestration): Allow compression jobs to specify S3 object keys via S3InputConfig. (#1407)
Co-authored-by: Junhao Liao <[email protected]>
1 parent 06297c0 commit e6b4a20

File tree

2 files changed

+170
-23
lines changed

2 files changed

+170
-23
lines changed

components/clp-py-utils/clp_py_utils/s3_utils.py

Lines changed: 162 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import os
22
import re
33
from pathlib import Path
4-
from typing import Dict, Final, List, Optional, Set, Tuple, Union
4+
from typing import Dict, Final, Generator, List, Optional, Set, Tuple, Union
55

66
import boto3
7+
import botocore
78
from botocore.config import Config
89
from job_orchestration.scheduler.job_config import S3InputConfig
910

@@ -254,36 +255,27 @@ def generate_s3_virtual_hosted_style_url(
254255

255256
def s3_get_object_metadata(s3_input_config: S3InputConfig) -> List[FileMetadata]:
256257
"""
257-
Gets the metadata of all objects under the <bucket>/<key_prefix> specified by s3_input_config.
258+
Gets the metadata of all objects specified by the given input config.
259+
258260
NOTE: We reuse FileMetadata to store the metadata of S3 objects where the object's key is stored
259261
as `path` in FileMetadata.
260262
261263
:param s3_input_config:
262-
:return: List[FileMetadata] containing the object's metadata on success,
263-
:raises: Propagates `boto3.client`'s exceptions.
264-
:raises: Propagates `boto3.client.get_paginator`'s exceptions.
265-
:raises: Propagates `boto3.paginator`'s exceptions.
264+
:return: A list of `FileMetadata` containing the object's metadata on success.
265+
:raise: Propagates `_create_s3_client`'s exceptions.
266+
:raise: Propagates `_s3_get_object_metadata_from_single_prefix`'s exceptions.
267+
:raise: Propagates `_s3_get_object_metadata_from_keys`'s exceptions.
266268
"""
267-
268269
s3_client = _create_s3_client(s3_input_config.region_code, s3_input_config.aws_authentication)
269270

270-
file_metadata_list: List[FileMetadata] = list()
271-
paginator = s3_client.get_paginator("list_objects_v2")
272-
pages = paginator.paginate(Bucket=s3_input_config.bucket, Prefix=s3_input_config.key_prefix)
273-
for page in pages:
274-
contents = page.get("Contents", None)
275-
if contents is None:
276-
continue
277-
278-
for obj in contents:
279-
object_key = obj["Key"]
280-
if object_key.endswith("/"):
281-
# Skip any object that resolves to a directory-like path
282-
continue
283-
284-
file_metadata_list.append(FileMetadata(Path(object_key), obj["Size"]))
271+
if s3_input_config.keys is None:
272+
return _s3_get_object_metadata_from_single_prefix(
273+
s3_client, s3_input_config.bucket, s3_input_config.key_prefix
274+
)
285275

286-
return file_metadata_list
276+
return _s3_get_object_metadata_from_keys(
277+
s3_client, s3_input_config.bucket, s3_input_config.key_prefix, s3_input_config.keys
278+
)
287279

288280

289281
def s3_put(s3_config: S3Config, src_file: Path, dest_path: str) -> None:
@@ -392,3 +384,150 @@ def _gen_deletion_config(objects_list: List[str]):
392384
Bucket=s3_config.bucket,
393385
Delete=_gen_deletion_config(objects_to_delete),
394386
)
387+
388+
389+
def _s3_get_object_metadata_from_single_prefix(
390+
s3_client: boto3.client, bucket: str, key_prefix: str
391+
) -> List[FileMetadata]:
392+
"""
393+
Gets the metadata of all objects under the <`bucket`>/<`key_prefix`>.
394+
395+
:param s3_client:
396+
:param bucket:
397+
:param key_prefix:
398+
:return: A list of `FileMetadata` containing the object's metadata on success.
399+
:raise: Propagates `_iter_s3_objects`'s exceptions.
400+
"""
401+
file_metadata_list: List[FileMetadata] = list()
402+
for object_key, object_size in _iter_s3_objects(s3_client, bucket, key_prefix):
403+
file_metadata_list.append(FileMetadata(Path(object_key), object_size))
404+
405+
return file_metadata_list
406+
407+
408+
def _s3_get_object_metadata_from_keys(
409+
s3_client: boto3.client, bucket: str, key_prefix: str, keys: List[str]
410+
) -> List[FileMetadata]:
411+
"""
412+
Gets the metadata of all objects specified in `keys` under the <`bucket`>.
413+
414+
:param s3_client:
415+
:param bucket:
416+
:param key_prefix:
417+
:param keys:
418+
:return: A list of `FileMetadata` containing the object's metadata on success.
419+
:raise: ValueError if `keys` is an empty list.
420+
:raise: ValueError if any key in `keys` doesn't start with `key_prefix`.
421+
:raise: ValueError if duplicate keys are found in `keys`.
422+
:raise: ValueError if any key in `keys` ends with `/`.
423+
:raise: ValueError if any key in `keys` doesn't exist in the bucket.
424+
:raise: Propagates `_s3_get_object_metadata_from_key`'s exceptions.
425+
:raise: Propagates `_iter_s3_objects`'s exceptions.
426+
"""
427+
# Key validation
428+
if len(keys) == 0:
429+
raise ValueError("The list of keys is empty.")
430+
431+
keys = sorted(keys)
432+
for idx, key in enumerate(keys):
433+
if not key.startswith(key_prefix):
434+
raise ValueError(f"Key `{key}` doesn't start with the specified prefix `{key_prefix}`.")
435+
if idx > 0 and key == keys[idx - 1]:
436+
raise ValueError(f"Duplicate key found: `{key}`.")
437+
if key.endswith("/"):
438+
raise ValueError(f"Key `{key}` is invalid: S3 object keys must not end with `/`.")
439+
440+
key_iterator = iter(keys)
441+
first_key = next(key_iterator)
442+
file_metadata_list: List[FileMetadata] = []
443+
file_metadata_list.append(_s3_get_object_metadata_from_key(s3_client, bucket, first_key))
444+
445+
next_key = next(key_iterator, None)
446+
if next_key is None:
447+
return file_metadata_list
448+
449+
for object_key, object_size in _iter_s3_objects(s3_client, bucket, key_prefix, first_key):
450+
# We need to do both < and > checks since they are handled differently. Ideally, we can do
451+
# it with a single comparison. However, Python doesn't support three-way comparison.
452+
if object_key < next_key:
453+
continue
454+
if object_key > next_key:
455+
raise ValueError(f"Key `{next_key}` doesn't exist in the bucket `{bucket}`.")
456+
457+
file_metadata_list.append(FileMetadata(Path(object_key), object_size))
458+
next_key = next(key_iterator, None)
459+
if next_key is None:
460+
# Early exit since all keys have been found.
461+
return file_metadata_list
462+
463+
# If control flow reaches here, it means there are still keys left to find.
464+
absent_keys = []
465+
while next_key is not None:
466+
absent_keys.append(next_key)
467+
next_key = next(key_iterator, None)
468+
serialized_absent_keys = "\n".join(absent_keys)
469+
raise ValueError(
470+
f"Cannot find following keys in the bucket `{bucket}`:\n{serialized_absent_keys}"
471+
)
472+
473+
474+
def _s3_get_object_metadata_from_key(
475+
s3_client: boto3.client, bucket: str, key: str
476+
) -> FileMetadata:
477+
"""
478+
Gets the metadata of an object specified by the `key` under the <`bucket`>.
479+
480+
:param s3_client:
481+
:param bucket:
482+
:param key:
483+
:return: A `FileMetadata` containing the object's metadata on success.
484+
:raise: ValueError if the object doesn't exist or fails to read the metadata.
485+
:raise: Propagates `boto3.client.head_object`'s exceptions.
486+
"""
487+
try:
488+
return FileMetadata(
489+
Path(key), s3_client.head_object(Bucket=bucket, Key=key)["ContentLength"]
490+
)
491+
except botocore.exceptions.ClientError as e:
492+
raise ValueError(
493+
f"Failed to read metadata of the key `{key}` from the bucket `{bucket}`"
494+
f" with the error: {e}."
495+
) from e
496+
497+
498+
def _iter_s3_objects(
499+
s3_client: boto3.client, bucket: str, key_prefix: str, start_from: str | None = None
500+
) -> Generator[Tuple[str, int], None, None]:
501+
"""
502+
Iterates over objects in an S3 bucket under the specified prefix, optionally starting after a
503+
given key.
504+
505+
NOTE: Any object key that resolves to a directory-like path (i.e., ends with `/`) will be
506+
skipped.
507+
508+
:param s3_client:
509+
:param bucket:
510+
:param key_prefix:
511+
:param start_from: Optional key to start listing after.
512+
:yield: The next object to iterator, presenting as a tuple that contains:
513+
- The key of the object.
514+
- The size of the object.
515+
:raise: Propagates `boto3.client.get_paginator`'s exceptions.
516+
:raise: Propagates `boto3.paginator`'s exceptions.
517+
"""
518+
paginator = s3_client.get_paginator("list_objects_v2")
519+
paginator_args = {"Bucket": bucket, "Prefix": key_prefix}
520+
if start_from is not None:
521+
paginator_args["StartAfter"] = start_from
522+
pages = paginator.paginate(**paginator_args)
523+
for page in pages:
524+
contents = page.get("Contents", None)
525+
if contents is None:
526+
continue
527+
for obj in contents:
528+
object_key = obj["Key"]
529+
if object_key.endswith("/"):
530+
# Skip any object that resolves to a directory-like path
531+
continue
532+
object_size = obj["Size"]
533+
yield object_key, object_size

components/job-orchestration/job_orchestration/scheduler/job_config.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,17 @@ class FsInputConfig(BaseModel):
3030

3131
class S3InputConfig(S3Config):
3232
type: Literal[InputType.S3.value] = InputType.S3.value
33+
keys: Optional[List[str]] = None
3334
dataset: Optional[str] = None
3435
timestamp_key: Optional[str] = None
3536

37+
@field_validator("keys")
38+
@classmethod
39+
def validate_keys(cls, value):
40+
if value is not None and len(value) == 0:
41+
raise ValueError("Keys cannot be an empty list")
42+
return value
43+
3644

3745
class OutputConfig(BaseModel):
3846
tags: Optional[List[str]] = None

0 commit comments

Comments
 (0)