Skip to content

Commit 0892a1c

Browse files
authored
fix(clp-package): Start garbage collector as part of the "all" and "controller" targets; Validate retention periods aren't configured when using the Presto query engine. (#1205)
1 parent f87b505 commit 0892a1c

File tree

6 files changed

+42
-19
lines changed

6 files changed

+42
-19
lines changed

components/clp-package-utils/clp_package_utils/general.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
1717
CLPConfig,
1818
DB_COMPONENT_NAME,
19+
QueryEngine,
1920
QUEUE_COMPONENT_NAME,
2021
REDIS_COMPONENT_NAME,
2122
REDUCER_COMPONENT_NAME,
@@ -582,6 +583,14 @@ def validate_dataset_name(clp_table_prefix: str, dataset_name: str) -> None:
582583
)
583584

584585

586+
def validate_retention_config(clp_config: CLPConfig) -> None:
587+
clp_query_engine = clp_config.package.query_engine
588+
if is_retention_period_configured(clp_config) and clp_query_engine == QueryEngine.PRESTO:
589+
raise ValueError(
590+
f"Retention control is not supported with query_engine `{clp_query_engine}`"
591+
)
592+
593+
585594
def is_retention_period_configured(clp_config: CLPConfig) -> bool:
586595
if clp_config.archive_output.retention_period is not None:
587596
return True

components/clp-package-utils/clp_package_utils/scripts/start_clp.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
COMPRESSION_WORKER_COMPONENT_NAME,
2323
CONTROLLER_TARGET_NAME,
2424
DB_COMPONENT_NAME,
25-
GARBAGE_COLLECTOR_NAME,
25+
GARBAGE_COLLECTOR_COMPONENT_NAME,
2626
get_components_for_target,
2727
QUERY_JOBS_TABLE_NAME,
2828
QUERY_SCHEDULER_COMPONENT_NAME,
@@ -69,6 +69,7 @@
6969
validate_redis_config,
7070
validate_reducer_config,
7171
validate_results_cache_config,
72+
validate_retention_config,
7273
validate_webui_config,
7374
)
7475

@@ -1067,7 +1068,7 @@ def start_garbage_collector(
10671068
container_clp_config: CLPConfig,
10681069
mounts: CLPDockerMounts,
10691070
):
1070-
component_name = GARBAGE_COLLECTOR_NAME
1071+
component_name = GARBAGE_COLLECTOR_COMPONENT_NAME
10711072

10721073
if not is_retention_period_configured(clp_config):
10731074
logger.info(f"Retention period is not configured, skipping {component_name} creation...")
@@ -1179,7 +1180,7 @@ def main(argv):
11791180
reducer_server_parser = component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
11801181
add_num_workers_argument(reducer_server_parser)
11811182
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
1182-
component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME)
1183+
component_args_parser.add_parser(GARBAGE_COLLECTOR_COMPONENT_NAME)
11831184

11841185
parsed_args = args_parser.parse_args(argv[1:])
11851186

@@ -1213,7 +1214,7 @@ def main(argv):
12131214
ALL_TARGET_NAME,
12141215
CONTROLLER_TARGET_NAME,
12151216
DB_COMPONENT_NAME,
1216-
GARBAGE_COLLECTOR_NAME,
1217+
GARBAGE_COLLECTOR_COMPONENT_NAME,
12171218
COMPRESSION_SCHEDULER_COMPONENT_NAME,
12181219
QUERY_SCHEDULER_COMPONENT_NAME,
12191220
WEBUI_COMPONENT_NAME,
@@ -1248,9 +1249,15 @@ def main(argv):
12481249
ALL_TARGET_NAME,
12491250
COMPRESSION_WORKER_COMPONENT_NAME,
12501251
QUERY_WORKER_COMPONENT_NAME,
1251-
GARBAGE_COLLECTOR_NAME,
1252+
GARBAGE_COLLECTOR_COMPONENT_NAME,
12521253
):
12531254
validate_output_storage_config(clp_config)
1255+
if target in (
1256+
ALL_TARGET_NAME,
1257+
CONTROLLER_TARGET_NAME,
1258+
GARBAGE_COLLECTOR_COMPONENT_NAME,
1259+
):
1260+
validate_retention_config(clp_config)
12541261

12551262
clp_config.validate_data_dir()
12561263
clp_config.validate_logs_dir()
@@ -1330,7 +1337,8 @@ def main(argv):
13301337

13311338
if WEBUI_COMPONENT_NAME in components_to_start:
13321339
start_webui(instance_id, clp_config, container_clp_config, mounts)
1333-
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME):
1340+
1341+
if GARBAGE_COLLECTOR_COMPONENT_NAME in components_to_start:
13341342
start_garbage_collector(instance_id, clp_config, container_clp_config, mounts)
13351343

13361344
except Exception as ex:

components/clp-package-utils/clp_package_utils/scripts/stop_clp.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
COMPRESSION_WORKER_COMPONENT_NAME,
1212
CONTROLLER_TARGET_NAME,
1313
DB_COMPONENT_NAME,
14-
GARBAGE_COLLECTOR_NAME,
14+
GARBAGE_COLLECTOR_COMPONENT_NAME,
1515
QUERY_SCHEDULER_COMPONENT_NAME,
1616
QUERY_WORKER_COMPONENT_NAME,
1717
QUEUE_COMPONENT_NAME,
@@ -85,7 +85,7 @@ def main(argv):
8585
component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
8686
component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME)
8787
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
88-
component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME)
88+
component_args_parser.add_parser(GARBAGE_COLLECTOR_COMPONENT_NAME)
8989

