diff --git a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py
index 0d03542472..dcdca505cb 100644
--- a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py
+++ b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py
@@ -38,8 +38,10 @@
     InputType,
     PathsToCompress,
     S3InputConfig,
+    S3ObjectMetadataInputConfig,
 )
 from job_orchestration.scheduler.task_result import CompressionTaskResult
+from job_orchestration.scheduler.utils import is_s3_based_input
 
 
 def update_compression_task_metadata(db_cursor, task_id, kv):
@@ -137,7 +139,7 @@ def _generate_fs_logs_list(
 def _generate_s3_logs_list(
     output_file_path: pathlib.Path,
     paths_to_compress: PathsToCompress,
-    s3_input_config: S3InputConfig,
+    s3_input_config: S3InputConfig | S3ObjectMetadataInputConfig,
 ) -> None:
     # S3 object keys are stored as file_paths in `PathsToCompress`
     object_keys = paths_to_compress.file_paths
@@ -271,7 +273,7 @@ def _make_clp_s_command_and_env(
     # fmt: on
 
     compression_env_vars = dict(os.environ)
-    if InputType.S3 == clp_config.input.type and not clp_config.input.unstructured:
+    if is_s3_based_input(clp_config.input.type) and not clp_config.input.unstructured:
         compression_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication))
         compression_cmd.append("--auth")
         compression_cmd.append("s3")
@@ -313,7 +315,7 @@ def _make_log_converter_command_and_env(
     # fmt: on
 
     conversion_env_vars = dict(os.environ)
-    if InputType.S3 == clp_config.input.type:
+    if is_s3_based_input(clp_config.input.type):
         conversion_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication))
         conversion_cmd.append("--auth")
         conversion_cmd.append("s3")
@@ -392,7 +394,7 @@ def run_clp(
     logs_list_path = tmp_dir / f"{instance_id_str}-log-paths.txt"
     if InputType.FS == input_type:
         _generate_fs_logs_list(logs_list_path, paths_to_compress)
-    elif InputType.S3 == input_type:
+    elif is_s3_based_input(input_type):
         _generate_s3_logs_list(logs_list_path, paths_to_compress, clp_config.input)
     else:
         error_msg = f"Unsupported input type: {input_type}."
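
Note: the executor above now branches on is_s3_based_input() in three places. As a sanity check,
here is a minimal runnable sketch (not part of the patch) of the dispatch behavior; LowercaseStrEnum
is stubbed with str + Enum, which is an assumption about its semantics:

    from enum import Enum

    class InputType(str, Enum):  # stand-in for LowercaseStrEnum
        FS = "fs"
        S3 = "s3"
        S3_OBJECT_METADATA = "s3_object_metadata"

    def is_s3_based_input(input_type: InputType) -> bool:
        # Mirrors job_orchestration.scheduler.utils.is_s3_based_input (defined later in this
        # patch): both S3 variants share the S3 code paths (credentials, logs list).
        return InputType.S3 == input_type or InputType.S3_OBJECT_METADATA == input_type

    assert not is_s3_based_input(InputType.FS)  # FS keeps _generate_fs_logs_list
    assert is_s3_based_input(InputType.S3)  # S3 takes _generate_s3_logs_list
    assert is_s3_based_input(InputType.S3_OBJECT_METADATA)  # and so does the new input type
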
diff --git a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
index 4975fe5f0d..009aca7f6d 100644
--- a/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
+++ b/components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
@@ -26,7 +26,10 @@
     fetch_existing_datasets,
 )
 from clp_py_utils.compression import validate_path_and_get_info
-from clp_py_utils.core import read_yaml_config_file
+from clp_py_utils.core import (
+    FileMetadata,
+    read_yaml_config_file,
+)
 from clp_py_utils.s3_utils import s3_get_object_metadata
 from clp_py_utils.sql_adapter import SqlAdapter
 from pydantic import ValidationError
@@ -38,6 +41,7 @@
 from job_orchestration.scheduler.constants import (
     CompressionJobStatus,
     CompressionTaskStatus,
+    INGESTED_S3_OBJECT_METADATA_TABLE_NAME,
     SchedulerType,
 )
 from job_orchestration.scheduler.job_config import (
@@ -45,6 +49,7 @@
     FsInputConfig,
     InputType,
     S3InputConfig,
+    S3ObjectMetadataInputConfig,
 )
 from job_orchestration.scheduler.scheduler_data import (
     CompressionJob,
@@ -183,6 +188,58 @@ def _process_s3_input(
     paths_to_compress_buffer.add_file(object_metadata)
 
 
+def _process_s3_object_metadata_input(
+    s3_object_metadata_input_config: S3ObjectMetadataInputConfig,
+    paths_to_compress_buffer: PathsToCompressBuffer,
+    db_context: DbContext,
+) -> None:
+    """
+    Fetches S3 object metadata rows from the `INGESTED_S3_OBJECT_METADATA_TABLE_NAME` table for the
+    given `s3_object_metadata_ids` and `ingestion_job_id`, and adds the metadata to
+    `paths_to_compress_buffer`.
+
+    :param s3_object_metadata_input_config:
+    :param paths_to_compress_buffer:
+    :param db_context:
+    :raises RuntimeError: If no rows are found, an ID is missing, or a key lacks the key prefix.
+    """
+    s3_object_metadata_ids = s3_object_metadata_input_config.s3_object_metadata_ids
+    ingestion_job_id = s3_object_metadata_input_config.ingestion_job_id
+
+    placeholders = ", ".join(["%s"] * len(s3_object_metadata_ids))
+    query = (
+        f"SELECT `id`, `key`, `size` FROM {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} "
+        f"WHERE `id` IN ({placeholders}) AND `ingestion_job_id` = %s"
+    )
+    params = (*s3_object_metadata_ids, ingestion_job_id)
+    db_context.cursor.execute(query, params)
+    metadata_list = db_context.cursor.fetchall()
+    if len(metadata_list) == 0:
+        raise RuntimeError(
+            f"No rows found in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for the given "
+            f"s3_object_metadata_ids and ingestion_job_id {ingestion_job_id}."
+        )
+
+    # Validate that all requested IDs are present.
+    returned_ids = {row["id"] for row in metadata_list}
+    requested_ids = set(s3_object_metadata_ids)
+    missing_ids = requested_ids - returned_ids
+    if len(missing_ids) > 0:
+        raise RuntimeError(
+            f"Missing metadata rows in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for "
+            f"ingestion_job_id {ingestion_job_id}: {sorted(missing_ids)}."
+        )
+
+    for metadata in metadata_list:
+        if not metadata["key"].startswith(s3_object_metadata_input_config.key_prefix):
+            raise RuntimeError(
+                f"Metadata key {metadata['key']} does not start with the key prefix "
+                f"{s3_object_metadata_input_config.key_prefix}."
+            )
+        file_metadata = FileMetadata(path=Path(metadata["key"]), size=int(metadata["size"]))
+        paths_to_compress_buffer.add_file(file_metadata)
+
+
 def _write_user_failure_log(
     title: str,
     content: list[str],
@@ -321,6 +378,22 @@ def search_and_schedule_new_tasks(
                 },
             )
             return
+    elif input_type == InputType.S3_OBJECT_METADATA.value:
+        try:
+            _process_s3_object_metadata_input(
+                input_config, paths_to_compress_buffer, db_context
+            )
+        except Exception as err:
+            logger.exception("Failed to process S3 object metadata input for job %s", job_id)
+            update_compression_job_metadata(
+                db_context,
+                job_id,
+                {
+                    "status": CompressionJobStatus.FAILED,
+                    "status_msg": f"S3 object metadata input failure: {err}",
+                },
+            )
+            return
     else:
         logger.error(f"Unsupported input type {input_type}")
         update_compression_job_metadata(
diff --git a/components/job-orchestration/job_orchestration/scheduler/constants.py b/components/job-orchestration/job_orchestration/scheduler/constants.py
index 4a66ab462e..c5bd6fdcb8 100644
--- a/components/job-orchestration/job_orchestration/scheduler/constants.py
+++ b/components/job-orchestration/job_orchestration/scheduler/constants.py
@@ -2,6 +2,8 @@
 
 from enum import auto, IntEnum
 
+INGESTED_S3_OBJECT_METADATA_TABLE_NAME = "ingested_s3_object_metadata"
+
 TASK_QUEUE_LOWEST_PRIORITY = 1
 TASK_QUEUE_HIGHEST_PRIORITY = 3
 
diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py
index d7ce48296f..535a6a7e5a 100644
--- a/components/job-orchestration/job_orchestration/scheduler/job_config.py
+++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py
@@ -11,6 +11,7 @@
 class InputType(LowercaseStrEnum):
     FS = auto()
     S3 = auto()
+    S3_OBJECT_METADATA = auto()
 
 
 class PathsToCompress(BaseModel):
@@ -44,6 +45,24 @@ def validate_keys(cls, value):
         return value
 
 
+class S3ObjectMetadataInputConfig(S3Config):
+    type: Literal[InputType.S3_OBJECT_METADATA.value] = InputType.S3_OBJECT_METADATA.value
+    ingestion_job_id: int
+    dataset: str | None = None
+    timestamp_key: str | None = None
+    unstructured: bool = False
+    s3_object_metadata_ids: list[int]
+
+    @field_validator("s3_object_metadata_ids")
+    @classmethod
+    def validate_s3_object_metadata_ids(cls, value: list[int]) -> list[int]:
+        if len(value) == 0:
+            raise ValueError("s3_object_metadata_ids cannot be an empty list")
+        if len(value) != len(set(value)):
+            raise ValueError("s3_object_metadata_ids must be a list of unique IDs")
+        return value
+
+
 class OutputConfig(BaseModel):
     target_archive_size: int
     target_dictionaries_size: int
@@ -53,7 +72,7 @@ class OutputConfig(BaseModel):
 
 
 class ClpIoConfig(BaseModel):
-    input: FsInputConfig | S3InputConfig
+    input: FsInputConfig | S3InputConfig | S3ObjectMetadataInputConfig
     output: OutputConfig
 
 
diff --git a/components/job-orchestration/job_orchestration/scheduler/utils.py b/components/job-orchestration/job_orchestration/scheduler/utils.py
index 3735113a33..4b526d798f 100644
--- a/components/job-orchestration/job_orchestration/scheduler/utils.py
+++ b/components/job-orchestration/job_orchestration/scheduler/utils.py
@@ -17,6 +17,11 @@
     QueryTaskStatus,
     SchedulerType,
 )
+from job_orchestration.scheduler.job_config import InputType
+
+
+def is_s3_based_input(input_type: InputType) -> bool:
+    return InputType.S3 == input_type or InputType.S3_OBJECT_METADATA == input_type
 
 
 def kill_hanging_jobs(sql_adapter: SqlAdapter, scheduler_type: str) -> list[int] | None:
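
Note: a minimal runnable sketch (not part of the patch) exercising the s3_object_metadata_ids
validator in isolation. The hypothetical IdsOnly model reproduces only the validated field; the
real S3ObjectMetadataInputConfig inherits its remaining fields from S3Config, which are omitted
here. Assumes pydantic v2:

    from pydantic import BaseModel, ValidationError, field_validator

    class IdsOnly(BaseModel):  # hypothetical stand-in for S3ObjectMetadataInputConfig
        s3_object_metadata_ids: list[int]

        @field_validator("s3_object_metadata_ids")
        @classmethod
        def validate_s3_object_metadata_ids(cls, value: list[int]) -> list[int]:
            # Same checks as the patch: reject empty lists and duplicate IDs.
            if len(value) == 0:
                raise ValueError("s3_object_metadata_ids cannot be an empty list")
            if len(value) != len(set(value)):
                raise ValueError("s3_object_metadata_ids must be a list of unique IDs")
            return value

    IdsOnly(s3_object_metadata_ids=[1, 2, 3])  # accepted
    for bad in ([], [1, 1]):
        try:
            IdsOnly(s3_object_metadata_ids=bad)
        except ValidationError as err:
            # pydantic surfaces the ValueError text in the validation error message.
            print(err.errors()[0]["msg"])
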