Skip to content

Commit ae41525

Browse files
authored
Merge branch 'main' into modernize-liblzma-install
2 parents b144c1c + 0892a1c commit ae41525

File tree

68 files changed

+2123
-538
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

68 files changed

+2123
-538
lines changed

components/clp-package-utils/clp_package_utils/general.py

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
1717
CLPConfig,
1818
DB_COMPONENT_NAME,
19+
QueryEngine,
1920
QUEUE_COMPONENT_NAME,
2021
REDIS_COMPONENT_NAME,
2122
REDUCER_COMPONENT_NAME,
@@ -96,6 +97,13 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
9697
self.aws_config_dir: typing.Optional[DockerMount] = None
9798

9899

100+
def _validate_data_directory(data_dir: pathlib.Path, component_name: str) -> None:
101+
try:
102+
validate_path_could_be_dir(data_dir)
103+
except ValueError as ex:
104+
raise ValueError(f"{component_name} data directory is invalid: {ex}")
105+
106+
99107
def get_clp_home():
100108
# Determine CLP_HOME from an environment variable or this script's path
101109
clp_home = None
@@ -175,6 +183,13 @@ def is_container_exited(container_name):
175183
return False
176184

177185

186+
def validate_log_directory(logs_dir: pathlib.Path, component_name: str) -> None:
187+
try:
188+
validate_path_could_be_dir(logs_dir)
189+
except ValueError as ex:
190+
raise ValueError(f"{component_name} logs directory is invalid: {ex}")
191+
192+
178193
def validate_port(port_name: str, hostname: str, port: int):
179194
try:
180195
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -431,40 +446,23 @@ def validate_and_load_redis_credentials_file(
431446

432447

433448
def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path):
434-
try:
435-
validate_path_could_be_dir(data_dir)
436-
except ValueError as ex:
437-
raise ValueError(f"{DB_COMPONENT_NAME} data directory is invalid: {ex}")
438-
439-
try:
440-
validate_path_could_be_dir(logs_dir)
441-
except ValueError as ex:
442-
raise ValueError(f"{DB_COMPONENT_NAME} logs directory is invalid: {ex}")
449+
_validate_data_directory(data_dir, DB_COMPONENT_NAME)
450+
validate_log_directory(logs_dir, DB_COMPONENT_NAME)
443451

444452
validate_port(f"{DB_COMPONENT_NAME}.port", clp_config.database.host, clp_config.database.port)
445453

446454

447455
def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path):
448-
try:
449-
validate_path_could_be_dir(logs_dir)
450-
except ValueError as ex:
451-
raise ValueError(f"{QUEUE_COMPONENT_NAME} logs directory is invalid: {ex}")
456+
validate_log_directory(logs_dir, QUEUE_COMPONENT_NAME)
452457

453458
validate_port(f"{QUEUE_COMPONENT_NAME}.port", clp_config.queue.host, clp_config.queue.port)
454459

455460

456461
def validate_redis_config(
457462
clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path, base_config: pathlib.Path
458463
):
459-
try:
460-
validate_path_could_be_dir(data_dir)
461-
except ValueError as ex:
462-
raise ValueError(f"{REDIS_COMPONENT_NAME} data directory is invalid {ex}")
463-
464-
try:
465-
validate_path_could_be_dir(logs_dir)
466-
except ValueError as ex:
467-
raise ValueError(f"{REDIS_COMPONENT_NAME} logs directory is invalid: {ex}")
464+
_validate_data_directory(data_dir, REDIS_COMPONENT_NAME)
465+
validate_log_directory(logs_dir, REDIS_COMPONENT_NAME)
468466

