Skip to content

Commit 70adb90

Browse files
haiqi96, Bill-hbrhbr, kirkrodrigues, and LinZhihao-723
authored
feat(package): Add garbage collectors to delete archives and search results based on a configurable retention policy. (#1035)
Co-authored-by: Bingran Hu <[email protected]> Co-authored-by: Kirk Rodrigues <[email protected]> Co-authored-by: Lin Zhihao <[email protected]>
1 parent c778aef commit 70adb90

File tree

16 files changed

+868
-68
lines changed

16 files changed

+868
-68
lines changed

components/clp-package-utils/clp_package_utils/general.py

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,13 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
9696
self.aws_config_dir: typing.Optional[DockerMount] = None
9797

9898

99+
def _validate_data_directory(data_dir: pathlib.Path, component_name: str) -> None:
    """
    Validates that `data_dir` is, or could be created as, a directory.

    :param data_dir: Candidate data directory for the component.
    :param component_name: Component name used to contextualize the error message.
    :raises ValueError: If the path cannot be used as a directory.
    """
    try:
        validate_path_could_be_dir(data_dir)
    except ValueError as ex:
        # Chain the original exception so the root cause stays visible in tracebacks.
        raise ValueError(f"{component_name} data directory is invalid: {ex}") from ex
104+
105+
99106
def get_clp_home():
100107
# Determine CLP_HOME from an environment variable or this script's path
101108
clp_home = None
@@ -175,6 +182,13 @@ def is_container_exited(container_name):
175182
return False
176183

177184

185+
def validate_log_directory(logs_dir: pathlib.Path, component_name: str) -> None:
    """
    Validates that `logs_dir` is, or could be created as, a directory.

    :param logs_dir: Candidate logs directory for the component.
    :param component_name: Component name used to contextualize the error message.
    :raises ValueError: If the path cannot be used as a directory.
    """
    try:
        validate_path_could_be_dir(logs_dir)
    except ValueError as ex:
        # Chain the original exception so the root cause stays visible in tracebacks.
        raise ValueError(f"{component_name} logs directory is invalid: {ex}") from ex
190+
191+
178192
def validate_port(port_name: str, hostname: str, port: int):
179193
try:
180194
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -431,40 +445,23 @@ def validate_and_load_redis_credentials_file(
431445

432446

433447
def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path):
    """
    Validates the database component's directories and port.

    :param clp_config: CLP configuration holding the database host/port.
    :param data_dir: Data directory for the database component.
    :param logs_dir: Logs directory for the database component.
    :raises ValueError: If a directory or the port is invalid.
    """
    component = DB_COMPONENT_NAME
    _validate_data_directory(data_dir, component)
    validate_log_directory(logs_dir, component)

    db_settings = clp_config.database
    validate_port(f"{component}.port", db_settings.host, db_settings.port)
445452

446453

447454
def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path):
    """
    Validates the queue component's logs directory and port.

    :param clp_config: CLP configuration holding the queue host/port.
    :param logs_dir: Logs directory for the queue component.
    :raises ValueError: If the directory or the port is invalid.
    """
    validate_log_directory(logs_dir, QUEUE_COMPONENT_NAME)

    queue_settings = clp_config.queue
    validate_port(f"{QUEUE_COMPONENT_NAME}.port", queue_settings.host, queue_settings.port)
454458

455459

456460
def validate_redis_config(
457461
clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path, base_config: pathlib.Path
458462
):
459-
try:
460-
validate_path_could_be_dir(data_dir)
461-
except ValueError as ex:
462-
raise ValueError(f"{REDIS_COMPONENT_NAME} data directory is invalid {ex}")
463-
464-
try:
465-
validate_path_could_be_dir(logs_dir)
466-
except ValueError as ex:
467-
raise ValueError(f"{REDIS_COMPONENT_NAME} logs directory is invalid: {ex}")
463+
_validate_data_directory(data_dir, REDIS_COMPONENT_NAME)
464+
validate_log_directory(logs_dir, REDIS_COMPONENT_NAME)
468465

