From 776cf6f5e89781a43ba7e973dab41185fed1016d Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 26 Jun 2025 11:32:37 +0000 Subject: [PATCH 01/15] Move dataset validation into a metadata db utils file --- .../clp_py_utils/clp_metadata_db_utils.py | 26 ++++++++++++++++++- .../scheduler/query/query_scheduler.py | 19 ++++---------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py index 4aeed84dc7..07c34f0c51 100644 --- a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py +++ b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py @@ -1,7 +1,7 @@ from __future__ import annotations from pathlib import Path -from typing import Set +from typing import Set, Tuple from clp_py_utils.clp_config import ( ARCHIVE_TAGS_TABLE_SUFFIX, @@ -157,6 +157,30 @@ def fetch_existing_datasets( return {row["name"] for row in rows} +def validate_and_cache_dataset( + db_cursor, + table_prefix: str, + dataset: str | None, + existing_datasets: Set[str] | None = None, +) -> Tuple[bool, Set[str]]: + """ + Checks if the provided dataset currently exists in the metadata database and cache it locally. + If the dataset already exists in the local cache, the database query is skipped and the cache is + not updated. + :param db_cursor: + :param table_prefix: + :param dataset: + :param existing_datasets: + :return: Whether the dataset exists, and a refreshed cache of datasets if a lookup is required. + """ + if not dataset: + return False, existing_datasets + if existing_datasets is not None and dataset in existing_datasets: + return True, existing_datasets + existing_datasets = fetch_existing_datasets(db_cursor, table_prefix) + return dataset in existing_datasets, existing_datasets + + def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None = None) -> None: """ Creates the standard set of tables for CLP's metadata. 
diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 95beb0d109..9f4a2f4821 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -41,7 +41,7 @@ TAGS_TABLE_SUFFIX, ) from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level -from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets +from clp_py_utils.clp_metadata_db_utils import validate_and_cache_dataset from clp_py_utils.core import read_yaml_config_file from clp_py_utils.decorators import exception_default_value from clp_py_utils.sql_adapter import SQL_Adapter @@ -614,18 +614,6 @@ def dispatch_job_and_update_db( ) -def _validate_dataset( - db_cursor, - table_prefix: str, - dataset: str, - existing_datasets: Set[str], -) -> bool: - if dataset in existing_datasets: - return True - existing_datasets = fetch_existing_datasets(db_cursor, table_prefix) - return dataset in existing_datasets - - def handle_pending_query_jobs( db_conn_pool, clp_metadata_db_conn_params: Dict[str, any], @@ -656,7 +644,10 @@ def handle_pending_query_jobs( table_prefix = clp_metadata_db_conn_params["table_prefix"] if StorageEngine.CLP_S == clp_storage_engine: dataset = QueryJobConfig.parse_obj(job_config).dataset - if not _validate_dataset(db_cursor, table_prefix, dataset, existing_datasets): + dataset_exists, existing_datasets = validate_and_cache_dataset( + db_cursor, table_prefix, dataset, existing_datasets + ) + if not dataset_exists: logger.error(f"Dataset `{dataset}` doesn't exist.") if not set_job_or_task_status( db_conn, From e589e4fb89eaa317762bfe07ca53b7e5f35365e4 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 26 Jun 2025 12:13:15 +0000 Subject: [PATCH 02/15] add clp package dataset validation --- 
.../clp_package_utils/general.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index e254d4f844..50fc1a9c6f 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -14,16 +14,19 @@ from clp_py_utils.clp_config import ( CLP_DEFAULT_CREDENTIALS_FILE_PATH, CLPConfig, + Database, DB_COMPONENT_NAME, LOG_VIEWER_WEBUI_COMPONENT_NAME, QUEUE_COMPONENT_NAME, REDIS_COMPONENT_NAME, REDUCER_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, + SQL_Adapter, StorageType, WEBUI_COMPONENT_NAME, WorkerConfig, ) +from clp_py_utils.clp_metadata_db_utils import validate_and_cache_dataset from clp_py_utils.core import ( get_config_value, make_config_path_absolute, @@ -574,3 +577,23 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None: f"Invalid path: `{path}` cannot be under '{prefix}' which may overlap with a path" f" in the container." ) + + +def validate_dataset(clp_config: CLPConfig, dataset: typing.Optional[str]) -> None: + """ + Checks if the provided dataset currently exists in the metadata database. 
+ :param clp_config: + :param dataset: + """ + if not dataset: + raise ValueError(f"`{dataset}` is not a valid dataset name.") + db_config: Database = clp_config.database + sql_adapter: SQL_Adapter = SQL_Adapter(db_config) + clp_db_connection_params: dict[str, any] = db_config.get_clp_connection_params_and_type(True) + table_prefix: str = clp_db_connection_params["table_prefix"] + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + dataset_valid, _ = validate_and_cache_dataset(db_cursor, table_prefix, dataset) + if not dataset_valid: + raise ValueError(f"Dataset `{dataset}` does not exist.") From 5a6bb58a6e572a1abcdf416aa82672327cdd034f Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 26 Jun 2025 15:16:26 +0000 Subject: [PATCH 03/15] fix import error --- components/clp-package-utils/clp_package_utils/general.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 50fc1a9c6f..03ca4dcea3 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -21,7 +21,6 @@ REDIS_COMPONENT_NAME, REDUCER_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, - SQL_Adapter, StorageType, WEBUI_COMPONENT_NAME, WorkerConfig, @@ -33,6 +32,7 @@ read_yaml_config_file, validate_path_could_be_dir, ) +from clp_py_utils.sql_adapter import SQL_Adapter from strenum import KebabCaseStrEnum # CONSTANTS From 357bca947595ae878871a0f51cf4d9923515f780 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Sat, 28 Jun 2025 01:57:30 -0400 Subject: [PATCH 04/15] Remove None handling --- components/clp-package-utils/clp_package_utils/general.py | 8 +++----- .../clp-py-utils/clp_py_utils/clp_metadata_db_utils.py | 4 +--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git 
a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 03ca4dcea3..500eb6c662 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -579,14 +579,12 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None: ) -def validate_dataset(clp_config: CLPConfig, dataset: typing.Optional[str]) -> None: +def validate_dataset(clp_config: CLPConfig, dataset: str) -> None: """ Checks if the provided dataset currently exists in the metadata database. :param clp_config: :param dataset: """ - if not dataset: - raise ValueError(f"`{dataset}` is not a valid dataset name.") db_config: Database = clp_config.database sql_adapter: SQL_Adapter = SQL_Adapter(db_config) clp_db_connection_params: dict[str, any] = db_config.get_clp_connection_params_and_type(True) @@ -594,6 +592,6 @@ def validate_dataset(clp_config: CLPConfig, dataset: typing.Optional[str]) -> No with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - dataset_valid, _ = validate_and_cache_dataset(db_cursor, table_prefix, dataset) - if not dataset_valid: + dataset_exists, _ = validate_and_cache_dataset(db_cursor, table_prefix, dataset) + if not dataset_exists: raise ValueError(f"Dataset `{dataset}` does not exist.") diff --git a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py index 07c34f0c51..cc5a0f1274 100644 --- a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py +++ b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py @@ -160,7 +160,7 @@ def fetch_existing_datasets( def validate_and_cache_dataset( db_cursor, table_prefix: str, - dataset: str | None, + dataset: str, existing_datasets: Set[str] | None = None, ) -> Tuple[bool, Set[str]]: """ @@ -173,8 +173,6 @@ def validate_and_cache_dataset( :param 
existing_datasets: :return: Whether the dataset exists, and a refreshed cache of datasets if a lookup is required. """ - if not dataset: - return False, existing_datasets if existing_datasets is not None and dataset in existing_datasets: return True, existing_datasets existing_datasets = fetch_existing_datasets(db_cursor, table_prefix) From 0d1c59eca0acd50a55de88d67b05adc191bd07b3 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Sat, 28 Jun 2025 03:18:26 -0400 Subject: [PATCH 05/15] Add missing import --- components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py index cc5a0f1274..4d7e913118 100644 --- a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py +++ b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +from contextlib import closing from pathlib import Path from typing import Set, Tuple From af92a9cde0781da3238b1e3a9c9935b853dab339 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Sun, 29 Jun 2025 05:44:04 -0400 Subject: [PATCH 06/15] Fix import error --- components/clp-package-utils/clp_package_utils/general.py | 1 + components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 91d867315a..142244b886 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -7,6 +7,7 @@ import subprocess import typing import uuid +from contextlib import closing from enum import auto from typing import List, Optional, Tuple diff --git a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py 
index 4d7e913118..cc5a0f1274 100644 --- a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py +++ b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py @@ -1,6 +1,5 @@ from __future__ import annotations -from contextlib import closing from pathlib import Path from typing import Set, Tuple From e840021dac31f273548a52d17b63adea961e5eb6 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Sun, 29 Jun 2025 18:13:19 -0400 Subject: [PATCH 07/15] Allow validate_dataset to check for error cases when users specify dataset when using CLP storage engine --- .../clp_package_utils/general.py | 16 ++++++++++++++-- .../scheduler/query/query_scheduler.py | 4 ++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 142244b886..79c978aac1 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -14,6 +14,7 @@ import yaml from clp_py_utils.clp_config import ( CLP_DEFAULT_CREDENTIALS_FILE_PATH, + CLP_DEFAULT_DATASET_NAME, CLPConfig, Database, DB_COMPONENT_NAME, @@ -21,6 +22,7 @@ REDIS_COMPONENT_NAME, REDUCER_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, + StorageEngine, StorageType, WEBUI_COMPONENT_NAME, WorkerConfig, @@ -562,12 +564,21 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None: ) -def validate_dataset(clp_config: CLPConfig, dataset: str) -> None: +def validate_dataset(clp_config: CLPConfig, input_dataset: Optional[str]) -> Optional[str]: """ Checks if the provided dataset currently exists in the metadata database. :param clp_config: - :param dataset: + :param input_dataset: Dataset from the CLI. + :return: The validated dataset to use. 
+ :raise: ValueError """ + storage_engine: StorageEngine = clp_config.package.storage_engine + if StorageEngine.CLP_S != storage_engine: + if input_dataset is not None: + raise ValueError("Dataset selection is only enabled for CLP_S storage engine.") + return None + + dataset: str = CLP_DEFAULT_DATASET_NAME if input_dataset is None else input_dataset db_config: Database = clp_config.database sql_adapter: SQL_Adapter = SQL_Adapter(db_config) clp_db_connection_params: dict[str, any] = db_config.get_clp_connection_params_and_type(True) @@ -578,3 +589,4 @@ def validate_dataset(clp_config: CLPConfig, dataset: str) -> None: dataset_exists, _ = validate_and_cache_dataset(db_cursor, table_prefix, dataset) if not dataset_exists: raise ValueError(f"Dataset `{dataset}` does not exist.") + return dataset diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 9f4a2f4821..7bbcd10f47 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -642,8 +642,8 @@ def handle_pending_query_jobs( job_config = msgpack.unpackb(job["job_config"]) table_prefix = clp_metadata_db_conn_params["table_prefix"] - if StorageEngine.CLP_S == clp_storage_engine: - dataset = QueryJobConfig.parse_obj(job_config).dataset + dataset = QueryJobConfig.parse_obj(job_config).dataset + if StorageEngine.CLP_S == clp_storage_engine and dataset is not None: dataset_exists, existing_datasets = validate_and_cache_dataset( db_cursor, table_prefix, dataset, existing_datasets ) From 2e2ab541f878a715460c65fff11fb9f9c18d0a71 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 3 Jul 2025 07:20:11 -0400 Subject: [PATCH 08/15] Fix utility functions due to signature change --- .../clp_package_utils/general.py | 6 +++--- .../clp_py_utils/clp_metadata_db_utils.py | 18 
+++++++++++------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 79c978aac1..74c8c92a71 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -566,7 +566,8 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None: def validate_dataset(clp_config: CLPConfig, input_dataset: Optional[str]) -> Optional[str]: """ - Checks if the provided dataset currently exists in the metadata database. + Checks if the provided dataset currently exists in the metadata database, or provides a default + value if the CLP_S storage engine is used. :param clp_config: :param input_dataset: Dataset from the CLI. :return: The validated dataset to use. @@ -586,7 +587,6 @@ def validate_dataset(clp_config: CLPConfig, input_dataset: Optional[str]) -> Opt with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - dataset_exists, _ = validate_and_cache_dataset(db_cursor, table_prefix, dataset) - if not dataset_exists: + if not validate_and_cache_dataset(db_cursor, table_prefix, dataset): raise ValueError(f"Dataset `{dataset}` does not exist.") return dataset diff --git a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py index ad0c543a7d..969b4b3e8d 100644 --- a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py +++ b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py @@ -1,7 +1,7 @@ from __future__ import annotations from pathlib import Path -from typing import Set, Tuple +from typing import Set from clp_py_utils.clp_config import ( ArchiveOutput, @@ -190,7 +190,7 @@ def validate_and_cache_dataset( table_prefix: str, dataset: str, existing_datasets: Set[str] | None = None, -) -> Tuple[bool, Set[str]]: +) -> 
bool: """ Checks if the provided dataset currently exists in the metadata database and cache it locally. If the dataset already exists in the local cache, the database query is skipped and the cache is @@ -198,13 +198,17 @@ def validate_and_cache_dataset( :param db_cursor: :param table_prefix: :param dataset: - :param existing_datasets: - :return: Whether the dataset exists, and a refreshed cache of datasets if a lookup is required. + :param existing_datasets: Returns a refreshed dataset cache if a database lookup is required. + :return: Whether the dataset exists. """ if existing_datasets is not None and dataset in existing_datasets: - return True, existing_datasets - existing_datasets = fetch_existing_datasets(db_cursor, table_prefix) - return dataset in existing_datasets, existing_datasets + return True + + # NOTE: This assumes we never delete a dataset. + new_datasets = fetch_existing_datasets(db_cursor, table_prefix) + if existing_datasets is not None: + existing_datasets.update(new_datasets) + return dataset in existing_datasets def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None = None) -> None: From 8d50cc010bd3851ae6921a98694fef99ef12c809 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 3 Jul 2025 07:32:03 -0400 Subject: [PATCH 09/15] Fix coderabbit AI comment --- components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py index 969b4b3e8d..7e3e9a6ecf 100644 --- a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py +++ b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py @@ -208,7 +208,7 @@ def validate_and_cache_dataset( new_datasets = fetch_existing_datasets(db_cursor, table_prefix) if existing_datasets is not None: existing_datasets.update(new_datasets) - return dataset in existing_datasets + return dataset in 
new_datasets def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None = None) -> None: From e9fb423774b168078b7ffa5f480cf58b9f690fa1 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 3 Jul 2025 09:32:39 -0400 Subject: [PATCH 10/15] remove unused import --- .../job_orchestration/scheduler/query/query_scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index f03683f84f..c6daa9a56a 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -37,7 +37,6 @@ ) from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level from clp_py_utils.clp_metadata_db_utils import ( - fetch_existing_datasets, get_archive_tags_table_name, get_archives_table_name, get_files_table_name, From c794714bcbed9e83b3072f04d359f125a752bc59 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 3 Jul 2025 12:42:58 -0400 Subject: [PATCH 11/15] Address review comments --- .../clp_package_utils/general.py | 20 +++-------- .../scheduler/query/query_scheduler.py | 33 ++++++++++--------- 2 files changed, 22 insertions(+), 31 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 74c8c92a71..3c275d8f90 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -18,11 +18,11 @@ CLPConfig, Database, DB_COMPONENT_NAME, + fetch_existing_datasets, QUEUE_COMPONENT_NAME, REDIS_COMPONENT_NAME, REDUCER_COMPONENT_NAME, RESULTS_CACHE_COMPONENT_NAME, - StorageEngine, StorageType, WEBUI_COMPONENT_NAME, WorkerConfig, @@ -564,22 +564,13 @@ def validate_path_for_container_mount(path: pathlib.Path) -> 
None: ) -def validate_dataset(clp_config: CLPConfig, input_dataset: Optional[str]) -> Optional[str]: +def validate_dataset(clp_config: CLPConfig, dataset: str) -> None: """ - Checks if the provided dataset currently exists in the metadata database, or provides a default - value if the CLP_S storage engine is used. + Checks if the provided dataset currently exists in the metadata database. :param clp_config: - :param input_dataset: Dataset from the CLI. - :return: The validated dataset to use. + :param dataset: :raise: ValueError """ - storage_engine: StorageEngine = clp_config.package.storage_engine - if StorageEngine.CLP_S != storage_engine: - if input_dataset is not None: - raise ValueError("Dataset selection is only enabled for CLP_S storage engine.") - return None - - dataset: str = CLP_DEFAULT_DATASET_NAME if input_dataset is None else input_dataset db_config: Database = clp_config.database sql_adapter: SQL_Adapter = SQL_Adapter(db_config) clp_db_connection_params: dict[str, any] = db_config.get_clp_connection_params_and_type(True) @@ -587,6 +578,5 @@ def validate_dataset(clp_config: CLPConfig, input_dataset: Optional[str]) -> Opt with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: - if not validate_and_cache_dataset(db_cursor, table_prefix, dataset): + if dataset not in fetch_existing_datasets(db_cursor, table_prefix): raise ValueError(f"Dataset `{dataset}` does not exist.") - return dataset diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index c6daa9a56a..785c7858d8 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -37,11 +37,11 @@ ) from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level from 
clp_py_utils.clp_metadata_db_utils import ( + fetch_existing_datasets, get_archive_tags_table_name, get_archives_table_name, get_files_table_name, get_tags_table_name, - validate_and_cache_dataset, ) from clp_py_utils.core import read_yaml_config_file from clp_py_utils.decorators import exception_default_value @@ -647,21 +647,22 @@ def handle_pending_query_jobs( table_prefix = clp_metadata_db_conn_params["table_prefix"] dataset = QueryJobConfig.parse_obj(job_config).dataset - if dataset is not None and not validate_and_cache_dataset( - db_cursor, table_prefix, dataset, existing_datasets - ): - logger.error(f"Dataset `{dataset}` doesn't exist.") - if not set_job_or_task_status( - db_conn, - QUERY_JOBS_TABLE_NAME, - job_id, - QueryJobStatus.FAILED, - QueryJobStatus.PENDING, - start_time=datetime.datetime.now(), - duration=0, - ): - logger.error(f"Failed to set job {job_id} as failed.") - continue + if dataset is not None and dataset not in existing_datasets: + # NOTE: This assumes we never delete a dataset. 
+ existing_datasets.update(fetch_existing_datasets(db_cursor, table_prefix)) + if dataset not in existing_datasets: + logger.error(f"Dataset `{dataset}` doesn't exist.") + if not set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + QueryJobStatus.FAILED, + QueryJobStatus.PENDING, + start_time=datetime.datetime.now(), + duration=0, + ): + logger.error(f"Failed to set job {job_id} as failed.") + continue if QueryJobType.SEARCH_OR_AGGREGATION == job_type: # Avoid double-dispatch when a job is WAITING_FOR_REDUCER From 8b1f1eee7c58635b6c9b95e63737dbb6d77f897f Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 3 Jul 2025 12:43:54 -0400 Subject: [PATCH 12/15] Removed unused function --- .../clp_py_utils/clp_metadata_db_utils.py | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py index 7e3e9a6ecf..a5e43f9a52 100644 --- a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py +++ b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py @@ -185,32 +185,6 @@ def fetch_existing_datasets( return {row["name"] for row in rows} -def validate_and_cache_dataset( - db_cursor, - table_prefix: str, - dataset: str, - existing_datasets: Set[str] | None = None, -) -> bool: - """ - Checks if the provided dataset currently exists in the metadata database and cache it locally. - If the dataset already exists in the local cache, the database query is skipped and the cache is - not updated. - :param db_cursor: - :param table_prefix: - :param dataset: - :param existing_datasets: Returns a refreshed dataset cache if a database lookup is required. - :return: Whether the dataset exists. - """ - if existing_datasets is not None and dataset in existing_datasets: - return True - - # NOTE: This assumes we never delete a dataset.
- new_datasets = fetch_existing_datasets(db_cursor, table_prefix) - if existing_datasets is not None: - existing_datasets.update(new_datasets) - return dataset in new_datasets - - def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None = None) -> None: """ Creates the standard set of tables for CLP's metadata. From e30539160f6901b09121b456d240ad3206615a19 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Fri, 4 Jul 2025 02:15:15 +0800 Subject: [PATCH 13/15] Apply suggestions from code review Co-authored-by: haiqi96 <14502009+haiqi96@users.noreply.github.com> --- components/clp-package-utils/clp_package_utils/general.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 3c275d8f90..c661d31f90 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -14,7 +14,6 @@ import yaml from clp_py_utils.clp_config import ( CLP_DEFAULT_CREDENTIALS_FILE_PATH, - CLP_DEFAULT_DATASET_NAME, CLPConfig, Database, DB_COMPONENT_NAME, @@ -27,7 +26,6 @@ WEBUI_COMPONENT_NAME, WorkerConfig, ) -from clp_py_utils.clp_metadata_db_utils import validate_and_cache_dataset from clp_py_utils.core import ( get_config_value, make_config_path_absolute, @@ -564,9 +562,9 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None: ) -def validate_dataset(clp_config: CLPConfig, dataset: str) -> None: +def validate_dataset(db_config: Database, dataset: str) -> None: """ - Checks if the provided dataset currently exists in the metadata database. + Checks if `dataset` currently exists in the metadata database. 
:param clp_config: :param dataset: :raise: ValueError From 3c8f3cbc23a3f855b4942dee8f8913a7c77ae4c6 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 3 Jul 2025 15:16:31 -0400 Subject: [PATCH 14/15] bug fix --- components/clp-package-utils/clp_package_utils/general.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index c661d31f90..df5c1d71be 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -17,7 +17,6 @@ CLPConfig, Database, DB_COMPONENT_NAME, - fetch_existing_datasets, QUEUE_COMPONENT_NAME, REDIS_COMPONENT_NAME, REDUCER_COMPONENT_NAME, @@ -26,6 +25,7 @@ WEBUI_COMPONENT_NAME, WorkerConfig, ) +from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets from clp_py_utils.core import ( get_config_value, make_config_path_absolute, @@ -565,11 +565,10 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None: def validate_dataset(db_config: Database, dataset: str) -> None: """ Checks if `dataset` currently exists in the metadata database. 
- :param clp_config: + :param db_config: :param dataset: :raise: ValueError """ - db_config: Database = clp_config.database sql_adapter: SQL_Adapter = SQL_Adapter(db_config) clp_db_connection_params: dict[str, any] = db_config.get_clp_connection_params_and_type(True) table_prefix: str = clp_db_connection_params["table_prefix"] From 95e587da8cf2e0ac96e590de526d58ca350e3f53 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Fri, 4 Jul 2025 20:30:46 +0800 Subject: [PATCH 15/15] Apply suggestions from code review Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- .../clp-package-utils/clp_package_utils/general.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index df5c1d71be..c890699578 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -564,16 +564,17 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None: def validate_dataset(db_config: Database, dataset: str) -> None: """ - Checks if `dataset` currently exists in the metadata database. + Validates that `dataset` exists in the metadata database. + :param db_config: :param dataset: - :raise: ValueError + :raise: ValueError if the dataset doesn't exist. 
""" - sql_adapter: SQL_Adapter = SQL_Adapter(db_config) - clp_db_connection_params: dict[str, any] = db_config.get_clp_connection_params_and_type(True) - table_prefix: str = clp_db_connection_params["table_prefix"] + sql_adapter = SQL_Adapter(db_config) + clp_db_connection_params = db_config.get_clp_connection_params_and_type(True) + table_prefix = clp_db_connection_params["table_prefix"] with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: if dataset not in fetch_existing_datasets(db_cursor, table_prefix): - raise ValueError(f"Dataset `{dataset}` does not exist.") + raise ValueError(f"Dataset `{dataset}` doesn't exist.")