Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
869b02d
Migrate new webui part of the dataset exposing
Bill-hbrhbr Jun 29, 2025
6f4ba3a
Migrate changes from PR #1036 and job_config changes from PR #1023
Bill-hbrhbr Jun 29, 2025
74c2cd2
Migrate CLI part of dataset exposing and add robust validations
Bill-hbrhbr Jun 29, 2025
dfb2a93
Fix bug where we require compress to have an existing input dataset
Bill-hbrhbr Jun 29, 2025
05e14b4
Address CodeRabbit review on CLI
Bill-hbrhbr Jun 29, 2025
7f4a220
review
Jun 30, 2025
6500555
Merge branch 'main' into datasset-interface-v2
Bill-hbrhbr Jul 3, 2025
cd9b7e0
Migrate changes from 1036
Bill-hbrhbr Jul 3, 2025
7c9a792
Move dataset validation into native scripts
Bill-hbrhbr Jul 3, 2025
94cdc38
Fixed unused dataset reference in the webui
Bill-hbrhbr Jul 3, 2025
de6f50c
Bug fix
Bill-hbrhbr Jul 3, 2025
42fb6f4
bug fix
Bill-hbrhbr Jul 3, 2025
b15b697
latest
Jul 3, 2025
926c76d
Fix validation order in decompression
Bill-hbrhbr Jul 3, 2025
7b51f37
Fix not nullable dataset var
Bill-hbrhbr Jul 3, 2025
6f67991
Update components/webui/server/src/fastify-v2/routes/api/search/index.ts
Bill-hbrhbr Jul 3, 2025
b057bbe
eslint fix
Bill-hbrhbr Jul 3, 2025
5493843
eslint fix
Bill-hbrhbr Jul 3, 2025
34a8234
latest
Jul 4, 2025
e3e2930
Merge branch 'main' into datasset-interface-v2
haiqi96 Jul 4, 2025
80512cd
lint fix
Bill-hbrhbr Jul 4, 2025
8f7136e
latest
Jul 5, 2025
17cd392
Fix: Pass correct argument to validate_dataset.
kirkrodrigues Jul 5, 2025
4608789
Minor refactoring.
kirkrodrigues Jul 5, 2025
bfb2539
Remove obsolete storage engine from webui server settings.
kirkrodrigues Jul 5, 2025
99e21da
Rename validate_dataset to validate_dataset_exists.
kirkrodrigues Jul 5, 2025
f22d3bb
Wrap validate_dataset_exists with a try catch for prettier errors.
kirkrodrigues Jul 5, 2025
df60151
Validate dataset name.
kirkrodrigues Jul 5, 2025
9622389
Merge branch 'main' into datasset-interface-v2
kirkrodrigues Jul 6, 2025
0c210d9
Don't hardcode dataset name length validation.
kirkrodrigues Jul 6, 2025
280d2a9
Apply suggestions from code review
kirkrodrigues Jul 6, 2025
250dd94
Disable dataset control when a query is in-progress.
kirkrodrigues Jul 6, 2025
aa55301
webui: Don't hardcode datasets table name.
kirkrodrigues Jul 6, 2025
5b48c8d
Apply linter.
kirkrodrigues Jul 6, 2025
46b0db2
Fix underscore count in validate_dataset_name.
kirkrodrigues Jul 7, 2025
af95be8
Take datasets table name from settings.
kirkrodrigues Jul 7, 2025
dde5d54
Translate an empty dataset name to null on the server.
kirkrodrigues Jul 7, 2025
3f1c3af
Fix lint violations.
kirkrodrigues Jul 7, 2025
06695f3
fix: Correct the schema for null dataset.
hoophalab Jul 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,34 @@
import subprocess
import typing
import uuid
from contextlib import closing
from enum import auto
from typing import List, Optional, Tuple