469466
if not base_config.exists():
470467
raise ValueError(
@@ -475,10 +472,7 @@ def validate_redis_config(
475472

476473

477474
def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_workers: int):
478-
try:
479-
validate_path_could_be_dir(logs_dir)
480-
except ValueError as ex:
481-
raise ValueError(f"{REDUCER_COMPONENT_NAME} logs directory is invalid: {ex}")
475+
validate_log_directory(logs_dir, REDUCER_COMPONENT_NAME)
482476

483477
for i in range(0, num_workers):
484478
validate_port(
@@ -491,15 +485,8 @@ def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_w
491485
def validate_results_cache_config(
492486
clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path
493487
):
494-
try:
495-
validate_path_could_be_dir(data_dir)
496-
except ValueError as ex:
497-
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} data directory is invalid: {ex}")
498-
499-
try:
500-
validate_path_could_be_dir(logs_dir)
501-
except ValueError as ex:
502-
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} logs directory is invalid: {ex}")
488+
_validate_data_directory(data_dir, RESULTS_CACHE_COMPONENT_NAME)
489+
validate_log_directory(logs_dir, RESULTS_CACHE_COMPONENT_NAME)
503490

504491
validate_port(
505492
f"{RESULTS_CACHE_COMPONENT_NAME}.port",
@@ -508,8 +495,11 @@ def validate_results_cache_config(
508495
)
509496

510497

511-
def validate_logs_input_config(clp_config: CLPConfig) -> None:
    """
    Validates the logs-input settings configured on `clp_config`.

    :param clp_config: CLP configuration to validate.
    """
    clp_config.validate_logs_input_config()
502+
def validate_output_storage_config(clp_config: CLPConfig) -> None:
    """
    Validates both output-storage settings configured on `clp_config`:
    archive output first, then stream output.

    :param clp_config: CLP configuration to validate.
    """
    clp_config.validate_archive_output_config()
    clp_config.validate_stream_output_config()
515505

@@ -590,3 +580,13 @@ def validate_dataset_name(clp_table_prefix: str, dataset_name: str) -> None:
590580
f"Invalid dataset name: `{dataset_name}`. Names can only be a maximum of"
591581
f" {dataset_name_max_len} characters long."
592582
)
583+
584+
585+
def is_retention_period_configured(clp_config: CLPConfig) -> bool:
    """
    Checks whether any retention period is set on `clp_config`.

    :param clp_config: CLP configuration to inspect.
    :return: Whether the archive output or the results cache has a non-None
        retention period.
    """
    retention_periods = (
        clp_config.archive_output.retention_period,
        clp_config.results_cache.retention_period,
    )
    return any(period is not None for period in retention_periods)

components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@
99

1010
from clp_py_utils.clp_config import Database
1111
from clp_py_utils.clp_metadata_db_utils import (
12-
get_archive_tags_table_name,
12+
delete_archives_from_metadata_db,
1313
get_archives_table_name,
14-
get_files_table_name,
1514
)
1615
from clp_py_utils.sql_adapter import SQL_Adapter
1716