469467
if not base_config.exists():
470468
raise ValueError(
@@ -475,10 +473,7 @@ def validate_redis_config(
475473

476474

477475
def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_workers: int):
478-
try:
479-
validate_path_could_be_dir(logs_dir)
480-
except ValueError as ex:
481-
raise ValueError(f"{REDUCER_COMPONENT_NAME} logs directory is invalid: {ex}")
476+
validate_log_directory(logs_dir, REDUCER_COMPONENT_NAME)
482477

483478
for i in range(0, num_workers):
484479
validate_port(
@@ -491,15 +486,8 @@ def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_w
491486
def validate_results_cache_config(
492487
clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path
493488
):
494-
try:
495-
validate_path_could_be_dir(data_dir)
496-
except ValueError as ex:
497-
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} data directory is invalid: {ex}")
498-
499-
try:
500-
validate_path_could_be_dir(logs_dir)
501-
except ValueError as ex:
502-
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} logs directory is invalid: {ex}")
489+
_validate_data_directory(data_dir, RESULTS_CACHE_COMPONENT_NAME)
490+
validate_log_directory(logs_dir, RESULTS_CACHE_COMPONENT_NAME)
503491

504492
validate_port(
505493
f"{RESULTS_CACHE_COMPONENT_NAME}.port",
@@ -508,8 +496,11 @@ def validate_results_cache_config(
508496
)
509497

510498

511-
def validate_worker_config(clp_config: CLPConfig):
499+
def validate_logs_input_config(clp_config: CLPConfig) -> None:
512500
clp_config.validate_logs_input_config()
501+
502+
503+
def validate_output_storage_config(clp_config: CLPConfig) -> None:
513504
clp_config.validate_archive_output_config()
514505
clp_config.validate_stream_output_config()
515506

@@ -590,3 +581,21 @@ def validate_dataset_name(clp_table_prefix: str, dataset_name: str) -> None:
590581
f"Invalid dataset name: `{dataset_name}`. Names can only be a maximum of"
591582
f" {dataset_name_max_len} characters long."
592583
)
584+
585+
586+
def validate_retention_config(clp_config: CLPConfig) -> None:
587+
clp_query_engine = clp_config.package.query_engine
588+
if is_retention_period_configured(clp_config) and clp_query_engine == QueryEngine.PRESTO:
589+
raise ValueError(
590+
f"Retention control is not supported with query_engine `{clp_query_engine}`"
591+
)
592+
593+
594+
def is_retention_period_configured(clp_config: CLPConfig) -> bool:
595+
if clp_config.archive_output.retention_period is not None:
596+
return True
597+
598+
if clp_config.results_cache.retention_period is not None:
599+
return True
600+
601+
return False

components/clp-package-utils/clp_package_utils/scripts/archive_manager.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@
2626
)
2727

2828
# Command/Argument Constants
29-
from clp_package_utils.scripts.native.archive_manager import (
30-
BEGIN_TS_ARG,
31-
DEL_BY_FILTER_SUBCOMMAND,
32-
DEL_BY_IDS_SUBCOMMAND,
33-
DEL_COMMAND,
34-
DRY_RUN_ARG,
35-
END_TS_ARG,
36-
FIND_COMMAND,
37-
)
29+
FIND_COMMAND: typing.Final[str] = "find"
30+
DEL_COMMAND: typing.Final[str] = "del"
31+
DEL_BY_IDS_SUBCOMMAND: typing.Final[str] = "by-ids"
32+
DEL_BY_FILTER_SUBCOMMAND: typing.Final[str] = "by-filter"
33+
BEGIN_TS_ARG: typing.Final[str] = "--begin-ts"
34+
END_TS_ARG: typing.Final[str] = "--end-ts"
35+
DRY_RUN_ARG: typing.Final[str] = "--dry-run"
3836

3937
logger: logging.Logger = logging.getLogger(__file__)
4038

@@ -66,6 +64,12 @@ def main(argv: typing.List[str]) -> int:
6664
default=str(default_config_file_path),
6765
help="CLP package configuration file.",
6866
)
67+
args_parser.add_argument(
68+
"--verbose",
69+
"-v",
70+
action="store_true",
71+
help="Enable debug logging.",
72+
)
6973
args_parser.add_argument(
7074
"--dataset",
7175
type=str,
@@ -150,6 +154,10 @@ def main(argv: typing.List[str]) -> int:
150154
)
151155

