diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index dc75bca377..4fa4816e63 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -16,6 +16,7 @@ CLP_DEFAULT_CREDENTIALS_FILE_PATH, CLPConfig, DB_COMPONENT_NAME, + QueryEngine, QUEUE_COMPONENT_NAME, REDIS_COMPONENT_NAME, REDUCER_COMPONENT_NAME, @@ -582,6 +583,14 @@ def validate_dataset_name(clp_table_prefix: str, dataset_name: str) -> None: ) +def validate_retention_config(clp_config: CLPConfig) -> None: + clp_query_engine = clp_config.package.query_engine + if is_retention_period_configured(clp_config) and clp_query_engine == QueryEngine.PRESTO: + raise ValueError( + f"Retention control is not supported with query_engine `{clp_query_engine}`" + ) + + def is_retention_period_configured(clp_config: CLPConfig) -> bool: if clp_config.archive_output.retention_period is not None: return True diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index de3ee1a41a..faa087a26b 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -22,7 +22,7 @@ COMPRESSION_WORKER_COMPONENT_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME, - GARBAGE_COLLECTOR_NAME, + GARBAGE_COLLECTOR_COMPONENT_NAME, get_components_for_target, QUERY_JOBS_TABLE_NAME, QUERY_SCHEDULER_COMPONENT_NAME, @@ -69,6 +69,7 @@ validate_redis_config, validate_reducer_config, validate_results_cache_config, + validate_retention_config, validate_webui_config, ) @@ -1067,7 +1068,7 @@ def start_garbage_collector( container_clp_config: CLPConfig, mounts: CLPDockerMounts, ): - component_name = GARBAGE_COLLECTOR_NAME + component_name = GARBAGE_COLLECTOR_COMPONENT_NAME if not is_retention_period_configured(clp_config): logger.info(f"Retention period is not configured, skipping {component_name} creation...") @@ -1179,7 +1180,7 @@ def main(argv): reducer_server_parser = component_args_parser.add_parser(REDUCER_COMPONENT_NAME) add_num_workers_argument(reducer_server_parser) component_args_parser.add_parser(WEBUI_COMPONENT_NAME) - component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME) + component_args_parser.add_parser(GARBAGE_COLLECTOR_COMPONENT_NAME) parsed_args = args_parser.parse_args(argv[1:]) @@ -1213,7 +1214,7 @@ def main(argv): ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME, - GARBAGE_COLLECTOR_NAME, + GARBAGE_COLLECTOR_COMPONENT_NAME, COMPRESSION_SCHEDULER_COMPONENT_NAME, QUERY_SCHEDULER_COMPONENT_NAME, WEBUI_COMPONENT_NAME, @@ -1248,9 +1249,15 @@ def main(argv): ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME, QUERY_WORKER_COMPONENT_NAME, - GARBAGE_COLLECTOR_NAME, + GARBAGE_COLLECTOR_COMPONENT_NAME, ): validate_output_storage_config(clp_config) + if target in ( + ALL_TARGET_NAME, + CONTROLLER_TARGET_NAME, + GARBAGE_COLLECTOR_COMPONENT_NAME, + ): + validate_retention_config(clp_config) clp_config.validate_data_dir() clp_config.validate_logs_dir() @@ -1330,7 +1337,8 @@ def main(argv): if WEBUI_COMPONENT_NAME in components_to_start: start_webui(instance_id, clp_config, container_clp_config, mounts) - if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME): + + if GARBAGE_COLLECTOR_COMPONENT_NAME in components_to_start: start_garbage_collector(instance_id, clp_config, container_clp_config, mounts) except Exception as ex: diff --git a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py index f70a45d45f..899b7a96a3 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py @@ -11,7 +11,7 @@ COMPRESSION_WORKER_COMPONENT_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME, - GARBAGE_COLLECTOR_NAME, + GARBAGE_COLLECTOR_COMPONENT_NAME, QUERY_SCHEDULER_COMPONENT_NAME, QUERY_WORKER_COMPONENT_NAME, QUEUE_COMPONENT_NAME, @@ -85,7 +85,7 @@ def main(argv): component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME) component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME) component_args_parser.add_parser(WEBUI_COMPONENT_NAME) - component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME) + component_args_parser.add_parser(GARBAGE_COLLECTOR_COMPONENT_NAME) parsed_args = args_parser.parse_args(argv[1:]) @@ -132,8 +132,8 @@ def main(argv): already_exited_containers = [] force = parsed_args.force - if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME): - container_name = f"clp-{GARBAGE_COLLECTOR_NAME}-{instance_id}" + if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_COMPONENT_NAME): + container_name = f"clp-{GARBAGE_COLLECTOR_COMPONENT_NAME}-{instance_id}" stop_running_container(container_name, already_exited_containers, force) if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME): container_name = f"clp-{WEBUI_COMPONENT_NAME}-{instance_id}" diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index ea42413b27..7e4fcb23dd 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -26,7 +26,7 @@ COMPRESSION_WORKER_COMPONENT_NAME = "compression_worker" QUERY_WORKER_COMPONENT_NAME = "query_worker" WEBUI_COMPONENT_NAME = "webui" -GARBAGE_COLLECTOR_NAME = "garbage_collector" +GARBAGE_COLLECTOR_COMPONENT_NAME = "garbage_collector" # Component groups GENERAL_SCHEDULING_COMPONENTS = { @@ -48,7 +48,10 @@ RESULTS_CACHE_COMPONENT_NAME, WEBUI_COMPONENT_NAME, } -ALL_COMPONENTS = COMPRESSION_COMPONENTS | QUERY_COMPONENTS | UI_COMPONENTS +STORAGE_MANAGEMENT_COMPONENTS = {GARBAGE_COLLECTOR_COMPONENT_NAME} +ALL_COMPONENTS = ( + COMPRESSION_COMPONENTS | QUERY_COMPONENTS | UI_COMPONENTS | STORAGE_MANAGEMENT_COMPONENTS +) # Target names ALL_TARGET_NAME = "" @@ -60,7 +63,8 @@ | { COMPRESSION_SCHEDULER_COMPONENT_NAME, QUERY_SCHEDULER_COMPONENT_NAME, - }, + } + | STORAGE_MANAGEMENT_COMPONENTS, } QUERY_JOBS_TABLE_NAME = "query_jobs" diff --git a/components/clp-py-utils/clp_py_utils/s3_utils.py b/components/clp-py-utils/clp_py_utils/s3_utils.py index c0cf29745b..9e376a4e68 100644 --- a/components/clp-py-utils/clp_py_utils/s3_utils.py +++ b/components/clp-py-utils/clp_py_utils/s3_utils.py @@ -14,7 +14,7 @@ COMPRESSION_SCHEDULER_COMPONENT_NAME, COMPRESSION_WORKER_COMPONENT_NAME, FsStorage, - GARBAGE_COLLECTOR_NAME, + GARBAGE_COLLECTOR_COMPONENT_NAME, QUERY_SCHEDULER_COMPONENT_NAME, QUERY_WORKER_COMPONENT_NAME, S3Config, @@ -119,7 +119,7 @@ def generate_container_auth_options( elif component_type in (WEBUI_COMPONENT_NAME,): output_storages_by_component_type = [clp_config.stream_output.storage] elif component_type in ( - GARBAGE_COLLECTOR_NAME, + GARBAGE_COLLECTOR_COMPONENT_NAME, QUERY_SCHEDULER_COMPONENT_NAME, QUERY_WORKER_COMPONENT_NAME, ): diff --git a/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py b/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py index 3c16e32b9d..108e7c244e 100644 --- a/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py +++ b/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py @@ -9,7 +9,7 @@ from clp_py_utils.clp_config import ( CLPConfig, - GARBAGE_COLLECTOR_NAME, + GARBAGE_COLLECTOR_COMPONENT_NAME, ) from clp_py_utils.clp_logging import get_logger from clp_py_utils.core import read_yaml_config_file @@ -24,18 +24,20 @@ from job_orchestration.garbage_collector.utils import configure_logger from pydantic import ValidationError -logger = get_logger(GARBAGE_COLLECTOR_NAME) +logger = get_logger(GARBAGE_COLLECTOR_COMPONENT_NAME) async def main(argv: List[str]) -> int: - args_parser = argparse.ArgumentParser(description=f"Spin up the {GARBAGE_COLLECTOR_NAME}.") + args_parser = argparse.ArgumentParser( + description=f"Spin up the {GARBAGE_COLLECTOR_COMPONENT_NAME}." + ) args_parser.add_argument("--config", "-c", required=True, help="CLP configuration file.") parsed_args = args_parser.parse_args(argv[1:]) # Setup logging to file logs_directory = Path(os.getenv("CLP_LOGS_DIR")) logging_level = os.getenv("CLP_LOGGING_LEVEL") - configure_logger(logger, logging_level, logs_directory, GARBAGE_COLLECTOR_NAME) + configure_logger(logger, logging_level, logs_directory, GARBAGE_COLLECTOR_COMPONENT_NAME) # Load configuration config_path = Path(parsed_args.config)