Skip to content

Commit 027c0b8

Browse files
Bill-hbrhbr, Marco, haiqi96, kirkrodrigues, and hoophalab
authored
feat(clp-package): Expose dataset selection in package CLI scripts and web UI. (#1050)
Co-authored-by: Marco <[email protected]> Co-authored-by: Haiqi Xu <[email protected]> Co-authored-by: Kirk Rodrigues <[email protected]> Co-authored-by: hoophalab <[email protected]>
1 parent f1d379d commit 027c0b8

File tree

43 files changed

+667
-126
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+667
-126
lines changed

components/clp-package-utils/clp_package_utils/general.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import errno
33
import os
44
import pathlib
5+
import re
56
import secrets
67
import socket
78
import subprocess
@@ -25,7 +26,11 @@
2526
WEBUI_COMPONENT_NAME,
2627
WorkerConfig,
2728
)
28-
from clp_py_utils.clp_metadata_db_utils import fetch_existing_datasets
29+
from clp_py_utils.clp_metadata_db_utils import (
30+
fetch_existing_datasets,
31+
MYSQL_TABLE_NAME_MAX_LEN,
32+
TABLE_SUFFIX_MAX_LEN,
33+
)
2934
from clp_py_utils.core import (
3035
get_config_value,
3136
make_config_path_absolute,
@@ -562,7 +567,7 @@ def validate_path_for_container_mount(path: pathlib.Path) -> None:
562567
)
563568

564569

