diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py
index 4a927576cb..c890699578 100644
--- a/components/clp-package-utils/clp_package_utils/general.py
+++ b/components/clp-package-utils/clp_package_utils/general.py
@@ -7,6 +7,7 @@
 import subprocess
 import typing
 import uuid
+from contextlib import closing
 from enum import auto
 from typing import List, Optional, Tuple
 
@@ -14,6 +15,7 @@
 from clp_py_utils.clp_config import (
     CLP_DEFAULT_CREDENTIALS_FILE_PATH,
     CLPConfig,
+    Database,
     DB_COMPONENT_NAME,
     QUEUE_COMPONENT_NAME,
     REDIS_COMPONENT_NAME,
@@ -23,12 +25,14 @@
     WEBUI_COMPONENT_NAME,
     WorkerConfig,
 )
+from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets
 from clp_py_utils.core import (
     get_config_value,
     make_config_path_absolute,
     read_yaml_config_file,
     validate_path_could_be_dir,
 )
+from clp_py_utils.sql_adapter import SQL_Adapter
 from strenum import KebabCaseStrEnum
 
 # CONSTANTS
@@ -556,3 +560,21 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None:
         f"Invalid path: `{path}` cannot be under '{prefix}' which may overlap with a path"
         f" in the container."
     )
+
+
+def validate_dataset(db_config: Database, dataset: str) -> None:
+    """
+    Validates that `dataset` exists in the metadata database.
+
+    :param db_config:
+    :param dataset:
+    :raise: ValueError if the dataset doesn't exist.
+    """
+    sql_adapter = SQL_Adapter(db_config)
+    clp_db_connection_params = db_config.get_clp_connection_params_and_type(True)
+    table_prefix = clp_db_connection_params["table_prefix"]
+    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
+        db_conn.cursor(dictionary=True)
+    ) as db_cursor:
+        if dataset not in fetch_existing_datasets(db_cursor, table_prefix):
+            raise ValueError(f"Dataset `{dataset}` doesn't exist.")
diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
index c3789a28e1..785c7858d8 100644
--- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
+++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
@@ -619,21 +619,6 @@ def dispatch_job_and_update_db(
     )
 
 
-def _validate_dataset(
-    db_cursor,
-    table_prefix: str,
-    dataset: str,
-    existing_datasets: Set[str],
-) -> bool:
-    if dataset in existing_datasets:
-        return True
-
-    # NOTE: This assumes we never delete a dataset.
-    new_datasets = fetch_existing_datasets(db_cursor, table_prefix)
-    existing_datasets.update(new_datasets)
-    return dataset in existing_datasets
-
-
 def handle_pending_query_jobs(
     db_conn_pool,
     clp_metadata_db_conn_params: Dict[str, any],
@@ -662,21 +647,22 @@
             table_prefix = clp_metadata_db_conn_params["table_prefix"]
             dataset = QueryJobConfig.parse_obj(job_config).dataset
-            if dataset is not None and not _validate_dataset(
-                db_cursor, table_prefix, dataset, existing_datasets
-            ):
-                logger.error(f"Dataset `{dataset}` doesn't exist.")
-                if not set_job_or_task_status(
-                    db_conn,
-                    QUERY_JOBS_TABLE_NAME,
-                    job_id,
-                    QueryJobStatus.FAILED,
-                    QueryJobStatus.PENDING,
-                    start_time=datetime.datetime.now(),
-                    duration=0,
-                ):
-                    logger.error(f"Failed to set job {job_id} as failed.")
-                continue
+            if dataset is not None and dataset not in existing_datasets:
+                # NOTE: This assumes we never delete a dataset.
+                existing_datasets.update(fetch_existing_datasets(db_cursor, table_prefix))
+                if dataset not in existing_datasets:
+                    logger.error(f"Dataset `{dataset}` doesn't exist.")
+                    if not set_job_or_task_status(
+                        db_conn,
+                        QUERY_JOBS_TABLE_NAME,
+                        job_id,
+                        QueryJobStatus.FAILED,
+                        QueryJobStatus.PENDING,
+                        start_time=datetime.datetime.now(),
+                        duration=0,
+                    ):
+                        logger.error(f"Failed to set job {job_id} as failed.")
+                    continue
 
             if QueryJobType.SEARCH_OR_AGGREGATION == job_type:
                 # Avoid double-dispatch when a job is WAITING_FOR_REDUCER
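The new validate_dataset helper in clp_package_utils/general.py is presumably intended for callers (such as the package's CLI scripts) that want to reject a nonexistent dataset before a job is ever submitted, while the inlined check in the scheduler still guards jobs that reach it. Below is a minimal usage sketch under that assumption; the caller function, its name, and the SystemExit handling are illustrative and are not part of this diff.

    # Hypothetical caller of the new helper; not part of this change.
    from typing import Optional

    from clp_package_utils.general import validate_dataset
    from clp_py_utils.clp_config import Database


    def reject_missing_dataset(db_config: Database, dataset: Optional[str]) -> None:
        """Fails fast before submitting a job that references a nonexistent dataset."""
        if dataset is None:
            # No dataset was specified, so there is nothing to validate.
            return
        try:
            # Raises ValueError if `dataset` isn't in the metadata database.
            validate_dataset(db_config, dataset)
        except ValueError as err:
            raise SystemExit(str(err))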