Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,34 @@
import subprocess
import typing
import uuid
from contextlib import closing
from enum import auto
from typing import List, Optional, Tuple

import yaml
from clp_py_utils.clp_config import (
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
Database,
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageEngine,
StorageType,
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
from clp_py_utils.clp_metadata_db_utils import validate_and_cache_dataset
from clp_py_utils.core import (
get_config_value,
make_config_path_absolute,
read_yaml_config_file,
validate_path_could_be_dir,
)
from clp_py_utils.sql_adapter import SQL_Adapter
from strenum import KebabCaseStrEnum

# CONSTANTS
Expand Down Expand Up @@ -556,3 +562,31 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None:
f"Invalid path: `{path}` cannot be under '{prefix}' which may overlap with a path"
f" in the container."
)


def validate_dataset(clp_config: CLPConfig, input_dataset: Optional[str]) -> Optional[str]:
    """
    Checks if the provided dataset currently exists in the metadata database, or provides a default
    value if the CLP_S storage engine is used.
    :param clp_config: Package configuration; supplies the storage engine and database settings.
    :param input_dataset: Dataset from the CLI, or None to fall back to the default.
    :return: The validated dataset to use, or None when the storage engine doesn't use datasets.
    :raise: ValueError if a dataset is selected for a non-CLP_S engine, or if the dataset doesn't
    exist in the metadata database.
    """
    storage_engine: StorageEngine = clp_config.package.storage_engine
    if StorageEngine.CLP_S != storage_engine:
        # Datasets are only meaningful for the CLP_S storage engine, so an explicit selection for
        # any other engine is a user error rather than something to silently ignore.
        if input_dataset is not None:
            raise ValueError("Dataset selection is only enabled for CLP_S storage engine.")
        return None

    dataset: str = CLP_DEFAULT_DATASET_NAME if input_dataset is None else input_dataset
    db_config: Database = clp_config.database
    sql_adapter: SQL_Adapter = SQL_Adapter(db_config)
    # `object` rather than builtin `any` — `any` is a function, not a type.
    clp_db_connection_params: dict[str, object] = db_config.get_clp_connection_params_and_type(True)
    table_prefix: str = clp_db_connection_params["table_prefix"]
    # `closing` ensures both the connection and cursor are released even if validation raises.
    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
        db_conn.cursor(dictionary=True)
    ) as db_cursor:
        if not validate_and_cache_dataset(db_cursor, table_prefix, dataset):
            raise ValueError(f"Dataset `{dataset}` does not exist.")
        return dataset
26 changes: 26 additions & 0 deletions components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,32 @@ def fetch_existing_datasets(
return {row["name"] for row in rows}


def validate_and_cache_dataset(
db_cursor,
table_prefix: str,
dataset: str,
existing_datasets: Set[str] | None = None,
) -> bool:
"""
Checks if the provided dataset currently exists in the metadata database and cache it locally.
If the dataset already exists in the local cache, the database query is skipped and the cache is
not updated.
:param db_cursor:
:param table_prefix:
:param dataset:
:param existing_datasets: Returns a refreshed dataset cache if a database lookup is required.
:return: Whether the dataset exists.
"""
if existing_datasets is not None and dataset in existing_datasets:
return True

# NOTE: This assumes we never delete a dataset.
new_datasets = fetch_existing_datasets(db_cursor, table_prefix)
if existing_datasets is not None:
existing_datasets.update(new_datasets)
return dataset in existing_datasets


def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None = None) -> None:
"""
Creates the standard set of tables for CLP's metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
get_archives_table_name,
get_files_table_name,
get_tags_table_name,
validate_and_cache_dataset,
)
from clp_py_utils.core import read_yaml_config_file
from clp_py_utils.decorators import exception_default_value
Expand Down Expand Up @@ -619,21 +620,6 @@ def dispatch_job_and_update_db(
)


def _validate_dataset(
db_cursor,
table_prefix: str,
dataset: str,
existing_datasets: Set[str],
) -> bool:
if dataset in existing_datasets:
return True

# NOTE: This assumes we never delete a dataset.
new_datasets = fetch_existing_datasets(db_cursor, table_prefix)
existing_datasets.update(new_datasets)
return dataset in existing_datasets


def handle_pending_query_jobs(
db_conn_pool,
clp_metadata_db_conn_params: Dict[str, any],
Expand Down Expand Up @@ -662,7 +648,7 @@ def handle_pending_query_jobs(

table_prefix = clp_metadata_db_conn_params["table_prefix"]
dataset = QueryJobConfig.parse_obj(job_config).dataset
if dataset is not None and not _validate_dataset(
if dataset is not None and not validate_and_cache_dataset(
db_cursor, table_prefix, dataset, existing_datasets
):
logger.error(f"Dataset `{dataset}` doesn't exist.")
Expand Down