@@ -325,13 +324,13 @@ def _delete_archives(
325324

326325
archive_ids: typing.List[str]
327326
logger.info("Starting to delete archives from the database.")
328-
try:
329-
sql_adapter: SQL_Adapter = SQL_Adapter(database_config)
330-
clp_db_connection_params: dict[str, any] = (
331-
database_config.get_clp_connection_params_and_type(True)
332-
)
333-
table_prefix = clp_db_connection_params["table_prefix"]
327+
sql_adapter: SQL_Adapter = SQL_Adapter(database_config)
328+
clp_db_connection_params: dict[str, any] = database_config.get_clp_connection_params_and_type(
329+
True
330+
)
331+
table_prefix = clp_db_connection_params["table_prefix"]
334332

333+
try:
335334
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
336335
db_conn.cursor(dictionary=True)
337336
) as db_cursor:
@@ -343,9 +342,8 @@ def _delete_archives(
343342

344343
db_cursor.execute(
345344
f"""
346-
DELETE FROM `{get_archives_table_name(table_prefix, dataset)}`
345+
SELECT id FROM `{get_archives_table_name(table_prefix, dataset)}`
347346
WHERE {query_criteria}
348-
RETURNING id
349347
""",
350348
query_params,
351349
)
@@ -358,21 +356,7 @@ def _delete_archives(
358356
archive_ids: typing.List[str] = [result["id"] for result in results]
359357
delete_handler.validate_results(archive_ids)
360358

361-
ids_list_string: str = ", ".join(["'%s'"] * len(archive_ids))
362-
363-
db_cursor.execute(
364-
f"""
365-
DELETE FROM `{get_files_table_name(table_prefix, dataset)}`
366-
WHERE archive_id in ({ids_list_string})
367-
"""
368-
)
369-
370-
db_cursor.execute(
371-
f"""
372-
DELETE FROM `{get_archive_tags_table_name(table_prefix, dataset)}`
373-
WHERE archive_id in ({ids_list_string})
374-
"""
375-
)
359+
delete_archives_from_metadata_db(db_cursor, archive_ids, table_prefix, dataset)
376360
for archive_id in archive_ids:
377361
logger.info(f"Deleted archive {archive_id} from the database.")
378362

components/clp-package-utils/clp_package_utils/scripts/start_clp.py

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
COMPRESSION_WORKER_COMPONENT_NAME,
2323
CONTROLLER_TARGET_NAME,
2424
DB_COMPONENT_NAME,
25+
GARBAGE_COLLECTOR_NAME,
2526
get_components_for_target,
2627
QUERY_JOBS_TABLE_NAME,
2728
QUERY_SCHEDULER_COMPONENT_NAME,
@@ -55,17 +56,20 @@
5556
get_clp_home,
5657
is_container_exited,
5758
is_container_running,
59+
is_retention_period_configured,
5860
load_config_file,
5961
validate_and_load_db_credentials_file,
6062
validate_and_load_queue_credentials_file,
6163
validate_and_load_redis_credentials_file,
6264
validate_db_config,
65+
validate_log_directory,
66+
validate_logs_input_config,
67+
validate_output_storage_config,
6368
validate_queue_config,
6469
validate_redis_config,
6570
validate_reducer_config,
6671
validate_results_cache_config,
6772
validate_webui_config,
68-
validate_worker_config,
6973
)
7074

7175
logger = logging.getLogger(__file__)
@@ -1057,6 +1061,88 @@ def start_reducer(
10571061
logger.info(f"Started {component_name}.")
10581062

10591063

1064+
def start_garbage_collector(
    instance_id: str,
    clp_config: CLPConfig,
    container_clp_config: CLPConfig,
    mounts: CLPDockerMounts,
):
    """
    Starts the garbage-collector component in a detached Docker container.

    No-op if no retention period is configured, or if the container already
    exists.

    :param instance_id: ID of this CLP instance; used in the container name.
    :param clp_config: CLP configuration as seen from the host.
    :param container_clp_config: CLP configuration as seen from inside the
        container (paths translated for the container filesystem).
    :param mounts: Docker mounts mapping host paths into the container.
    """
    component_name = GARBAGE_COLLECTOR_NAME

    # Without a retention period there is nothing for the collector to delete.
    if not is_retention_period_configured(clp_config):
        logger.info(f"Retention period is not configured, skipping {component_name} creation...")
        return

    logger.info(f"Starting {component_name}...")

    container_name = f"clp-{component_name}-{instance_id}"
    if container_exists(container_name):
        return

    # Dump the container-view config to a file so it can be passed to the
    # garbage collector via `--config` below.
    container_config_filename = f"{container_name}.yml"
    container_config_file_path = clp_config.logs_directory / container_config_filename
    with open(container_config_file_path, "w") as f:
        yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f)

    logs_dir = clp_config.logs_directory / component_name
    validate_log_directory(logs_dir, component_name)
    # Create logs directory if necessary
    logs_dir.mkdir(parents=True, exist_ok=True)
    container_logs_dir = container_clp_config.logs_directory / component_name

    clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"

    # Run detached (-di) on the host network, as the invoking user so files
    # written to mounted directories keep the host user's ownership.
    # fmt: off
    container_start_cmd = [
        "docker", "run",
        "-di",
        "--network", "host",
        "-w", str(CONTAINER_CLP_HOME),
        "--name", container_name,
        "--log-driver", "local",
        "-u", f"{os.getuid()}:{os.getgid()}",
    ]
    # fmt: on

    necessary_env_vars = [
        f"PYTHONPATH={clp_site_packages_dir}",
        f"CLP_HOME={CONTAINER_CLP_HOME}",
        f"CLP_LOGS_DIR={container_logs_dir}",
        f"CLP_LOGGING_LEVEL={clp_config.garbage_collector.logging_level}",
    ]
    necessary_mounts = [
        mounts.clp_home,
        mounts.logs_dir,
    ]

    # Add necessary mounts for archives and streams.
    # Only filesystem-backed storage needs a mount; other storage types
    # (e.g. S3) are reached over the network.
    if StorageType.FS == clp_config.archive_output.storage.type:
        necessary_mounts.append(mounts.archives_output_dir)
    if StorageType.FS == clp_config.stream_output.storage.type:
        necessary_mounts.append(mounts.stream_output_dir)

    aws_mount, aws_env_vars = generate_container_auth_options(clp_config, component_name)
    if aws_mount:
        necessary_mounts.append(mounts.aws_config_dir)
    if aws_env_vars:
        necessary_env_vars.extend(aws_env_vars)

    append_docker_options(container_start_cmd, necessary_mounts, necessary_env_vars)
    container_start_cmd.append(clp_config.execution_container)

    # Command executed inside the container: run the garbage-collector module
    # unbuffered (-u) against the config file dumped above.
    # fmt: off
    garbage_collector_cmd = [
        "python3", "-u",
        "-m", "job_orchestration.garbage_collector.garbage_collector",
        "--config", str(container_clp_config.logs_directory / container_config_filename),
    ]
    # fmt: on
    cmd = container_start_cmd + garbage_collector_cmd
    subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

    logger.info(f"Started {component_name}.")
1144+
1145+
10601146
def add_num_workers_argument(parser):
10611147
parser.add_argument(
10621148
"--num-workers",
@@ -1093,6 +1179,7 @@ def main(argv):
10931179
reducer_server_parser = component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
10941180
add_num_workers_argument(reducer_server_parser)
10951181
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
1182+
component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME)
10961183

10971184
parsed_args = args_parser.parse_args(argv[1:])
10981185

@@ -1126,6 +1213,7 @@ def main(argv):
11261213
ALL_TARGET_NAME,
11271214
CONTROLLER_TARGET_NAME,
11281215
DB_COMPONENT_NAME,
1216+
GARBAGE_COLLECTOR_NAME,
11291217
COMPRESSION_SCHEDULER_COMPONENT_NAME,
11301218
QUERY_SCHEDULER_COMPONENT_NAME,
11311219
WEBUI_COMPONENT_NAME,
@@ -1151,12 +1239,18 @@ def main(argv):
11511239
QUERY_WORKER_COMPONENT_NAME,
11521240
):
11531241
validate_and_load_redis_credentials_file(clp_config, clp_home, True)
1242+
if target in (
1243+
ALL_TARGET_NAME,
1244+
COMPRESSION_WORKER_COMPONENT_NAME,
1245+
):
1246+
validate_logs_input_config(clp_config)
11541247
if target in (
11551248
ALL_TARGET_NAME,
11561249
COMPRESSION_WORKER_COMPONENT_NAME,
11571250
QUERY_WORKER_COMPONENT_NAME,
1251+
GARBAGE_COLLECTOR_NAME,
11581252
):
1159-
validate_worker_config(clp_config)
1253+
validate_output_storage_config(clp_config)
11601254

