Skip to content

Commit 528dfe2

Browse files
committed
Extract dataset checker function into common utils
1 parent 848af5b commit 528dfe2

File tree

2 files changed

+24
-14
lines changed

2 files changed

+24
-14
lines changed

components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,28 @@ def fetch_existing_datasets(
163163
return {row["name"] for row in rows}
164164

165165

166+
def validate_dataset(
167+
db_cursor,
168+
table_prefix: str,
169+
dataset: str,
170+
existing_datasets: Set[str] | None = None,
171+
) -> bool:
172+
"""
173+
Checks if a dataset currently exists in the metadata or in the local dataset cache.
174+
175+
:param db_cursor:
176+
:param table_prefix:
177+
:param dataset: The dataset to validate.
178+
:param existing_datasets: Returns a refreshed cache of dataset names fetched from the metadata
179+
if the current cache doesn not contain the provided dataset and a
180+
lookup is required.
181+
"""
182+
if existing_datasets is not None and dataset in existing_datasets:
183+
return True
184+
existing_datasets = fetch_existing_datasets(db_cursor, table_prefix)
185+
return dataset in existing_datasets
186+
187+
166188
def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None = None) -> None:
167189
"""
168190
Creates the standard set of tables for CLP's metadata.

components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
TAGS_TABLE_SUFFIX,
4242
)
4343
from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level
44-
from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets
44+
from clp_py_utils.clp_metadata_db_utils import validate_dataset
4545
from clp_py_utils.core import read_yaml_config_file
4646
from clp_py_utils.decorators import exception_default_value
4747
from clp_py_utils.sql_adapter import SQL_Adapter
@@ -614,18 +614,6 @@ def dispatch_job_and_update_db(
614614
)
615615

616616

617-
def _validate_dataset(
618-
db_cursor,
619-
table_prefix: str,
620-
dataset: str,
621-
existing_datasets: Set[str],
622-
) -> bool:
623-
if dataset in existing_datasets:
624-
return True
625-
existing_datasets = fetch_existing_datasets(db_cursor, table_prefix)
626-
return dataset in existing_datasets
627-
628-
629617
def handle_pending_query_jobs(
630618
db_conn_pool,
631619
clp_metadata_db_conn_params: Dict[str, any],
@@ -656,7 +644,7 @@ def handle_pending_query_jobs(
656644
table_prefix = clp_metadata_db_conn_params["table_prefix"]
657645
if StorageEngine.CLP_S == clp_storage_engine:
658646
dataset = QueryJobConfig.parse_obj(job_config).dataset
659-
if not _validate_dataset(db_cursor, table_prefix, dataset, existing_datasets):
647+
if not validate_dataset(db_cursor, table_prefix, dataset, existing_datasets):
660648
logger.error(f"Dataset `{dataset}` doesn't exist.")
661649
if not set_job_or_task_status(
662650
db_conn,

0 commit comments

Comments
 (0)