import yaml
from clp_py_utils.clp_config import (
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
Database,
DB_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageEngine,
StorageType,
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
from clp_py_utils.clp_metadata_db_utils import validate_and_cache_dataset
from clp_py_utils.core import (
get_config_value,
make_config_path_absolute,
read_yaml_config_file,
validate_path_could_be_dir,
)
from clp_py_utils.sql_adapter import SQL_Adapter
from strenum import KebabCaseStrEnum

# CONSTANTS
Expand Down Expand Up @@ -556,3 +562,31 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None:
f"Invalid path: `{path}` cannot be under '{prefix}' which may overlap with a path"
f" in the container."
)


def validate_dataset(clp_config: CLPConfig, input_dataset: Optional[str]) -> Optional[str]:
    """
    Checks if the provided dataset currently exists in the metadata database.
    :param clp_config: The loaded CLP package configuration.
    :param input_dataset: Dataset name from the CLI, or None if the user didn't specify one.
    :return: The validated dataset to use, or None when the configured storage engine doesn't
    support datasets.
    :raise: ValueError if a dataset was explicitly given for a non-CLP_S storage engine, or if
    the dataset doesn't exist in the metadata database.
    """
    storage_engine: StorageEngine = clp_config.package.storage_engine
    if StorageEngine.CLP_S != storage_engine:
        # Datasets are a CLP_S-only concept; reject an explicitly-specified dataset for any
        # other engine rather than silently ignoring it.
        if input_dataset is not None:
            raise ValueError("Dataset selection is only enabled for CLP_S storage engine.")
        return None

    # Fall back to the package's default dataset when the user didn't specify one.
    dataset: str = CLP_DEFAULT_DATASET_NAME if input_dataset is None else input_dataset
    db_config: Database = clp_config.database
    sql_adapter: SQL_Adapter = SQL_Adapter(db_config)
    # NOTE: `typing.Any`, not the builtin `any()` function, is the correct value type here.
    clp_db_connection_params: dict[str, typing.Any] = db_config.get_clp_connection_params_and_type(
        True
    )
    table_prefix: str = clp_db_connection_params["table_prefix"]
    with closing(sql_adapter.create_connection(True)) as db_conn, closing(
        db_conn.cursor(dictionary=True)
    ) as db_cursor:
        dataset_exists, _ = validate_and_cache_dataset(db_cursor, table_prefix, dataset)
        if not dataset_exists:
            raise ValueError(f"Dataset `{dataset}` does not exist.")
    return dataset
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
get_clp_home,
load_config_file,
validate_and_load_db_credentials_file,
validate_dataset,
)

# Command/Argument Constants
Expand Down Expand Up @@ -61,6 +62,12 @@ def main(argv: typing.List[str]) -> int:
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--dataset",
type=str,
default=None,
help="The dataset that the archives belong to.",
)

# Top-level commands
subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = args_parser.add_subparsers(
Expand Down Expand Up @@ -158,6 +165,8 @@ def main(argv: typing.List[str]) -> int:
logger.exception("Failed to load config.")
return -1

dataset = validate_dataset(clp_config, parsed_args.dataset)

storage_type: StorageType = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
Expand Down Expand Up @@ -196,6 +205,7 @@ def main(argv: typing.List[str]) -> int:
"python3",
"-m", "clp_package_utils.scripts.native.archive_manager",
"--config", str(generated_config_path_on_container),
"--dataset", str(dataset),
str(subcommand),
]
# fmt : on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import subprocess
import sys
import uuid
from typing import List
from typing import List, Optional