11611255
clp_config.validate_data_dir()
11621256
clp_config.validate_logs_dir()
@@ -1236,6 +1330,8 @@ def main(argv):
12361330

12371331
if WEBUI_COMPONENT_NAME in components_to_start:
12381332
start_webui(instance_id, clp_config, container_clp_config, mounts)
1333+
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME):
1334+
start_garbage_collector(instance_id, clp_config, container_clp_config, mounts)
12391335

12401336
except Exception as ex:
12411337
if type(ex) == ValueError:

components/clp-package-utils/clp_package_utils/scripts/stop_clp.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
COMPRESSION_WORKER_COMPONENT_NAME,
1212
CONTROLLER_TARGET_NAME,
1313
DB_COMPONENT_NAME,
14+
GARBAGE_COLLECTOR_NAME,
1415
QUERY_SCHEDULER_COMPONENT_NAME,
1516
QUERY_WORKER_COMPONENT_NAME,
1617
QUEUE_COMPONENT_NAME,
@@ -84,6 +85,7 @@ def main(argv):
8485
component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
8586
component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME)
8687
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
88+
component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME)
8789

8890
parsed_args = args_parser.parse_args(argv[1:])
8991

@@ -130,6 +132,9 @@ def main(argv):
130132

131133
already_exited_containers = []
132134
force = parsed_args.force
135+
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME):
136+
container_name = f"clp-{GARBAGE_COLLECTOR_NAME}-{instance_id}"
137+
stop_running_container(container_name, already_exited_containers, force)
133138
if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME):
134139
container_name = f"clp-{WEBUI_COMPONENT_NAME}-{instance_id}"
135140
stop_running_container(container_name, already_exited_containers, force)

0 commit comments

Comments
 (0)