Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@
from clp_py_utils.clp_config import (
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
CLPConfig,
Database,
DB_COMPONENT_NAME,
LOG_VIEWER_WEBUI_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
SQL_Adapter,
StorageType,
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
from clp_py_utils.clp_metadata_db_utils import validate_and_cache_dataset
from clp_py_utils.core import (
get_config_value,
make_config_path_absolute,
Expand Down Expand Up @@ -574,3 +577,23 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None:
f"Invalid path: `{path}` cannot be under '{prefix}' which may overlap with a path"
f" in the container."
)


def validate_dataset(clp_config: CLPConfig, dataset: typing.Optional[str]) -> None:
    """
    Checks that the provided dataset currently exists in the metadata database.
    :param clp_config: The CLP package config providing the metadata database connection settings.
    :param dataset: Name of the dataset to validate.
    :raises ValueError: If `dataset` is empty/None or doesn't exist in the metadata database.
    """
    if not dataset:
        raise ValueError(f"`{dataset}` is not a valid dataset name.")
    db_config: Database = clp_config.database
    sql_adapter: SQL_Adapter = SQL_Adapter(db_config)
    # NOTE: `typing.Any` is the correct type argument here; the previous `any` referred to the
    # builtin function, not a type.
    clp_db_connection_params: dict[str, typing.Any] = db_config.get_clp_connection_params_and_type(
        True
    )
    table_prefix: str = clp_db_connection_params["table_prefix"]
    # Both the connection and the cursor are closed on exit, even if validation raises.
    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
        db_conn.cursor(dictionary=True)
    ) as db_cursor:
        dataset_valid, _ = validate_and_cache_dataset(db_cursor, table_prefix, dataset)
        if not dataset_valid:
            raise ValueError(f"Dataset `{dataset}` does not exist.")
26 changes: 25 additions & 1 deletion components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

from pathlib import Path
from typing import Set
from typing import Set, Tuple

from clp_py_utils.clp_config import (
ARCHIVE_TAGS_TABLE_SUFFIX,
Expand Down Expand Up @@ -157,6 +157,30 @@ def fetch_existing_datasets(
return {row["name"] for row in rows}


def validate_and_cache_dataset(
    db_cursor,
    table_prefix: str,
    dataset: str | None,
    existing_datasets: Set[str] | None = None,
) -> Tuple[bool, Set[str] | None]:
    """
    Checks whether the provided dataset currently exists in the metadata database, using a local
    cache of dataset names to avoid unnecessary queries. If the dataset is found in the cache, the
    database query is skipped and the cache is returned unchanged; otherwise the cache is refreshed
    from the database.
    :param db_cursor: Cursor for the metadata database.
    :param table_prefix: Prefix of the metadata tables' names.
    :param dataset: Name of the dataset to validate; falsy values are treated as nonexistent.
    :param existing_datasets: A cache of dataset names known to exist, or None if no cache is
    available.
    :return: Whether the dataset exists, and the (possibly refreshed) cache of datasets. NOTE: The
    cache may be None when `dataset` is falsy and no cache was supplied.
    """
    if not dataset:
        # A falsy dataset can never exist; return the caller's cache untouched (possibly None,
        # hence the `Set[str] | None` in the return annotation).
        return False, existing_datasets
    if existing_datasets is not None and dataset in existing_datasets:
        return True, existing_datasets
    # Cache miss (or no cache yet): refresh from the database before deciding.
    existing_datasets = fetch_existing_datasets(db_cursor, table_prefix)
    return dataset in existing_datasets, existing_datasets


def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None = None) -> None:
"""
Creates the standard set of tables for CLP's metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
TAGS_TABLE_SUFFIX,
)
from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level
from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets
from clp_py_utils.clp_metadata_db_utils import validate_and_cache_dataset
from clp_py_utils.core import read_yaml_config_file
from clp_py_utils.decorators import exception_default_value
from clp_py_utils.sql_adapter import SQL_Adapter
Expand Down Expand Up @@ -614,18 +614,6 @@ def dispatch_job_and_update_db(
)


def _validate_dataset(
db_cursor,
table_prefix: str,
dataset: str,
existing_datasets: Set[str],
) -> bool:
if dataset in existing_datasets:
return True
existing_datasets = fetch_existing_datasets(db_cursor, table_prefix)
return dataset in existing_datasets


def handle_pending_query_jobs(
db_conn_pool,
clp_metadata_db_conn_params: Dict[str, any],
Expand Down Expand Up @@ -656,7 +644,10 @@ def handle_pending_query_jobs(
table_prefix = clp_metadata_db_conn_params["table_prefix"]
if StorageEngine.CLP_S == clp_storage_engine:
dataset = QueryJobConfig.parse_obj(job_config).dataset
if not _validate_dataset(db_cursor, table_prefix, dataset, existing_datasets):
dataset_exists, existing_datasets = validate_and_cache_dataset(
db_cursor, table_prefix, dataset, existing_datasets
)
if not dataset_exists:
logger.error(f"Dataset `{dataset}` doesn't exist.")
if not set_job_or_task_status(
db_conn,
Expand Down
Loading