from clp_py_utils.clp_config import StorageEngine
from job_orchestration.scheduler.job_config import InputType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
CLP_DEFAULT_DATASET_NAME,
CONTAINER_INPUT_LOGS_ROOT_DIR,
dump_container_config,
generate_container_config,
Expand Down Expand Up @@ -63,6 +64,7 @@ def _generate_logs_list(

def _generate_compress_cmd(
parsed_args: argparse.Namespace,
dataset: Optional[str],
config_path: pathlib.Path,
logs_list_path: pathlib.Path,
) -> List[str]:
Expand All @@ -74,6 +76,9 @@ def _generate_compress_cmd(
"--config", str(config_path),
]
# fmt: on
if dataset is not None:
compress_cmd.append("--dataset")
compress_cmd.append(dataset)
if parsed_args.timestamp_key is not None:
compress_cmd.append("--timestamp-key")
compress_cmd.append(parsed_args.timestamp_key)
Expand Down Expand Up @@ -131,6 +136,12 @@ def main(argv):
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--dataset",
type=str,
default=None,
help="The dataset that the archives belong to.",
)
args_parser.add_argument(
"--timestamp-key",
help="The path (e.g. x.y) for the field containing the log event's timestamp.",
Expand Down Expand Up @@ -163,10 +174,16 @@ def main(argv):
return -1

input_type = clp_config.logs_input.type
storage_engine: StorageEngine = clp_config.package.storage_engine

dataset: Optional[str] = parsed_args.dataset
if StorageEngine.CLP_S == storage_engine and dataset is None:
dataset = CLP_DEFAULT_DATASET_NAME

if InputType.FS == input_type:
_validate_fs_input_args(parsed_args, args_parser)
elif InputType.S3 == input_type:
_validate_s3_input_args(parsed_args, args_parser, clp_config.package.storage_engine)
_validate_s3_input_args(parsed_args, args_parser, storage_engine)
else:
raise ValueError(f"Unsupported input type: {input_type}.")

Expand Down Expand Up @@ -198,7 +215,7 @@ def main(argv):
container_name, necessary_mounts, clp_config.execution_container
)
compress_cmd = _generate_compress_cmd(
parsed_args, generated_config_path_on_container, logs_list_path_on_container
parsed_args, dataset, generated_config_path_on_container, logs_list_path_on_container
)
cmd = container_start_cmd + compress_cmd
subprocess.run(cmd, check=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import sys
from typing import Optional

from clp_py_utils.clp_config import CLPConfig, StorageEngine, StorageType
from clp_py_utils.clp_config import (
CLP_DEFAULT_DATASET_NAME,
CLPConfig,
StorageEngine,
StorageType,
)

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
Expand All @@ -22,6 +27,7 @@
JobType,
load_config_file,
validate_and_load_db_credentials_file,
validate_dataset,
validate_path_could_be_dir,
)