152156
parsed_args: argparse.Namespace = args_parser.parse_args(argv[1:])
157+
if parsed_args.verbose:
158+
logger.setLevel(logging.DEBUG)
159+
else:
160+
logger.setLevel(logging.INFO)
153161

154162
begin_timestamp: typing.Optional[int]
155163
end_timestamp: typing.Optional[int]
@@ -171,7 +179,7 @@ def main(argv: typing.List[str]) -> int:
171179

172180
storage_type: StorageType = clp_config.archive_output.storage.type
173181
if StorageType.FS != storage_type:
174-
logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
182+
logger.error(f"Archive manager is not supported for storage type: {storage_type}.")
175183
return -1
176184

177185
storage_engine: StorageEngine = clp_config.package.storage_engine
@@ -226,6 +234,9 @@ def main(argv: typing.List[str]) -> int:
226234
if dataset is not None:
227235
archive_manager_cmd.append("--dataset")
228236
archive_manager_cmd.append(dataset)
237+
if parsed_args.verbose:
238+
archive_manager_cmd.append("--verbose")
239+
229240
archive_manager_cmd.append(subcommand)
230241

231242
# Add subcommand-specific arguments
@@ -251,15 +262,20 @@ def main(argv: typing.List[str]) -> int:
251262
archive_manager_cmd.extend([END_TS_ARG, str(end_timestamp)])
252263
else:
253264
logger.error(f"Unsupported subcommand: `{subcommand}`.")
265+
return -1
254266

255267
cmd: typing.List[str] = container_start_cmd + archive_manager_cmd
256268

257-
subprocess.run(cmd, check=True)
269+
proc = subprocess.run(cmd)
270+
ret_code = proc.returncode
271+
if 0 != ret_code:
272+
logger.error("Archive manager failed.")
273+
logger.debug(f"Docker command failed: {' '.join(cmd)}")
258274

259275
# Remove generated files
260276
generated_config_path_on_host.unlink()
261277

262-
return 0
278+
return ret_code
263279

264280

265281
if "__main__" == __name__:

components/clp-package-utils/clp_package_utils/scripts/decompress.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -181,23 +181,13 @@ def handle_extract_stream_cmd(
181181
return -1
182182

183183
job_command = parsed_args.command
184+
if EXTRACT_IR_CMD == job_command and StorageEngine.CLP != storage_engine:
185+
logger.error(f"IR extraction is not supported for storage engine `{storage_engine}`.")
186+
return -1
184187
if EXTRACT_JSON_CMD == job_command and StorageEngine.CLP_S != storage_engine:
185188
logger.error(f"JSON extraction is not supported for storage engine `{storage_engine}`.")
186189
return -1
187190

188-
dataset = parsed_args.dataset
189-
if StorageEngine.CLP_S == storage_engine:
190-
dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset
191-
try:
192-
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
193-
validate_dataset_name(clp_db_connection_params["table_prefix"], dataset)
194-
except Exception as e:
195-
logger.error(e)
196-
return -1
197-
elif dataset is not None:
198-
logger.error(f"Dataset selection is not supported for storage engine: {storage_engine}.")
199-
return -1
200-
201191
container_name = generate_container_name(str(JobType.IR_EXTRACTION))
202192
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
203193
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
@@ -229,6 +219,15 @@ def handle_extract_stream_cmd(
229219
extract_cmd.append("--target-uncompressed-size")
230220
extract_cmd.append(str(parsed_args.target_uncompressed_size))
231221
elif EXTRACT_JSON_CMD == job_command:
222+
dataset = parsed_args.dataset
223+
dataset = CLP_DEFAULT_DATASET_NAME if dataset is None else dataset
224+
try:
225+
clp_db_connection_params = clp_config.database.get_clp_connection_params_and_type(True)
226+
validate_dataset_name(clp_db_connection_params["table_prefix"], dataset)
227+
except Exception as e:
228+
logger.error(e)
229+
return -1
230+
232231
extract_cmd.append(str(parsed_args.archive_id))
233232
if dataset is not None:
234233
extract_cmd.append("--dataset")

0 commit comments

Comments
 (0)