9090
parsed_args = args_parser.parse_args(argv[1:])
9191

@@ -132,8 +132,8 @@ def main(argv):
132132

133133
already_exited_containers = []
134134
force = parsed_args.force
135-
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME):
136-
container_name = f"clp-{GARBAGE_COLLECTOR_NAME}-{instance_id}"
135+
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_COMPONENT_NAME):
136+
container_name = f"clp-{GARBAGE_COLLECTOR_COMPONENT_NAME}-{instance_id}"
137137
stop_running_container(container_name, already_exited_containers, force)
138138
if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME):
139139
container_name = f"clp-{WEBUI_COMPONENT_NAME}-{instance_id}"

components/clp-py-utils/clp_py_utils/clp_config.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
COMPRESSION_WORKER_COMPONENT_NAME = "compression_worker"
2727
QUERY_WORKER_COMPONENT_NAME = "query_worker"
2828
WEBUI_COMPONENT_NAME = "webui"
29-
GARBAGE_COLLECTOR_NAME = "garbage_collector"
29+
GARBAGE_COLLECTOR_COMPONENT_NAME = "garbage_collector"
3030

3131
# Component groups
3232
GENERAL_SCHEDULING_COMPONENTS = {
@@ -48,7 +48,10 @@
4848
RESULTS_CACHE_COMPONENT_NAME,
4949
WEBUI_COMPONENT_NAME,
5050
}
51-
ALL_COMPONENTS = COMPRESSION_COMPONENTS | QUERY_COMPONENTS | UI_COMPONENTS
51+
STORAGE_MANAGEMENT_COMPONENTS = {GARBAGE_COLLECTOR_COMPONENT_NAME}
52+
ALL_COMPONENTS = (
53+
COMPRESSION_COMPONENTS | QUERY_COMPONENTS | UI_COMPONENTS | STORAGE_MANAGEMENT_COMPONENTS
54+
)
5255

5356
# Target names
5457
ALL_TARGET_NAME = ""
@@ -60,7 +63,8 @@
6063
| {
6164
COMPRESSION_SCHEDULER_COMPONENT_NAME,
6265
QUERY_SCHEDULER_COMPONENT_NAME,
63-
},
66+
}
67+
| STORAGE_MANAGEMENT_COMPONENTS,
6468
}
6569

6670
QUERY_JOBS_TABLE_NAME = "query_jobs"

components/clp-py-utils/clp_py_utils/s3_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
COMPRESSION_SCHEDULER_COMPONENT_NAME,
1515
COMPRESSION_WORKER_COMPONENT_NAME,
1616
FsStorage,
17-
GARBAGE_COLLECTOR_NAME,
17+
GARBAGE_COLLECTOR_COMPONENT_NAME,
1818
QUERY_SCHEDULER_COMPONENT_NAME,
1919
QUERY_WORKER_COMPONENT_NAME,
2020
S3Config,
@@ -119,7 +119,7 @@ def generate_container_auth_options(
119119
elif component_type in (WEBUI_COMPONENT_NAME,):
120120
output_storages_by_component_type = [clp_config.stream_output.storage]
121121
elif component_type in (
122-
GARBAGE_COLLECTOR_NAME,
122+
GARBAGE_COLLECTOR_COMPONENT_NAME,
123123
QUERY_SCHEDULER_COMPONENT_NAME,
124124
QUERY_WORKER_COMPONENT_NAME,
125125
):

components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from clp_py_utils.clp_config import (
1111
CLPConfig,
12-
GARBAGE_COLLECTOR_NAME,
12+
GARBAGE_COLLECTOR_COMPONENT_NAME,
1313
)
1414
from clp_py_utils.clp_logging import get_logger
1515
from clp_py_utils.core import read_yaml_config_file
@@ -24,18 +24,20 @@
2424
from job_orchestration.garbage_collector.utils import configure_logger
2525
from pydantic import ValidationError
2626

27-
logger = get_logger(GARBAGE_COLLECTOR_NAME)
27+
logger = get_logger(GARBAGE_COLLECTOR_COMPONENT_NAME)
2828

2929

3030
async def main(argv: List[str]) -> int:
31-
args_parser = argparse.ArgumentParser(description=f"Spin up the {GARBAGE_COLLECTOR_NAME}.")
31+
args_parser = argparse.ArgumentParser(
32+
description=f"Spin up the {GARBAGE_COLLECTOR_COMPONENT_NAME}."
33+
)
3234
args_parser.add_argument("--config", "-c", required=True, help="CLP configuration file.")
3335
parsed_args = args_parser.parse_args(argv[1:])
3436

3537
# Setup logging to file
3638
logs_directory = Path(os.getenv("CLP_LOGS_DIR"))
3739
logging_level = os.getenv("CLP_LOGGING_LEVEL")
38-
configure_logger(logger, logging_level, logs_directory, GARBAGE_COLLECTOR_NAME)
40+
configure_logger(logger, logging_level, logs_directory, GARBAGE_COLLECTOR_COMPONENT_NAME)
3941

4042
# Load configuration
4143
config_path = Path(parsed_args.config)

0 commit comments

Comments
 (0)