Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
InputType,
PathsToCompress,
S3InputConfig,
S3ObjectMetadataInputConfig,
)
from job_orchestration.scheduler.task_result import CompressionTaskResult
from job_orchestration.scheduler.utils import is_s3_based_input


def update_compression_task_metadata(db_cursor, task_id, kv):
Expand Down Expand Up @@ -137,7 +139,7 @@ def _generate_fs_logs_list(
def _generate_s3_logs_list(
output_file_path: pathlib.Path,
paths_to_compress: PathsToCompress,
s3_input_config: S3InputConfig,
s3_input_config: S3InputConfig | S3ObjectMetadataInputConfig,
) -> None:
# S3 object keys are stored as file_paths in `PathsToCompress`
object_keys = paths_to_compress.file_paths
Expand Down Expand Up @@ -271,7 +273,7 @@ def _make_clp_s_command_and_env(
# fmt: on

compression_env_vars = dict(os.environ)
if InputType.S3 == clp_config.input.type and not clp_config.input.unstructured:
if is_s3_based_input(clp_config.input.type) and not clp_config.input.unstructured:
compression_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication))
compression_cmd.append("--auth")
compression_cmd.append("s3")
Expand Down Expand Up @@ -313,7 +315,7 @@ def _make_log_converter_command_and_env(
# fmt: on

conversion_env_vars = dict(os.environ)
if InputType.S3 == clp_config.input.type:
if is_s3_based_input(clp_config.input.type):
conversion_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication))
conversion_cmd.append("--auth")
conversion_cmd.append("s3")
Expand Down Expand Up @@ -392,7 +394,7 @@ def run_clp(
logs_list_path = tmp_dir / f"{instance_id_str}-log-paths.txt"
if InputType.FS == input_type:
_generate_fs_logs_list(logs_list_path, paths_to_compress)
elif InputType.S3 == input_type:
elif is_s3_based_input(input_type):
_generate_s3_logs_list(logs_list_path, paths_to_compress, clp_config.input)
else:
error_msg = f"Unsupported input type: {input_type}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
fetch_existing_datasets,
)
from clp_py_utils.compression import validate_path_and_get_info
from clp_py_utils.core import read_yaml_config_file
from clp_py_utils.core import (
FileMetadata,
read_yaml_config_file,
)
from clp_py_utils.s3_utils import s3_get_object_metadata
from clp_py_utils.sql_adapter import SqlAdapter
from pydantic import ValidationError
Expand All @@ -38,13 +41,15 @@
from job_orchestration.scheduler.constants import (
CompressionJobStatus,
CompressionTaskStatus,
INGESTED_S3_OBJECT_METADATA_TABLE_NAME,
SchedulerType,
)
from job_orchestration.scheduler.job_config import (
ClpIoConfig,
FsInputConfig,
InputType,
S3InputConfig,
S3ObjectMetadataInputConfig,
)
from job_orchestration.scheduler.scheduler_data import (
CompressionJob,
Expand Down Expand Up @@ -183,6 +188,57 @@ def _process_s3_input(
paths_to_compress_buffer.add_file(object_metadata)


def _process_s3_object_metadata_input(
    s3_object_metadata_input_config: S3ObjectMetadataInputConfig,
    paths_to_compress_buffer: PathsToCompressBuffer,
    db_context: DbContext,
) -> None:
    """
    Fetches S3 object metadata rows from the `INGESTED_S3_OBJECT_METADATA_TABLE_NAME` table for the
    given `s3_object_metadata_ids` and `ingestion_job_id`, and adds the metadata to
    `paths_to_compress_buffer`.

    :param s3_object_metadata_input_config:
    :param paths_to_compress_buffer:
    :param db_context:
    :raises RuntimeError: If no rows are found, if any requested metadata_id is missing, or if a
    returned key doesn't start with the configured key prefix.
    """
    s3_object_metadata_ids = s3_object_metadata_input_config.s3_object_metadata_ids
    ingestion_job_id = s3_object_metadata_input_config.ingestion_job_id

    # Parameterized `IN (...)` clause sized to the number of requested IDs (never empty --
    # S3ObjectMetadataInputConfig validates the list is non-empty).
    placeholders = ", ".join(["%s"] * len(s3_object_metadata_ids))
    query = (
        f"SELECT `id`, `key`, `size` FROM {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} "
        f"WHERE id IN ({placeholders}) AND ingestion_job_id = %s"
    )
    params = (*s3_object_metadata_ids, ingestion_job_id)
    db_context.cursor.execute(query, params)
    metadata_list = db_context.cursor.fetchall()
    if len(metadata_list) == 0:
        raise RuntimeError(
            f"No rows found in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for the given "
            f"s3_object_metadata_ids and ingestion_job_id {ingestion_job_id}."
        )
    # Validate that all requested IDs are present.
    returned_ids = {row["id"] for row in metadata_list}
    requested_ids = set(s3_object_metadata_ids)
    missing_ids = requested_ids - returned_ids
    if len(missing_ids) > 0:
        raise RuntimeError(
            f"Missing metadata rows in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for "
            f"ingestion_job_id {ingestion_job_id}: {sorted(missing_ids)}."
        )

    for metadata in metadata_list:
        # Ensure each key lies under the prefix configured in S3Config (the base class of
        # S3ObjectMetadataInputConfig) so a job cannot pull in objects outside its scope.
        if not metadata["key"].startswith(s3_object_metadata_input_config.key_prefix):
            raise RuntimeError(
                f"Metadata key {metadata['key']} does not start with the key prefix "
                f"{s3_object_metadata_input_config.key_prefix}."
            )
        file_metadata = FileMetadata(path=Path(metadata["key"]), size=int(metadata["size"]))
        paths_to_compress_buffer.add_file(file_metadata)


def _write_user_failure_log(
title: str,
content: list[str],
Expand Down Expand Up @@ -321,6 +377,22 @@ def search_and_schedule_new_tasks(
},
)
return
elif input_type == InputType.S3_OBJECT_METADATA.value:
try:
_process_s3_object_metadata_input(
input_config, paths_to_compress_buffer, db_context
)
except Exception as err:
logger.exception("Failed to process S3 object metadata input for job %s", job_id)
update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": f"S3 object metadata input failure: {err}",
},
)
return
else:
logger.error(f"Unsupported input type {input_type}")
update_compression_job_metadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from enum import auto, IntEnum

