Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from job_orchestration.scheduler.job_config import (
ClpIoConfig,
InputType,
LogIngestorSubmittedS3InputConfig,
PathsToCompress,
S3InputConfig,
)
Expand Down Expand Up @@ -137,7 +138,7 @@ def _generate_fs_logs_list(
def _generate_s3_logs_list(
output_file_path: pathlib.Path,
paths_to_compress: PathsToCompress,
s3_input_config: S3InputConfig,
s3_input_config: S3InputConfig | LogIngestorSubmittedS3InputConfig,
) -> None:
# S3 object keys are stored as file_paths in `PathsToCompress`
object_keys = paths_to_compress.file_paths
Expand Down Expand Up @@ -271,7 +272,9 @@ def _make_clp_s_command_and_env(
# fmt: on

compression_env_vars = dict(os.environ)
if InputType.S3 == clp_config.input.type and not clp_config.input.unstructured:
if (
InputType.S3 == clp_config.input.type or InputType.INGESTOR == clp_config.input.type
) and not clp_config.input.unstructured:
compression_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication))
compression_cmd.append("--auth")
compression_cmd.append("s3")
Expand Down Expand Up @@ -313,7 +316,7 @@ def _make_log_converter_command_and_env(
# fmt: on

conversion_env_vars = dict(os.environ)
if InputType.S3 == clp_config.input.type:
if InputType.S3 == clp_config.input.type or InputType.INGESTOR == clp_config.input.type:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is repeated quite a few times — shall we create a helper like:

def _is_s3_based_input(input_type: InputType) -> bool:
    return input_type in {InputType.S3, InputType.INGESTOR}

conversion_env_vars.update(get_credential_env_vars(clp_config.input.aws_authentication))
conversion_cmd.append("--auth")
conversion_cmd.append("s3")
Expand Down Expand Up @@ -392,7 +395,7 @@ def run_clp(
logs_list_path = tmp_dir / f"{instance_id_str}-log-paths.txt"
if InputType.FS == input_type:
_generate_fs_logs_list(logs_list_path, paths_to_compress)
elif InputType.S3 == input_type:
elif InputType.S3 == input_type or InputType.INGESTOR == input_type:
_generate_s3_logs_list(logs_list_path, paths_to_compress, clp_config.input)
else:
error_msg = f"Unsupported input type: {input_type}."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
fetch_existing_datasets,
)
from clp_py_utils.compression import validate_path_and_get_info
from clp_py_utils.core import read_yaml_config_file
from clp_py_utils.core import (
FileMetadata,
read_yaml_config_file,
)
from clp_py_utils.s3_utils import s3_get_object_metadata
from clp_py_utils.sql_adapter import SqlAdapter
from pydantic import ValidationError
Expand All @@ -38,12 +41,14 @@
from job_orchestration.scheduler.constants import (
CompressionJobStatus,
CompressionTaskStatus,
INGESTED_S3_OBJECT_METADATA_TABLE_NAME,
SchedulerType,
)
from job_orchestration.scheduler.job_config import (
ClpIoConfig,
FsInputConfig,
InputType,
LogIngestorSubmittedS3InputConfig,
S3InputConfig,
)
from job_orchestration.scheduler.scheduler_data import (
Expand Down Expand Up @@ -183,6 +188,75 @@ def _process_s3_input(
paths_to_compress_buffer.add_file(object_metadata)


def _fetch_ingested_s3_object_metadata(
    metadata_ids: list[int],
    ingestion_job_id: int,
    db_cursor: Any,
) -> list[dict[str, Any]]:
    """
    Fetches S3 object metadata rows from the INGESTED_S3_OBJECT_METADATA_TABLE_NAME table for the
    given metadata_ids and ingestion_job_id.

    :param metadata_ids: IDs of the metadata rows to fetch.
    :param ingestion_job_id: Ingestion job to filter by.
    :param db_cursor: Database cursor for the query; must yield rows as dicts.
    :return: List of row dicts with "id", "key", and "size".
    :raises RuntimeError: If no rows are found, or if any requested metadata_id is missing.
    """
    # Defensive guard: callers are expected to validate non-emptiness upstream (see
    # LogIngestorSubmittedS3InputConfig's validator), but an empty `IN ()` clause would be
    # invalid SQL, so bail out early rather than issue a malformed query.
    if len(metadata_ids) == 0:
        return []

    placeholders = ", ".join(["%s"] * len(metadata_ids))
    # `key` is quoted with backticks since it is a reserved word in MySQL.
    query = (
        f"SELECT id, `key`, size FROM {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} "
        f"WHERE id IN ({placeholders}) AND ingestion_job_id = %s"
    )
    db_cursor.execute(query, (*metadata_ids, ingestion_job_id))
    metadata_list = db_cursor.fetchall()
    if len(metadata_list) == 0:
        raise RuntimeError(
            f"No rows found in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for the given "
            f"metadata_ids and ingestion_job_id {ingestion_job_id}."
        )

    # Validate that every requested ID is present in the result set so a partially-ingested
    # job fails loudly instead of silently compressing a subset of objects.
    missing_ids = set(metadata_ids) - {row["id"] for row in metadata_list}
    if len(missing_ids) != 0:
        raise RuntimeError(
            f"Missing metadata rows in {INGESTED_S3_OBJECT_METADATA_TABLE_NAME} for "
            f"ingestion_job_id {ingestion_job_id}: {sorted(missing_ids)}."
        )

    return metadata_list


def _process_log_ingestor_submitted_s3_input(
    log_ingestor_submitted_s3_input_config: LogIngestorSubmittedS3InputConfig,
    paths_to_compress_buffer: PathsToCompressBuffer,
    db_context: DbContext,
) -> None:
    """
    Looks up the S3 object metadata referenced by the config's metadata_ids and
    ingestion_job_id, then adds a FileMetadata entry for each returned row to
    paths_to_compress_buffer.

    :param log_ingestor_submitted_s3_input_config:
    :param paths_to_compress_buffer:
    :param db_context:
    :raises: Propagates `_fetch_ingested_s3_object_metadata`'s exceptions.
    """
    rows = _fetch_ingested_s3_object_metadata(
        log_ingestor_submitted_s3_input_config.metadata_ids,
        log_ingestor_submitted_s3_input_config.ingestion_job_id,
        db_context.cursor,
    )
    # NOTE(review): keys are not validated against the prefix configured in S3Config (the
    # config's base class) — confirm whether upstream guarantees this or a check is needed here.
    for row in rows:
        paths_to_compress_buffer.add_file(
            FileMetadata(path=Path(row["key"]), size=int(row["size"]))
        )


def _write_user_failure_log(
title: str,
content: list[str],
Expand Down Expand Up @@ -321,6 +395,24 @@ def search_and_schedule_new_tasks(
},
)
return
elif input_type == InputType.INGESTOR.value:
try:
_process_log_ingestor_submitted_s3_input(
input_config, paths_to_compress_buffer, db_context
)
except Exception as err:
logger.exception(
"Failed to process log ingestor submitted S3 input for job %s", job_id
)
update_compression_job_metadata(
db_context,
job_id,
{
"status": CompressionJobStatus.FAILED,
"status_msg": f"Log ingestor submitted S3 input failure: {err}",
},
)
return
else:
logger.error(f"Unsupported input type {input_type}")
update_compression_job_metadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from enum import auto, IntEnum

INGESTED_S3_OBJECT_METADATA_TABLE_NAME = "ingested_s3_object_metadata"

TASK_QUEUE_LOWEST_PRIORITY = 1
TASK_QUEUE_HIGHEST_PRIORITY = 3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
class InputType(LowercaseStrEnum):
    # Local-filesystem input (see FsInputConfig).
    FS = auto()
    # User-specified S3 input (see S3InputConfig).
    S3 = auto()
    # S3 objects submitted via the log ingestor (see LogIngestorSubmittedS3InputConfig).
    INGESTOR = auto()


class PathsToCompress(BaseModel):
Expand Down Expand Up @@ -44,6 +45,22 @@ def validate_keys(cls, value):
return value


class LogIngestorSubmittedS3InputConfig(S3Config):
    """Input config for S3 objects whose metadata was previously registered by the log ingestor."""

    # Fixed `type` value lets ClpIoConfig's input union resolve to this config class.
    type: Literal[InputType.INGESTOR.value] = InputType.INGESTOR.value
    ingestion_job_id: int
    dataset: str | None = None
    timestamp_key: str | None = None
    unstructured: bool = False
    # IDs of previously-ingested S3 object metadata rows; must be non-empty (validated below).
    metadata_ids: list[int]

    @field_validator("metadata_ids")
    @classmethod
    def validate_metadata_ids_non_empty(cls, ids: list[int]) -> list[int]:
        # An empty ID list would make the compression job a no-op; reject it at parse time.
        if len(ids) == 0:
            raise ValueError("metadata_ids cannot be an empty list")
        return ids


class OutputConfig(BaseModel):
target_archive_size: int
target_dictionaries_size: int
Expand All @@ -53,7 +70,7 @@ class OutputConfig(BaseModel):


class ClpIoConfig(BaseModel):
    # I/O configuration for a compression job: one input source config plus one output spec.
    # `input` is a union of the supported input config types; each carries its own `type` field.
    input: FsInputConfig | S3InputConfig | LogIngestorSubmittedS3InputConfig
    output: OutputConfig


Expand Down
Loading