Skip to content

Commit d2b4dd4

Browse files
authored
feat(clp-package): Add utility function to validate dataset's existence in CLI scripts. (#1036)
1 parent b1121c0 commit d2b4dd4

File tree

2 files changed

+38
-30
lines changed

2 files changed

+38
-30
lines changed

components/clp-package-utils/clp_package_utils/general.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
import subprocess
88
import typing
99
import uuid
10+
from contextlib import closing
1011
from enum import auto
1112
from typing import List, Optional, Tuple
1213

1314
import yaml
1415
from clp_py_utils.clp_config import (
1516
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
1617
CLPConfig,
18+
Database,
1719
DB_COMPONENT_NAME,
1820
QUEUE_COMPONENT_NAME,
1921
REDIS_COMPONENT_NAME,
@@ -23,12 +25,14 @@
2325
WEBUI_COMPONENT_NAME,
2426
WorkerConfig,
2527
)
28+
from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets
2629
from clp_py_utils.core import (
2730
get_config_value,
2831
make_config_path_absolute,
2932
read_yaml_config_file,
3033
validate_path_could_be_dir,
3134
)
35+
from clp_py_utils.sql_adapter import SQL_Adapter
3236
from strenum import KebabCaseStrEnum
3337

3438
# CONSTANTS
@@ -556,3 +560,21 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None:
556560
f"Invalid path: `{path}` cannot be under '{prefix}' which may overlap with a path"
557561
f" in the container."
558562
)
563+
564+
565+
def validate_dataset(db_config: Database, dataset: str) -> None:
566+
"""
567+
Validates that `dataset` exists in the metadata database.
568+
569+
:param db_config:
570+
:param dataset:
571+
:raise: ValueError if the dataset doesn't exist.
572+
"""
573+
sql_adapter = SQL_Adapter(db_config)
574+
clp_db_connection_params = db_config.get_clp_connection_params_and_type(True)
575+
table_prefix = clp_db_connection_params["table_prefix"]
576+
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
577+
db_conn.cursor(dictionary=True)
578+
) as db_cursor:
579+
if dataset not in fetch_existing_datasets(db_cursor, table_prefix):
580+
raise ValueError(f"Dataset `{dataset}` doesn't exist.")

components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -619,21 +619,6 @@ def dispatch_job_and_update_db(
619619
)
620620

621621

622-
def _validate_dataset(
623-
db_cursor,
624-
table_prefix: str,
625-
dataset: str,
626-
existing_datasets: Set[str],
627-
) -> bool:
628-
if dataset in existing_datasets:
629-
return True
630-
631-
# NOTE: This assumes we never delete a dataset.
632-
new_datasets = fetch_existing_datasets(db_cursor, table_prefix)
633-
existing_datasets.update(new_datasets)
634-
return dataset in existing_datasets
635-
636-
637622
def handle_pending_query_jobs(
638623
db_conn_pool,
639624
clp_metadata_db_conn_params: Dict[str, any],
@@ -662,21 +647,22 @@ def handle_pending_query_jobs(
662647

663648
table_prefix = clp_metadata_db_conn_params["table_prefix"]
664649
dataset = QueryJobConfig.parse_obj(job_config).dataset
665-
if dataset is not None and not _validate_dataset(
666-
db_cursor, table_prefix, dataset, existing_datasets
667-
):
668-
logger.error(f"Dataset `{dataset}` doesn't exist.")
669-
if not set_job_or_task_status(
670-
db_conn,
671-
QUERY_JOBS_TABLE_NAME,
672-
job_id,
673-
QueryJobStatus.FAILED,
674-
QueryJobStatus.PENDING,
675-
start_time=datetime.datetime.now(),
676-
duration=0,
677-
):
678-
logger.error(f"Failed to set job {job_id} as failed.")
679-
continue
650+
if dataset is not None and dataset not in existing_datasets:
651+
# NOTE: This assumes we never delete a dataset.
652+
existing_datasets.update(fetch_existing_datasets(db_cursor, table_prefix))
653+
if dataset not in existing_datasets:
654+
logger.error(f"Dataset `{dataset}` doesn't exist.")
655+
if not set_job_or_task_status(
656+
db_conn,
657+
QUERY_JOBS_TABLE_NAME,
658+
job_id,
659+
QueryJobStatus.FAILED,
660+
QueryJobStatus.PENDING,
661+
start_time=datetime.datetime.now(),
662+
duration=0,
663+
):
664+
logger.error(f"Failed to set job {job_id} as failed.")
665+
continue
680666

681667
if QueryJobType.SEARCH_OR_AGGREGATION == job_type:
682668
# Avoid double-dispatch when a job is WAITING_FOR_REDUCER

0 commit comments

Comments
 (0)