INGESTED_S3_OBJECT_METADATA_TABLE_NAME = "ingested_s3_object_metadata"

TASK_QUEUE_LOWEST_PRIORITY = 1
TASK_QUEUE_HIGHEST_PRIORITY = 3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
class InputType(LowercaseStrEnum):
FS = auto()
S3 = auto()
S3_OBJECT_METADATA = auto()


class PathsToCompress(BaseModel):
Expand Down Expand Up @@ -44,6 +45,24 @@ def validate_keys(cls, value):
return value


class S3ObjectMetadataInputConfig(S3Config):
    """Input config selecting previously-ingested S3 objects by their metadata row IDs."""

    type: Literal[InputType.S3_OBJECT_METADATA.value] = InputType.S3_OBJECT_METADATA.value
    ingestion_job_id: int
    dataset: str | None = None
    timestamp_key: str | None = None
    unstructured: bool = False
    s3_object_metadata_ids: list[int]

    @field_validator("s3_object_metadata_ids")
    @classmethod
    def validate_s3_object_metadata_ids_non_empty(cls, value: list[int]) -> list[int]:
        # Reject an empty selection outright, then reject any repeated IDs.
        if not value:
            raise ValueError("s3_object_metadata_ids cannot be an empty list")
        if len(set(value)) != len(value):
            raise ValueError("s3_object_metadata_ids must be a list of unique IDs")
        return value


class OutputConfig(BaseModel):
target_archive_size: int
target_dictionaries_size: int
Expand All @@ -53,7 +72,7 @@ class OutputConfig(BaseModel):


class ClpIoConfig(BaseModel):
    """Top-level I/O configuration for a compression job: one input source and one output target."""

    # input may be any of the three supported source kinds; pydantic selects the matching
    # model when the config is parsed.
    input: FsInputConfig | S3InputConfig | S3ObjectMetadataInputConfig
    output: OutputConfig


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
QueryTaskStatus,
SchedulerType,
)
from job_orchestration.scheduler.job_config import InputType


def is_s3_based_input(input_type: InputType) -> bool:
    """Return whether `input_type` reads from S3 (directly, or via ingested object metadata)."""
    return input_type in (InputType.S3, InputType.S3_OBJECT_METADATA)


def kill_hanging_jobs(sql_adapter: SqlAdapter, scheduler_type: str) -> list[int] | None:
Expand Down
Loading