Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import subprocess
import typing
import uuid
from contextlib import closing
from enum import auto
from typing import List, Optional, Tuple

import yaml
from clp_py_utils.clp_config import (
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
CLPConfig,
Database,
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
Expand All @@ -23,12 +25,14 @@
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets
from clp_py_utils.core import (
get_config_value,
make_config_path_absolute,
read_yaml_config_file,
validate_path_could_be_dir,
)
from clp_py_utils.sql_adapter import SQL_Adapter
from strenum import KebabCaseStrEnum

# CONSTANTS
Expand Down Expand Up @@ -556,3 +560,21 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None:
f"Invalid path: `{path}` cannot be under '{prefix}' which may overlap with a path"
f" in the container."
)


def validate_dataset(db_config: Database, dataset: str) -> None:
    """
    Validates that `dataset` exists in the metadata database.

    :param db_config: Metadata database connection configuration.
    :param dataset: Name of the dataset to look up.
    :raise: ValueError if the dataset doesn't exist.
    """
    # Resolve the CLP table prefix before opening any connection.
    table_prefix = db_config.get_clp_connection_params_and_type(True)["table_prefix"]
    adapter = SQL_Adapter(db_config)
    with closing(adapter.create_connection(True)) as conn:
        with closing(conn.cursor(dictionary=True)) as cursor:
            known_datasets = fetch_existing_datasets(cursor, table_prefix)
            if dataset not in known_datasets:
                raise ValueError(f"Dataset `{dataset}` doesn't exist.")
Original file line number Diff line number Diff line change
Expand Up @@ -619,21 +619,6 @@ def dispatch_job_and_update_db(
)


def _validate_dataset(
db_cursor,
table_prefix: str,
dataset: str,
existing_datasets: Set[str],
) -> bool:
if dataset in existing_datasets:
return True

# NOTE: This assumes we never delete a dataset.
new_datasets = fetch_existing_datasets(db_cursor, table_prefix)
existing_datasets.update(new_datasets)
return dataset in existing_datasets


def handle_pending_query_jobs(
db_conn_pool,
clp_metadata_db_conn_params: Dict[str, any],
Expand Down Expand Up @@ -662,21 +647,22 @@ def handle_pending_query_jobs(

table_prefix = clp_metadata_db_conn_params["table_prefix"]
dataset = QueryJobConfig.parse_obj(job_config).dataset
if dataset is not None and not _validate_dataset(
db_cursor, table_prefix, dataset, existing_datasets
):
logger.error(f"Dataset `{dataset}` doesn't exist.")
if not set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
job_id,
QueryJobStatus.FAILED,
QueryJobStatus.PENDING,
start_time=datetime.datetime.now(),
duration=0,
):
logger.error(f"Failed to set job {job_id} as failed.")
continue
if dataset is not None and dataset not in existing_datasets:
# NOTE: This assumes we never delete a dataset.
existing_datasets.update(fetch_existing_datasets(db_cursor, table_prefix))
if dataset not in existing_datasets:
logger.error(f"Dataset `{dataset}` doesn't exist.")
if not set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
job_id,
QueryJobStatus.FAILED,
QueryJobStatus.PENDING,
start_time=datetime.datetime.now(),
duration=0,
):
logger.error(f"Failed to set job {job_id} as failed.")
continue

if QueryJobType.SEARCH_OR_AGGREGATION == job_type:
# Avoid double-dispatch when a job is WAITING_FOR_REDUCER
Expand Down