Expand Down Expand Up @@ -167,13 +173,19 @@ def handle_extract_stream_cmd(

storage_type = clp_config.archive_output.storage.type
storage_engine = clp_config.package.storage_engine
job_command = parsed_args.command

if StorageType.S3 == storage_type and StorageEngine.CLP == storage_engine:
logger.error(
f"Stream extraction is not supported for archive storage type `{storage_type}` with"
f" storage engine `{storage_engine}`."
)
return -1

if EXTRACT_JSON_CMD == job_command and StorageEngine.CLP_S != storage_engine:
logger.error(f"Json extraction is only supported with storage engine `{storage_engine}`.")
return -1

container_name = generate_container_name(str(JobType.IR_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
Expand All @@ -185,7 +197,6 @@ def handle_extract_stream_cmd(
)

# fmt: off
job_command = parsed_args.command
extract_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.decompress",
Expand All @@ -207,6 +218,9 @@ def handle_extract_stream_cmd(
extract_cmd.append(str(parsed_args.target_uncompressed_size))
elif EXTRACT_JSON_CMD == job_command:
extract_cmd.append(str(parsed_args.archive_id))
dataset = validate_dataset(clp_config, parsed_args.dataset)
extract_cmd.append("--dataset")
extract_cmd.append(dataset)
if parsed_args.target_chunk_size:
extract_cmd.append("--target-chunk-size")
extract_cmd.append(str(parsed_args.target_chunk_size))
Expand Down Expand Up @@ -267,6 +281,12 @@ def main(argv):
# JSON extraction command parser
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--dataset",
type=str,
default=CLP_DEFAULT_DATASET_NAME,
help="The dataset that the archives belong to.",
)
json_extraction_parser.add_argument(
"--target-chunk-size",
type=int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from clp_py_utils.clp_config import (
ARCHIVE_TAGS_TABLE_SUFFIX,
ARCHIVES_TABLE_SUFFIX,
CLP_DEFAULT_DATASET_NAME,
Database,
FILES_TABLE_SUFFIX,
StorageEngine,
Expand Down Expand Up @@ -97,6 +96,12 @@ def main(argv: typing.List[str]) -> int:
default=str(default_config_file_path),
help="CLP configuration file.",
)
args_parser.add_argument(
"--dataset",
type=str,
default=None,
help="The dataset that the archives belong to.",
)

# Top-level commands
subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = args_parser.add_subparsers(
Expand Down Expand Up @@ -196,7 +201,7 @@ def main(argv: typing.List[str]) -> int:
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
parsed_args.dataset,
parsed_args.begin_ts,
parsed_args.end_ts,
)
Expand All @@ -208,7 +213,7 @@ def main(argv: typing.List[str]) -> int:
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
parsed_args.dataset,
delete_handler,
parsed_args.dry_run,
)
Expand All @@ -220,7 +225,7 @@ def main(argv: typing.List[str]) -> int:
archives_dir,
database_config,
storage_engine,
CLP_DEFAULT_DATASET_NAME,
parsed_args.dataset,
delete_handler,
parsed_args.dry_run,
)
Expand All @@ -236,7 +241,7 @@ def _find_archives(
archives_dir: Path,
database_config: Database,
storage_engine: StorageEngine,
dataset: str,
dataset: typing.Optional[str],
begin_ts: int,
end_ts: int = typing.Optional[int],
) -> int:
Expand Down Expand Up @@ -285,9 +290,12 @@ def _find_archives(
return 0

logger.info(f"Found {len(archive_ids)} archives within the specified time range.")
archive_output_dir: Path = (
archives_dir / dataset if dataset is not None else archives_dir
)
for archive_id in archive_ids:
logger.info(archive_id)
archive_path: Path = archives_dir / dataset / archive_id
archive_path = archive_output_dir / archive_id
if not archive_path.is_dir():
logger.warning(f"Archive {archive_id} in database not found on disk.")

Expand All @@ -303,7 +311,7 @@ def _delete_archives(
archives_dir: Path,
database_config: Database,
storage_engine: StorageEngine,
dataset: str,
dataset: typing.Optional[str],
delete_handler: DeleteHandler,
dry_run: bool = False,
) -> int:
Expand Down Expand Up @@ -387,8 +395,9 @@ def _delete_archives(

logger.info(f"Finished deleting archives from the database.")

archive_output_dir: Path = archives_dir / dataset if dataset is not None else archives_dir
for archive_id in archive_ids:
archive_path: Path = archives_dir / dataset / archive_id
archive_path = archive_output_dir / archive_id
if not archive_path.is_dir():
logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.")
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def _generate_clp_io_config(
if len(logs_to_compress) == 0:
raise ValueError(f"No input paths given.")
return FsInputConfig(
dataset=parsed_args.dataset,
paths_to_compress=logs_to_compress,
timestamp_key=parsed_args.timestamp_key,
path_prefix_to_remove=str(CONTAINER_INPUT_LOGS_ROOT_DIR),
Expand All @@ -154,6 +155,7 @@ def _generate_clp_io_config(
region_code, bucket_name, key_prefix = parse_s3_url(s3_url)
aws_authentication = clp_config.logs_input.aws_authentication
return S3InputConfig(
dataset=parsed_args.dataset,
region_code=region_code,
bucket=bucket_name,
key_prefix=key_prefix,
Expand Down Expand Up @@ -190,6 +192,12 @@ def main(argv):
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--dataset",
type=str,
default=None,
help="The dataset that the archives belong to.",
)
args_parser.add_argument(
"-f",
"--logs-list",
Expand Down
Loading