24 changes: 24 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
@@ -7,14 +7,18 @@
import subprocess
import typing
import uuid
from contextlib import closing
from enum import auto
from typing import List, Optional, Tuple

import yaml
from clp_py_utils.clp_config import (
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
Database,
DB_COMPONENT_NAME,
fetch_existing_datasets,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
@@ -23,12 +27,14 @@
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
from clp_py_utils.clp_metadata_db_utils import validate_and_cache_dataset
from clp_py_utils.core import (
get_config_value,
make_config_path_absolute,
read_yaml_config_file,
validate_path_could_be_dir,
)
from clp_py_utils.sql_adapter import SQL_Adapter
from strenum import KebabCaseStrEnum

# CONSTANTS
@@ -556,3 +562,21 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None:
f"Invalid path: `{path}` cannot be under '{prefix}' which may overlap with a path"
f" in the container."
)


def validate_dataset(clp_config: CLPConfig, dataset: str) -> None:
"""
Checks if the provided dataset currently exists in the metadata database.
:param clp_config: CLP config whose database settings are used to connect to the metadata
database.
:param dataset: Name of the dataset to look up.
:raise: ValueError if the dataset doesn't exist in the metadata database.
"""
db_config: Database = clp_config.database
sql_adapter: SQL_Adapter = SQL_Adapter(db_config)
clp_db_connection_params: dict[str, any] = db_config.get_clp_connection_params_and_type(True)
table_prefix: str = clp_db_connection_params["table_prefix"]
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
if dataset not in fetch_existing_datasets(db_cursor, table_prefix):
raise ValueError(f"Dataset `{dataset}` does not exist.")
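
For context, a CLI entry point in the package could call the new helper before dispatching any
work. The snippet below is a minimal usage sketch, not code from this PR; the wrapper name is
hypothetical and it only uses symbols already imported in this module:

# Hypothetical usage sketch (not part of this diff): fall back to the default dataset,
# then fail fast if it hasn't been created in the metadata database.
def resolve_and_validate_dataset(clp_config: CLPConfig, dataset: Optional[str]) -> str:
    dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset
    validate_dataset(clp_config, dataset)  # Raises ValueError on an unknown dataset.
    return dataset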
@@ -619,21 +619,6 @@ def dispatch_job_and_update_db(
)


def _validate_dataset(
db_cursor,
table_prefix: str,
dataset: str,
existing_datasets: Set[str],
) -> bool:
if dataset in existing_datasets:
return True

# NOTE: This assumes we never delete a dataset.
new_datasets = fetch_existing_datasets(db_cursor, table_prefix)
existing_datasets.update(new_datasets)
return dataset in existing_datasets


def handle_pending_query_jobs(
db_conn_pool,
clp_metadata_db_conn_params: Dict[str, any],
@@ -662,21 +647,22 @@ def handle_pending_query_jobs(

table_prefix = clp_metadata_db_conn_params["table_prefix"]
dataset = QueryJobConfig.parse_obj(job_config).dataset
if dataset is not None and not _validate_dataset(
db_cursor, table_prefix, dataset, existing_datasets
):
logger.error(f"Dataset `{dataset}` doesn't exist.")
if not set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
job_id,
QueryJobStatus.FAILED,
QueryJobStatus.PENDING,
start_time=datetime.datetime.now(),
duration=0,
):
logger.error(f"Failed to set job {job_id} as failed.")
continue
if dataset is not None and dataset not in existing_datasets:
# NOTE: This assumes we never delete a dataset.
existing_datasets.update(fetch_existing_datasets(db_cursor, table_prefix))
if dataset not in existing_datasets:
logger.error(f"Dataset `{dataset}` doesn't exist.")
if not set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
job_id,
QueryJobStatus.FAILED,
QueryJobStatus.PENDING,
start_time=datetime.datetime.now(),
duration=0,
):
logger.error(f"Failed to set job {job_id} as failed.")
continue

if QueryJobType.SEARCH_OR_AGGREGATION == job_type:
# Avoid double-dispatch when a job is WAITING_FOR_REDUCER
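
The refactor above inlines the removed `_validate_dataset` helper: the scheduler keeps an
in-memory set of known datasets and only re-queries the metadata database on a cache miss, which
is safe under the stated assumption that datasets are never deleted. Isolated from the scheduler,
the check-then-refresh pattern looks roughly like the sketch below; `refresh` is a hypothetical
stand-in for the `fetch_existing_datasets(db_cursor, table_prefix)` call:

from typing import Callable, Set

def dataset_exists(dataset: str, cache: Set[str], refresh: Callable[[], Set[str]]) -> bool:
    """Returns whether `dataset` exists, refreshing the cache from the DB only on a miss."""
    if dataset in cache:
        return True
    # NOTE: Assumes datasets are never deleted, so the cache only ever grows.
    cache.update(refresh())
    return dataset in cache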