565-
def validate_dataset(db_config: Database, dataset: str) -> None:
570+
def validate_dataset_exists(db_config: Database, dataset: str) -> None:
566571
"""
567572
Validates that `dataset` exists in the metadata database.
568573
@@ -578,3 +583,32 @@ def validate_dataset(db_config: Database, dataset: str) -> None:
578583
) as db_cursor:
579584
if dataset not in fetch_existing_datasets(db_cursor, table_prefix):
580585
raise ValueError(f"Dataset `{dataset}` doesn't exist.")
586+
587+
588+
def validate_dataset_name(clp_table_prefix: str, dataset_name: str) -> None:
589+
"""
590+
Validates that the given dataset name abides by the following rules:
591+
- Its length won't cause any metadata table names to exceed MySQL's max table name length.
592+
- It only contains alphanumeric characters and underscores.
593+
594+
:param clp_table_prefix:
595+
:param dataset_name:
596+
:raise: ValueError if the dataset name is invalid.
597+
"""
598+
if re.fullmatch(r"\w+", dataset_name) is None:
599+
raise ValueError(
600+
f"Invalid dataset name: `{dataset_name}`. Names can only contain alphanumeric"
601+
f" characters and underscores."
602+
)
603+
604+
dataset_name_max_len = (
605+
MYSQL_TABLE_NAME_MAX_LEN
606+
- len(clp_table_prefix)
607+
- 1 # For the separator between the dataset name and the table suffix
608+
- TABLE_SUFFIX_MAX_LEN
609+
)
610+
if len(dataset_name) > dataset_name_max_len:
611+
raise ValueError(
612+
f"Invalid dataset name: `{dataset_name}`. Names can only be a maximum of"
613+
f" {dataset_name_max_len} characters long."
614+
)

components/clp-package-utils/clp_package_utils/scripts/archive_manager.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
import typing
66
from pathlib import Path
77

8-
from clp_py_utils.clp_config import StorageType
8+
from clp_py_utils.clp_config import (
9+
CLP_DEFAULT_DATASET_NAME,
10+
StorageEngine,
11+
StorageType,
12+
)
913

1014
from clp_package_utils.general import (
1115
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
@@ -18,6 +22,7 @@
1822
get_clp_home,
1923
load_config_file,
2024
validate_and_load_db_credentials_file,
25+
validate_dataset_name,
2126
)
2227

2328
# Command/Argument Constants
@@ -61,6 +66,12 @@ def main(argv: typing.List[str]) -> int:
6166
default=str(default_config_file_path),
6267
help="CLP package configuration file.",
6368
)
69+
args_parser.add_argument(
70+
"--dataset",
71+
type=str,
72+
default=None,
73+
help="The dataset that the archives belong to.",
74+
)
6475

6576
# Top-level commands
6677
subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = args_parser.add_subparsers(
@@ -163,6 +174,20 @@ def main(argv: typing.List[str]) -> int:
163174
logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
164175
return -1
165176

177+
storage_engine: StorageEngine = clp_config.package.storage_engine
178+
dataset = parsed_args.dataset
179+
if StorageEngine.CLP_S == storage_engine:
180+
dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset
181+
try:
182+
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
183+
validate_dataset_name(clp_db_connection_params["table_prefix"], dataset)
184+
except Exception as e:
185+
logger.error(e)
186+
return -1
187+
elif dataset is not None:
188+
logger.error(f"Dataset selection is not supported for storage engine: {storage_engine}.")
189+
return -1
190+
166191
# Validate input depending on subcommands
167192
if (DEL_COMMAND == subcommand and DEL_BY_FILTER_SUBCOMMAND == parsed_args.del_subcommand) or (
168193
FIND_COMMAND == subcommand
@@ -196,9 +221,12 @@ def main(argv: typing.List[str]) -> int:
196221
"python3",
197222
"-m", "clp_package_utils.scripts.native.archive_manager",
198223
"--config", str(generated_config_path_on_container),
199-
str(subcommand),
200224
]
201225
# fmt : on
226+
if dataset is not None:
227+
archive_manager_cmd.append("--dataset")
228+
archive_manager_cmd.append(dataset)
229+
archive_manager_cmd.append(subcommand)
202230

203231
# Add subcommand-specific arguments
204232
if DEL_COMMAND == subcommand:

components/clp-package-utils/clp_package_utils/scripts/compress.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44
import subprocess
55
import sys
66
import uuid
7-
from typing import List
7+
from typing import List, Optional
88

9-
from clp_py_utils.clp_config import StorageEngine
9+
from clp_py_utils.clp_config import (
10+
CLP_DEFAULT_DATASET_NAME,
11+
StorageEngine,
12+
)
1013
from job_orchestration.scheduler.job_config import InputType
1114

1215
from clp_package_utils.general import (
@@ -20,6 +23,7 @@
2023
JobType,
2124
load_config_file,
2225
validate_and_load_db_credentials_file,
26+
validate_dataset_name,
2327
)
2428

2529
logger = logging.getLogger(__file__)
@@ -63,6 +67,7 @@ def _generate_logs_list(
6367

6468
def _generate_compress_cmd(
6569
parsed_args: argparse.Namespace,
70+
dataset: Optional[str],
6671
config_path: pathlib.Path,
6772
logs_list_path: pathlib.Path,
6873
) -> List[str]:
@@ -74,6 +79,9 @@ def _generate_compress_cmd(
7479
"--config", str(config_path),
7580
]
7681
# fmt: on
82+
if dataset is not None:
83+
compress_cmd.append("--dataset")
84+
compress_cmd.append(dataset)
7785
if parsed_args.timestamp_key is not None:
7886
compress_cmd.append("--timestamp-key")
7987
compress_cmd.append(parsed_args.timestamp_key)
@@ -131,6 +139,12 @@ def main(argv):
131139
default=str(default_config_file_path),
132140
help="CLP package configuration file.",
133141
)
142+
args_parser.add_argument(
143+
"--dataset",
144+
type=str,
145+
default=None,
146+
help="The dataset that the archives belong to.",
147+
)
134148
args_parser.add_argument(
135149
"--timestamp-key",
136150
help="The path (e.g. x.y) for the field containing the log event's timestamp.",
@@ -162,11 +176,25 @@ def main(argv):
162176
logger.exception("Failed to load config.")
163177
return -1
164178

179+
storage_engine: StorageEngine = clp_config.package.storage_engine
180+
dataset = parsed_args.dataset
181+
if StorageEngine.CLP_S == storage_engine:
182+
dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset
183+
try:
184+
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
185+
validate_dataset_name(clp_db_connection_params["table_prefix"], dataset)
186+
except Exception as e:
187+
logger.error(e)
188+
return -1
189+
elif dataset is not None:
190+
logger.error(f"Dataset selection is not supported for storage engine: {storage_engine}.")
191+
return -1
192+
165193
input_type = clp_config.logs_input.type
166194
if InputType.FS == input_type:
167195
_validate_fs_input_args(parsed_args, args_parser)
168196
elif InputType.S3 == input_type:
169-
_validate_s3_input_args(parsed_args, args_parser, clp_config.package.storage_engine)
197+
_validate_s3_input_args(parsed_args, args_parser, storage_engine)
170198
else:
171199
raise ValueError(f"Unsupported input type: {input_type}.")
172200

@@ -198,7 +226,7 @@ def main(argv):
198226
container_name, necessary_mounts, clp_config.execution_container
199227
)
200228
compress_cmd = _generate_compress_cmd(
201-
parsed_args, generated_config_path_on_container, logs_list_path_on_container
229+
parsed_args, dataset, generated_config_path_on_container, logs_list_path_on_container
202230
)
203231
cmd = container_start_cmd + compress_cmd
204232
subprocess.run(cmd, check=True)

components/clp-package-utils/clp_package_utils/scripts/decompress.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55
import sys
66
from typing import Optional
77

8-
from clp_py_utils.clp_config import CLPConfig, StorageEngine, StorageType
8+
from clp_py_utils.clp_config import (
9+
CLP_DEFAULT_DATASET_NAME,
10+
CLPConfig,
11+
StorageEngine,
12+
StorageType,
13+
)
914

1015
from clp_package_utils.general import (
1116
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
@@ -22,6 +27,7 @@
2227
JobType,
2328
load_config_file,
2429
validate_and_load_db_credentials_file,
30+
validate_dataset_name,
2531
validate_path_could_be_dir,
2632
)
2733

@@ -174,6 +180,24 @@ def handle_extract_stream_cmd(
174180
)
175181
return -1
176182

183+
job_command = parsed_args.command
184+
if EXTRACT_JSON_CMD == job_command and StorageEngine.CLP_S != storage_engine:
185+
logger.error(f"JSON extraction is not supported for storage engine `{storage_engine}`.")
186+
return -1
187+
188+
dataset = parsed_args.dataset
189+
if StorageEngine.CLP_S == storage_engine:
190+
dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset
191+
try:
192+
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
193+
validate_dataset_name(clp_db_connection_params["table_prefix"], dataset)
194+
except Exception as e:
195+
logger.error(e)
196+
return -1
197+
elif dataset is not None:
198+
logger.error(f"Dataset selection is not supported for storage engine: {storage_engine}.")
199+
return -1
200+
177201
container_name = generate_container_name(str(JobType.IR_EXTRACTION))
178202
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
179203
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
@@ -185,7 +209,6 @@ def handle_extract_stream_cmd(
185209
)
186210

187211
# fmt: off
188-
job_command = parsed_args.command
189212
extract_cmd = [
190213
"python3",
191214
"-m", "clp_package_utils.scripts.native.decompress",
@@ -207,6 +230,9 @@ def handle_extract_stream_cmd(
207230
extract_cmd.append(str(parsed_args.target_uncompressed_size))
208231
elif EXTRACT_JSON_CMD == job_command:
209232
extract_cmd.append(str(parsed_args.archive_id))
233+
if dataset is not None:
234+
extract_cmd.append("--dataset")
235+
extract_cmd.append(dataset)
210236
if parsed_args.target_chunk_size:
211237
extract_cmd.append("--target-chunk-size")
212238
extract_cmd.append(str(parsed_args.target_chunk_size))
@@ -267,6 +293,12 @@ def main(argv):
267293
# JSON extraction command parser
268294
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
269295
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
296+
json_extraction_parser.add_argument(
297+
"--dataset",
298+
type=str,
299+
default=None,
300+
help="The dataset that the archives belong to.",
301+
)
270302
json_extraction_parser.add_argument(
271303
"--target-chunk-size",
272304
type=int,

components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,7 @@
77
from contextlib import closing
88
from pathlib import Path
99

10-
from clp_py_utils.clp_config import (
11-
CLP_DEFAULT_DATASET_NAME,
12-
Database,
13-
StorageEngine,
14-
)
10+
from clp_py_utils.clp_config import Database
1511
from clp_py_utils.clp_metadata_db_utils import (
1612
get_archive_tags_table_name,
1713
get_archives_table_name,
@@ -24,6 +20,7 @@
2420
CLPConfig,
2521
get_clp_home,
2622
load_config_file,
23+
validate_dataset_exists,
2724
)
2825

2926
# Command/Argument Constants
@@ -99,6 +96,12 @@ def main(argv: typing.List[str]) -> int:
9996
default=str(default_config_file_path),
10097
help="CLP configuration file.",
10198
)
99+
args_parser.add_argument(
100+
"--dataset",
101+
type=str,
102+
default=None,
103+
help="The dataset that the archives belong to.",
104+
)
102105

103106
# Top-level commands
104107
subparsers: argparse._SubParsersAction[argparse.ArgumentParser] = args_parser.add_subparsers(
@@ -186,17 +189,20 @@ def main(argv: typing.List[str]) -> int:
186189
logger.exception("Failed to load config.")
187190
return -1
188191

189-
storage_engine: StorageEngine = clp_config.package.storage_engine
190192
database_config: Database = clp_config.database
193+
dataset = parsed_args.dataset
194+
if dataset is not None:
195+
try:
196+
validate_dataset_exists(database_config, dataset)
197+
except Exception as e:
198+
logger.error(e)
199+
return -1
200+
191201
archives_dir: Path = clp_config.archive_output.get_directory()
192202
if not archives_dir.exists():
193203
logger.error("`archive_output.directory` doesn't exist.")
194204
return -1
195205

196-
dataset: typing.Optional[str] = None
197-
if StorageEngine.CLP_S == storage_engine:
198-
dataset = CLP_DEFAULT_DATASET_NAME
199-
200206
if FIND_COMMAND == parsed_args.subcommand:
201207
return _find_archives(
202208
archives_dir,
@@ -302,7 +308,7 @@ def _find_archives(
302308
def _delete_archives(
303309
archives_dir: Path,
304310
database_config: Database,
305-
dataset: str,
311+
dataset: typing.Optional[str],
306312
delete_handler: DeleteHandler,
307313
dry_run: bool = False,
308314
) -> int:

0 commit comments

Comments (0)