diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 9f73b223b9..dc75bca377 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -96,6 +96,13 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path): self.aws_config_dir: typing.Optional[DockerMount] = None +def _validate_data_directory(data_dir: pathlib.Path, component_name: str) -> None: + try: + validate_path_could_be_dir(data_dir) + except ValueError as ex: + raise ValueError(f"{component_name} data directory is invalid: {ex}") + + def get_clp_home(): # Determine CLP_HOME from an environment variable or this script's path clp_home = None @@ -175,6 +182,13 @@ def is_container_exited(container_name): return False +def validate_log_directory(logs_dir: pathlib.Path, component_name: str) -> None: + try: + validate_path_could_be_dir(logs_dir) + except ValueError as ex: + raise ValueError(f"{component_name} logs directory is invalid: {ex}") + + def validate_port(port_name: str, hostname: str, port: int): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -431,24 +445,14 @@ def validate_and_load_redis_credentials_file( def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path): - try: - validate_path_could_be_dir(data_dir) - except ValueError as ex: - raise ValueError(f"{DB_COMPONENT_NAME} data directory is invalid: {ex}") - - try: - validate_path_could_be_dir(logs_dir) - except ValueError as ex: - raise ValueError(f"{DB_COMPONENT_NAME} logs directory is invalid: {ex}") + _validate_data_directory(data_dir, DB_COMPONENT_NAME) + validate_log_directory(logs_dir, DB_COMPONENT_NAME) validate_port(f"{DB_COMPONENT_NAME}.port", clp_config.database.host, clp_config.database.port) def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path): - try: - 
validate_path_could_be_dir(logs_dir) - except ValueError as ex: - raise ValueError(f"{QUEUE_COMPONENT_NAME} logs directory is invalid: {ex}") + validate_log_directory(logs_dir, QUEUE_COMPONENT_NAME) validate_port(f"{QUEUE_COMPONENT_NAME}.port", clp_config.queue.host, clp_config.queue.port) @@ -456,15 +460,8 @@ def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path): def validate_redis_config( clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path, base_config: pathlib.Path ): - try: - validate_path_could_be_dir(data_dir) - except ValueError as ex: - raise ValueError(f"{REDIS_COMPONENT_NAME} data directory is invalid {ex}") - - try: - validate_path_could_be_dir(logs_dir) - except ValueError as ex: - raise ValueError(f"{REDIS_COMPONENT_NAME} logs directory is invalid: {ex}") + _validate_data_directory(data_dir, REDIS_COMPONENT_NAME) + validate_log_directory(logs_dir, REDIS_COMPONENT_NAME) if not base_config.exists(): raise ValueError( @@ -475,10 +472,7 @@ def validate_redis_config( def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_workers: int): - try: - validate_path_could_be_dir(logs_dir) - except ValueError as ex: - raise ValueError(f"{REDUCER_COMPONENT_NAME} logs directory is invalid: {ex}") + validate_log_directory(logs_dir, REDUCER_COMPONENT_NAME) for i in range(0, num_workers): validate_port( @@ -491,15 +485,8 @@ def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_w def validate_results_cache_config( clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path ): - try: - validate_path_could_be_dir(data_dir) - except ValueError as ex: - raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} data directory is invalid: {ex}") - - try: - validate_path_could_be_dir(logs_dir) - except ValueError as ex: - raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} logs directory is invalid: {ex}") + _validate_data_directory(data_dir, RESULTS_CACHE_COMPONENT_NAME) + 
validate_log_directory(logs_dir, RESULTS_CACHE_COMPONENT_NAME) validate_port( f"{RESULTS_CACHE_COMPONENT_NAME}.port", @@ -508,8 +495,11 @@ def validate_results_cache_config( ) -def validate_worker_config(clp_config: CLPConfig): +def validate_logs_input_config(clp_config: CLPConfig) -> None: clp_config.validate_logs_input_config() + + +def validate_output_storage_config(clp_config: CLPConfig) -> None: clp_config.validate_archive_output_config() clp_config.validate_stream_output_config() @@ -590,3 +580,13 @@ def validate_dataset_name(clp_table_prefix: str, dataset_name: str) -> None: f"Invalid dataset name: `{dataset_name}`. Names can only be a maximum of" f" {dataset_name_max_len} characters long." ) + + +def is_retention_period_configured(clp_config: CLPConfig) -> bool: + if clp_config.archive_output.retention_period is not None: + return True + + if clp_config.results_cache.retention_period is not None: + return True + + return False diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py index 786c455ac0..149fd59dbe 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py @@ -9,9 +9,8 @@ from clp_py_utils.clp_config import Database from clp_py_utils.clp_metadata_db_utils import ( - get_archive_tags_table_name, + delete_archives_from_metadata_db, get_archives_table_name, - get_files_table_name, ) from clp_py_utils.sql_adapter import SQL_Adapter @@ -325,13 +324,13 @@ def _delete_archives( archive_ids: typing.List[str] logger.info("Starting to delete archives from the database.") - try: - sql_adapter: SQL_Adapter = SQL_Adapter(database_config) - clp_db_connection_params: dict[str, any] = ( - database_config.get_clp_connection_params_and_type(True) - ) - table_prefix = clp_db_connection_params["table_prefix"] + sql_adapter: 
SQL_Adapter = SQL_Adapter(database_config) + clp_db_connection_params: dict[str, any] = database_config.get_clp_connection_params_and_type( + True + ) + table_prefix = clp_db_connection_params["table_prefix"] + try: with closing(sql_adapter.create_connection(True)) as db_conn, closing( db_conn.cursor(dictionary=True) ) as db_cursor: @@ -343,9 +342,8 @@ def _delete_archives( db_cursor.execute( f""" - DELETE FROM `{get_archives_table_name(table_prefix, dataset)}` + SELECT id FROM `{get_archives_table_name(table_prefix, dataset)}` WHERE {query_criteria} - RETURNING id """, query_params, ) @@ -358,21 +356,7 @@ def _delete_archives( archive_ids: typing.List[str] = [result["id"] for result in results] delete_handler.validate_results(archive_ids) - ids_list_string: str = ", ".join(["'%s'"] * len(archive_ids)) - - db_cursor.execute( - f""" - DELETE FROM `{get_files_table_name(table_prefix, dataset)}` - WHERE archive_id in ({ids_list_string}) - """ - ) - - db_cursor.execute( - f""" - DELETE FROM `{get_archive_tags_table_name(table_prefix, dataset)}` - WHERE archive_id in ({ids_list_string}) - """ - ) + delete_archives_from_metadata_db(db_cursor, archive_ids, table_prefix, dataset) for archive_id in archive_ids: logger.info(f"Deleted archive {archive_id} from the database.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index 8af2dec39e..de3ee1a41a 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -22,6 +22,7 @@ COMPRESSION_WORKER_COMPONENT_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME, + GARBAGE_COLLECTOR_NAME, get_components_for_target, QUERY_JOBS_TABLE_NAME, QUERY_SCHEDULER_COMPONENT_NAME, @@ -55,17 +56,20 @@ get_clp_home, is_container_exited, is_container_running, + is_retention_period_configured, load_config_file, validate_and_load_db_credentials_file, 
validate_and_load_queue_credentials_file, validate_and_load_redis_credentials_file, validate_db_config, + validate_log_directory, + validate_logs_input_config, + validate_output_storage_config, validate_queue_config, validate_redis_config, validate_reducer_config, validate_results_cache_config, validate_webui_config, - validate_worker_config, ) logger = logging.getLogger(__file__) @@ -1057,6 +1061,88 @@ def start_reducer( logger.info(f"Started {component_name}.") +def start_garbage_collector( + instance_id: str, + clp_config: CLPConfig, + container_clp_config: CLPConfig, + mounts: CLPDockerMounts, +): + component_name = GARBAGE_COLLECTOR_NAME + + if not is_retention_period_configured(clp_config): + logger.info(f"Retention period is not configured, skipping {component_name} creation...") + return + + logger.info(f"Starting {component_name}...") + + container_name = f"clp-{component_name}-{instance_id}" + if container_exists(container_name): + return + + container_config_filename = f"{container_name}.yml" + container_config_file_path = clp_config.logs_directory / container_config_filename + with open(container_config_file_path, "w") as f: + yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f) + + logs_dir = clp_config.logs_directory / component_name + validate_log_directory(logs_dir, component_name) + # Create logs directory if necessary + logs_dir.mkdir(parents=True, exist_ok=True) + container_logs_dir = container_clp_config.logs_directory / component_name + + clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" + + # fmt: off + container_start_cmd = [ + "docker", "run", + "-di", + "--network", "host", + "-w", str(CONTAINER_CLP_HOME), + "--name", container_name, + "--log-driver", "local", + "-u", f"{os.getuid()}:{os.getgid()}", + ] + # fmt: on + + necessary_env_vars = [ + f"PYTHONPATH={clp_site_packages_dir}", + f"CLP_HOME={CONTAINER_CLP_HOME}", + f"CLP_LOGS_DIR={container_logs_dir}", + 
f"CLP_LOGGING_LEVEL={clp_config.garbage_collector.logging_level}", + ] + necessary_mounts = [ + mounts.clp_home, + mounts.logs_dir, + ] + + # Add necessary mounts for archives and streams. + if StorageType.FS == clp_config.archive_output.storage.type: + necessary_mounts.append(mounts.archives_output_dir) + if StorageType.FS == clp_config.stream_output.storage.type: + necessary_mounts.append(mounts.stream_output_dir) + + aws_mount, aws_env_vars = generate_container_auth_options(clp_config, component_name) + if aws_mount: + necessary_mounts.append(mounts.aws_config_dir) + if aws_env_vars: + necessary_env_vars.extend(aws_env_vars) + + append_docker_options(container_start_cmd, necessary_mounts, necessary_env_vars) + container_start_cmd.append(clp_config.execution_container) + + # fmt: off + garbage_collector_cmd = [ + "python3", "-u", + "-m", "job_orchestration.garbage_collector.garbage_collector", + "--config", str(container_clp_config.logs_directory / container_config_filename), + ] + # fmt: on + cmd = container_start_cmd + garbage_collector_cmd + subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True) + + logger.info(f"Started {component_name}.") + + def add_num_workers_argument(parser): parser.add_argument( "--num-workers", @@ -1093,6 +1179,7 @@ def main(argv): reducer_server_parser = component_args_parser.add_parser(REDUCER_COMPONENT_NAME) add_num_workers_argument(reducer_server_parser) component_args_parser.add_parser(WEBUI_COMPONENT_NAME) + component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME) parsed_args = args_parser.parse_args(argv[1:]) @@ -1126,6 +1213,7 @@ def main(argv): ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME, + GARBAGE_COLLECTOR_NAME, COMPRESSION_SCHEDULER_COMPONENT_NAME, QUERY_SCHEDULER_COMPONENT_NAME, WEBUI_COMPONENT_NAME, @@ -1151,12 +1239,18 @@ def main(argv): QUERY_WORKER_COMPONENT_NAME, ): validate_and_load_redis_credentials_file(clp_config, clp_home, True) + if target in ( + ALL_TARGET_NAME, + 
COMPRESSION_WORKER_COMPONENT_NAME, + ): + validate_logs_input_config(clp_config) if target in ( ALL_TARGET_NAME, COMPRESSION_WORKER_COMPONENT_NAME, QUERY_WORKER_COMPONENT_NAME, + GARBAGE_COLLECTOR_NAME, ): - validate_worker_config(clp_config) + validate_output_storage_config(clp_config) clp_config.validate_data_dir() clp_config.validate_logs_dir() @@ -1236,6 +1330,8 @@ def main(argv): if WEBUI_COMPONENT_NAME in components_to_start: start_webui(instance_id, clp_config, container_clp_config, mounts) + if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME): + start_garbage_collector(instance_id, clp_config, container_clp_config, mounts) except Exception as ex: if type(ex) == ValueError: diff --git a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py index 307eadaa75..f70a45d45f 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py @@ -11,6 +11,7 @@ COMPRESSION_WORKER_COMPONENT_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME, + GARBAGE_COLLECTOR_NAME, QUERY_SCHEDULER_COMPONENT_NAME, QUERY_WORKER_COMPONENT_NAME, QUEUE_COMPONENT_NAME, @@ -84,6 +85,7 @@ def main(argv): component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME) component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME) component_args_parser.add_parser(WEBUI_COMPONENT_NAME) + component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME) parsed_args = args_parser.parse_args(argv[1:]) @@ -130,6 +132,9 @@ def main(argv): already_exited_containers = [] force = parsed_args.force + if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME): + container_name = f"clp-{GARBAGE_COLLECTOR_NAME}-{instance_id}" + stop_running_container(container_name, already_exited_containers, force) if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME): container_name = f"clp-{WEBUI_COMPONENT_NAME}-{instance_id}" stop_running_container(container_name, 
already_exited_containers, force) diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index bc45176e72..ea42413b27 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -26,6 +26,7 @@ COMPRESSION_WORKER_COMPONENT_NAME = "compression_worker" QUERY_WORKER_COMPONENT_NAME = "query_worker" WEBUI_COMPONENT_NAME = "webui" +GARBAGE_COLLECTOR_NAME = "garbage_collector" # Component groups GENERAL_SCHEDULING_COMPONENTS = { @@ -354,6 +355,7 @@ class ResultsCache(BaseModel): port: int = 27017 db_name: str = "clp-query-results" stream_collection_name: str = "stream-files" + retention_period: Optional[int] = None @validator("host") def validate_host(cls, field): @@ -375,6 +377,12 @@ def validate_stream_collection_name(cls, field): ) return field + @validator("retention_period") + def validate_retention_period(cls, field): + if field is not None and field <= 0: + raise ValueError("retention_period must be greater than 0") + return field + def get_uri(self): return f"mongodb://{self.host}:{self.port}/{self.db_name}" @@ -566,6 +574,7 @@ class ArchiveOutput(BaseModel): target_encoded_file_size: int = 256 * 1024 * 1024 # 256 MB target_segment_size: int = 256 * 1024 * 1024 # 256 MB compression_level: int = 3 + retention_period: Optional[int] = None @validator("target_archive_size") def validate_target_archive_size(cls, field): @@ -597,6 +606,12 @@ def validate_compression_level(cls, field): raise ValueError("compression_level must be a value from 1 to 19") return field + @validator("retention_period") + def validate_retention_period(cls, field): + if field is not None and field <= 0: + raise ValueError("retention_period must be greater than 0") + return field + def set_directory(self, directory: pathlib.Path): _set_directory_for_storage_config(self.storage, directory) @@ -655,6 +670,32 @@ def validate_results_metadata_collection_name(cls, field): return 
field +class SweepInterval(BaseModel): + archive: int = 60 + search_result: int = 30 + + # Explicitly disallow any unexpected key + class Config: + extra = "forbid" + + @root_validator + def validate_sweep_interval(cls, values): + for field, value in values.items(): + if value <= 0: + raise ValueError(f"Sweep interval of {field} must be greater than 0") + return values + + +class GarbageCollector(BaseModel): + logging_level: str = "INFO" + sweep_interval: SweepInterval = SweepInterval() + + @validator("logging_level") + def validate_logging_level(cls, field): + _validate_logging_level(cls, field) + return field + + class CLPConfig(BaseModel): execution_container: Optional[str] = None @@ -671,6 +712,7 @@ class CLPConfig(BaseModel): compression_worker: CompressionWorker = CompressionWorker() query_worker: QueryWorker = QueryWorker() webui: WebUi = WebUi() + garbage_collector: GarbageCollector = GarbageCollector() credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH archive_output: ArchiveOutput = ArchiveOutput() diff --git a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py index 17855728c9..e6dd3c9d00 100644 --- a/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py +++ b/components/clp-py-utils/clp_py_utils/clp_metadata_db_utils.py @@ -1,7 +1,7 @@ from __future__ import annotations from pathlib import Path -from typing import Set +from typing import List, Set from clp_py_utils.clp_config import ( ArchiveOutput, @@ -219,6 +219,48 @@ def create_metadata_db_tables(db_cursor, table_prefix: str, dataset: str | None _create_files_table(db_cursor, table_prefix, dataset) +def delete_archives_from_metadata_db( + db_cursor, archive_ids: List[str], table_prefix: str, dataset: str | None +) -> None: + """ + Deletes archives from the metadata database specified by a list of IDs. 
It also deletes + the associated entries from `files` and `archive_tags` tables that reference these archives. + + The order of deletion follows the foreign key constraints, ensuring no violations occur during + the process. + + :param db_cursor: + :param archive_ids: The list of archive to delete. + :param table_prefix: + :param dataset: + """ + ids_list_string = ", ".join(["%s"] * len(archive_ids)) + + db_cursor.execute( + f""" + DELETE FROM `{get_files_table_name(table_prefix, dataset)}` + WHERE archive_id in ({ids_list_string}) + """, + archive_ids, + ) + + db_cursor.execute( + f""" + DELETE FROM `{get_archive_tags_table_name(table_prefix, dataset)}` + WHERE archive_id in ({ids_list_string}) + """, + archive_ids, + ) + + db_cursor.execute( + f""" + DELETE FROM `{get_archives_table_name(table_prefix, dataset)}` + WHERE id in ({ids_list_string}) + """, + archive_ids, + ) + + def get_archive_tags_table_name(table_prefix: str, dataset: str | None) -> str: return _get_table_name(table_prefix, ARCHIVE_TAGS_TABLE_SUFFIX, dataset) diff --git a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py index c291288536..07a32dd9cd 100644 --- a/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py +++ b/components/clp-py-utils/clp_py_utils/initialize-orchestration-db.py @@ -108,6 +108,7 @@ def main(argv): `duration` FLOAT NULL DEFAULT NULL, `job_config` VARBINARY(60000) NOT NULL, PRIMARY KEY (`id`) USING BTREE, + INDEX `CREATION_TIME` (`creation_time`) USING BTREE, INDEX `JOB_STATUS` (`status`) USING BTREE ) ROW_FORMAT=DYNAMIC """ diff --git a/components/clp-py-utils/clp_py_utils/s3_utils.py b/components/clp-py-utils/clp_py_utils/s3_utils.py index 13d7c4e144..c0cf29745b 100644 --- a/components/clp-py-utils/clp_py_utils/s3_utils.py +++ b/components/clp-py-utils/clp_py_utils/s3_utils.py @@ -1,7 +1,7 @@ import os import re from pathlib import Path -from typing import Dict, List, 
Optional, Tuple, Union +from typing import Dict, List, Optional, Set, Tuple, Union import boto3 from botocore.config import Config @@ -14,6 +14,7 @@ COMPRESSION_SCHEDULER_COMPONENT_NAME, COMPRESSION_WORKER_COMPONENT_NAME, FsStorage, + GARBAGE_COLLECTOR_NAME, QUERY_SCHEDULER_COMPONENT_NAME, QUERY_WORKER_COMPONENT_NAME, S3Config, @@ -118,6 +119,7 @@ def generate_container_auth_options( elif component_type in (WEBUI_COMPONENT_NAME,): output_storages_by_component_type = [clp_config.stream_output.storage] elif component_type in ( + GARBAGE_COLLECTOR_NAME, QUERY_SCHEDULER_COMPONENT_NAME, QUERY_WORKER_COMPONENT_NAME, ): @@ -308,3 +310,43 @@ def s3_put(s3_config: S3Config, src_file: Path, dest_path: str) -> None: s3_client.put_object( Bucket=s3_config.bucket, Body=file_data, Key=s3_config.key_prefix + dest_path ) + + +def s3_delete_objects(s3_config: S3Config, object_keys: Set[str]) -> None: + """ + Deletes objects from an S3 bucket. The objects are identified by keys relative to + `s3_config.key_prefix`. + + Note: The AWS S3 `DeleteObjects` API, used by `boto3.client.delete_objects`, supports a maximum + of 1,000 objects per request. This method splits the provided keys into batches of up to 1,000 + and issues multiple delete requests until all objects are removed. + + :param s3_config: The S3 config specifying the credentials and the bucket to perform deletion. + :param object_keys: The set of object keys to delete, relative to `s3_config.key_prefix`. + :raises: Propagates `boto3.client`'s exceptions. + :raises: Propagates `boto3.client.delete_object`'s exceptions. 
+ """ + boto3_config = Config(retries=dict(total_max_attempts=3, mode="adaptive")) + s3_client = _create_s3_client(s3_config, boto3_config) + max_num_objects_to_delete_per_batch = 1000 + + def _gen_deletion_config(objects_list: List[str]): + return {"Objects": [{"Key": object_to_delete} for object_to_delete in objects_list]} + + objects_to_delete: List[str] = [] + for relative_obj_key in object_keys: + objects_to_delete.append(s3_config.key_prefix + relative_obj_key) + if len(objects_to_delete) < max_num_objects_to_delete_per_batch: + continue + + s3_client.delete_objects( + Bucket=s3_config.bucket, + Delete=_gen_deletion_config(objects_to_delete), + ) + objects_to_delete = [] + + if len(objects_to_delete) != 0: + s3_client.delete_objects( + Bucket=s3_config.bucket, + Delete=_gen_deletion_config(objects_to_delete), + ) diff --git a/components/job-orchestration/job_orchestration/garbage_collector/archive_garbage_collector.py b/components/job-orchestration/job_orchestration/garbage_collector/archive_garbage_collector.py new file mode 100644 index 0000000000..b0f710b84b --- /dev/null +++ b/components/job-orchestration/job_orchestration/garbage_collector/archive_garbage_collector.py @@ -0,0 +1,215 @@ +import asyncio +import pathlib +import time +from contextlib import closing +from typing import List, Optional + +from clp_py_utils.clp_config import ( + ArchiveOutput, + CLPConfig, + Database, + QUERY_JOBS_TABLE_NAME, + StorageEngine, +) +from clp_py_utils.clp_logging import get_logger +from clp_py_utils.clp_metadata_db_utils import ( + delete_archives_from_metadata_db, + fetch_existing_datasets, + get_archives_table_name, +) +from clp_py_utils.sql_adapter import SQL_Adapter +from job_orchestration.garbage_collector.constants import ( + ARCHIVE_GARBAGE_COLLECTOR_NAME, + MIN_TO_SECONDS, + SECOND_TO_MILLISECOND, +) +from job_orchestration.garbage_collector.utils import ( + configure_logger, + DeletionCandidatesBuffer, + execute_deletion, + validate_storage_type, +) +from 
job_orchestration.scheduler.constants import QueryJobStatus + +logger = get_logger(ARCHIVE_GARBAGE_COLLECTOR_NAME) + + +def _delete_expired_archives( + db_conn, + db_cursor, + table_prefix: str, + archive_expiry_epoch_secs: int, + candidates_buffer: DeletionCandidatesBuffer, + archive_output_config: ArchiveOutput, + dataset: Optional[str], +) -> None: + archives_table = get_archives_table_name(table_prefix, dataset) + archive_end_ts_upper_bound = archive_expiry_epoch_secs * SECOND_TO_MILLISECOND + + db_cursor.execute( + f""" + SELECT id FROM `{archives_table}` + WHERE end_timestamp < %s + AND end_timestamp != 0 + """, + [archive_end_ts_upper_bound], + ) + + results = db_cursor.fetchall() + archive_ids = [result["id"] for result in results] + if len(archive_ids) != 0: + delete_archives_from_metadata_db(db_cursor, archive_ids, table_prefix, dataset) + + for candidate in archive_ids: + if dataset is not None: + candidate = f"{dataset}/{candidate}" + candidates_buffer.add_candidate(candidate) + + candidates_buffer.persist_new_candidates() + db_conn.commit() + + candidates_to_delete = candidates_buffer.get_candidates() + num_candidates_to_delete = len(candidates_to_delete) + if 0 == num_candidates_to_delete: + logger.debug( + f"No archives matched the expiry criteria: `end_ts < {archive_end_ts_upper_bound}`." 
+ ) + return + + execute_deletion(archive_output_config, candidates_to_delete) + + # Prepare the log message + dataset_msg: str + deleted_candidates: List[str] + if dataset is not None: + dataset_log_msg = f" from dataset `{dataset}`" + # Note: If dataset is not None, candidates are expected to be in the format + # `/` + deleted_candidates = [candidate.split("/")[1] for candidate in candidates_to_delete] + else: + dataset_log_msg = "" + deleted_candidates = list(candidates_to_delete) + + candidates_buffer.clear() + logger.info( + f"Deleted {num_candidates_to_delete} archive(s){dataset_log_msg}: {deleted_candidates}" + ) + + +def _get_archive_safe_expiry_epoch( + db_cursor, + retention_period_minutes: int, +) -> int: + """ + Calculates a safe expiration timestamp such that archives with `end_ts` less than this value are + guaranteed not to be searched by any running query jobs. + + If no query jobs are running, the expiry time is set to `current_time - retention_period`. + If a query job is running and was created at `creation_time`, the query scheduler guarantees + that it will not search any archive whose end_ts < (creation_time - retention_period). + In this case, the expiry time can be safely adjusted to `creation_time - retention_period`. + + Note: This function does not consider query jobs that started before + `current_time - retention_period`, as such long-running jobs are likely hanging. Including them + would prevent the expiry time from advancing. 
+ + :param db_cursor: Database cursor object + :param retention_period_minutes: Retention window in minutes + :return: Epoch timestamp indicating the safe expiration time (in seconds) + """ + retention_period_secs = retention_period_minutes * MIN_TO_SECONDS + current_epoch_secs = time.time() + archive_expiry_epoch: int + + db_cursor.execute( + f""" + SELECT id, creation_time + FROM `{QUERY_JOBS_TABLE_NAME}` + WHERE {QUERY_JOBS_TABLE_NAME}.status = {QueryJobStatus.RUNNING} + AND {QUERY_JOBS_TABLE_NAME}.creation_time < FROM_UNIXTIME(%s) + ORDER BY creation_time ASC + LIMIT 1 + """, + [current_epoch_secs], + ) + + row = db_cursor.fetchone() + if row is not None: + job_creation_time = row.get("creation_time") + archive_expiry_epoch = int(job_creation_time.timestamp()) - retention_period_secs + logger.debug(f"Discovered running query job created at {job_creation_time}.") + logger.debug(f"Using adjusted archive_expiry_epoch=`{archive_expiry_epoch}`.") + else: + archive_expiry_epoch = int(current_epoch_secs) - retention_period_secs + logger.debug(f"Using archive_expiry_epoch=`{archive_expiry_epoch}`.") + + return archive_expiry_epoch + + +def _collect_and_sweep_expired_archives( + archive_output_config: ArchiveOutput, + storage_engine: str, + database_config: Database, + recovery_file: pathlib.Path, +) -> None: + candidates_buffer = DeletionCandidatesBuffer(recovery_file) + + clp_connection_param = database_config.get_clp_connection_params_and_type() + table_prefix = clp_connection_param["table_prefix"] + sql_adapter = SQL_Adapter(database_config) + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + archive_expiry_epoch = _get_archive_safe_expiry_epoch( + db_cursor, + archive_output_config.retention_period, + ) + if StorageEngine.CLP_S == storage_engine: + datasets = fetch_existing_datasets(db_cursor, table_prefix) + for dataset in datasets: + logger.debug(f"Running garbage collection on dataset 
`{dataset}`") + _delete_expired_archives( + db_conn, + db_cursor, + table_prefix, + archive_expiry_epoch, + candidates_buffer, + archive_output_config, + dataset, + ) + elif StorageEngine.CLP == storage_engine: + _delete_expired_archives( + db_conn, + db_cursor, + table_prefix, + archive_expiry_epoch, + candidates_buffer, + archive_output_config, + None, + ) + else: + raise ValueError(f"Unsupported Storage engine: {storage_engine}.") + + +async def archive_garbage_collector( + clp_config: CLPConfig, log_directory: pathlib.Path, logging_level: str +) -> None: + configure_logger(logger, logging_level, log_directory, ARCHIVE_GARBAGE_COLLECTOR_NAME) + + archive_output_config = clp_config.archive_output + storage_engine = clp_config.package.storage_engine + validate_storage_type(archive_output_config, storage_engine) + + sweep_interval_secs = clp_config.garbage_collector.sweep_interval.archive * MIN_TO_SECONDS + recovery_file = clp_config.logs_directory / f"{ARCHIVE_GARBAGE_COLLECTOR_NAME}.tmp" + + logger.info(f"{ARCHIVE_GARBAGE_COLLECTOR_NAME} started.") + try: + while True: + _collect_and_sweep_expired_archives( + archive_output_config, storage_engine, clp_config.database, recovery_file + ) + await asyncio.sleep(sweep_interval_secs) + except Exception: + logger.exception(f"{ARCHIVE_GARBAGE_COLLECTOR_NAME} exited with failure.") + raise diff --git a/components/job-orchestration/job_orchestration/garbage_collector/constants.py b/components/job-orchestration/job_orchestration/garbage_collector/constants.py new file mode 100644 index 0000000000..f6cd57b9ab --- /dev/null +++ b/components/job-orchestration/job_orchestration/garbage_collector/constants.py @@ -0,0 +1,7 @@ +from typing_extensions import Final + +MIN_TO_SECONDS: Final[int] = 60 +SECOND_TO_MILLISECOND: Final[int] = 1000 + +ARCHIVE_GARBAGE_COLLECTOR_NAME: Final[str] = "archive-garbage-collector" +SEARCH_RESULT_GARBAGE_COLLECTOR_NAME: Final[str] = "search-result-garbage-collector" diff --git 
a/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py b/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py new file mode 100644 index 0000000000..3c16e32b9d --- /dev/null +++ b/components/job-orchestration/job_orchestration/garbage_collector/garbage_collector.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 + +import argparse +import asyncio +import os +import sys +from pathlib import Path +from typing import Callable, Dict, List, Optional, Tuple + +from clp_py_utils.clp_config import ( + CLPConfig, + GARBAGE_COLLECTOR_NAME, +) +from clp_py_utils.clp_logging import get_logger +from clp_py_utils.core import read_yaml_config_file +from job_orchestration.garbage_collector.archive_garbage_collector import archive_garbage_collector +from job_orchestration.garbage_collector.constants import ( + ARCHIVE_GARBAGE_COLLECTOR_NAME, + SEARCH_RESULT_GARBAGE_COLLECTOR_NAME, +) +from job_orchestration.garbage_collector.search_result_garbage_collector import ( + search_result_garbage_collector, +) +from job_orchestration.garbage_collector.utils import configure_logger +from pydantic import ValidationError + +logger = get_logger(GARBAGE_COLLECTOR_NAME) + + +async def main(argv: List[str]) -> int: + args_parser = argparse.ArgumentParser(description=f"Spin up the {GARBAGE_COLLECTOR_NAME}.") + args_parser.add_argument("--config", "-c", required=True, help="CLP configuration file.") + parsed_args = args_parser.parse_args(argv[1:]) + + # Setup logging to file + logs_directory = Path(os.getenv("CLP_LOGS_DIR")) + logging_level = os.getenv("CLP_LOGGING_LEVEL") + configure_logger(logger, logging_level, logs_directory, GARBAGE_COLLECTOR_NAME) + + # Load configuration + config_path = Path(parsed_args.config) + try: + clp_config = CLPConfig.parse_obj(read_yaml_config_file(config_path)) + except ValidationError as err: + logger.error(err) + return 1 + except Exception: + logger.exception("Failed to parse CLP configuration file.") + 
return 1 + + gc_task_configs: Dict[str, Tuple[Optional[int], Callable]] = { + ARCHIVE_GARBAGE_COLLECTOR_NAME: ( + clp_config.archive_output.retention_period, + archive_garbage_collector, + ), + SEARCH_RESULT_GARBAGE_COLLECTOR_NAME: ( + clp_config.results_cache.retention_period, + search_result_garbage_collector, + ), + } + gc_tasks: List[asyncio.Task[None]] = [] + + # Create GC tasks + for gc_name, (retention_period, task_method) in gc_task_configs.items(): + if retention_period is None: + logger.info(f"Retention period is not configured, skip creating {gc_name}.") + continue + logger.info(f"Creating {gc_name} with retention period = {retention_period} minutes") + gc_tasks.append( + asyncio.create_task( + task_method(clp_config, logs_directory, logging_level), name=gc_name + ) + ) + + # Poll and report any task that finished unexpectedly + while len(gc_tasks) != 0: + done, _ = await asyncio.wait(gc_tasks, return_when=asyncio.FIRST_COMPLETED) + for task in done: + gc_tasks.remove(task) + gc_name = task.get_name() + try: + _ = task.result() + logger.error(f"{gc_name} unexpectedly terminated without an error.") + except Exception as e: + logger.exception(f"{gc_name} failed.") + + logger.error("All garbage collectors terminated unexpectedly.") + return 1 + + +if "__main__" == __name__: + sys.exit(asyncio.run(main(sys.argv))) diff --git a/components/job-orchestration/job_orchestration/garbage_collector/search_result_garbage_collector.py b/components/job-orchestration/job_orchestration/garbage_collector/search_result_garbage_collector.py new file mode 100644 index 0000000000..ee406b48d6 --- /dev/null +++ b/components/job-orchestration/job_orchestration/garbage_collector/search_result_garbage_collector.py @@ -0,0 +1,88 @@ +import asyncio +import pathlib +from typing import Final, List + +import pymongo +import pymongo.database +from bson import ObjectId +from clp_py_utils.clp_config import CLPConfig, ResultsCache +from clp_py_utils.clp_logging import get_logger +from 
job_orchestration.garbage_collector.constants import ( + MIN_TO_SECONDS, + SEARCH_RESULT_GARBAGE_COLLECTOR_NAME, +) +from job_orchestration.garbage_collector.utils import ( + configure_logger, + get_expiry_epoch_secs, +) + +# Constants +MONGODB_ID_KEY: Final[str] = "_id" + +logger = get_logger(SEARCH_RESULT_GARBAGE_COLLECTOR_NAME) + + +def _get_latest_doc_timestamp(collection: pymongo.collection.Collection) -> int: + latest_doc = collection.find_one(sort=[(MONGODB_ID_KEY, pymongo.DESCENDING)]) + if latest_doc is None: + return 0 + + object_id = latest_doc[MONGODB_ID_KEY] + if isinstance(object_id, ObjectId): + return int(object_id.generation_time.timestamp()) + raise ValueError(f"{object_id} is not an ObjectID") + + +def _delete_result_metadata( + database: pymongo.database.Database, results_metadata_collection_name: str, job_id: str +) -> None: + results_metadata_collection = database.get_collection(results_metadata_collection_name) + results_metadata_collection.delete_one({MONGODB_ID_KEY: job_id}) + + +def _collect_and_sweep_expired_search_results( + result_cache_config: ResultsCache, results_metadata_collection_name: str +): + expiry_epoch = get_expiry_epoch_secs(result_cache_config.retention_period) + + logger.debug(f"Searching for search jobs finished before {expiry_epoch}.") + deleted_job_ids: List[int] = [] + with pymongo.MongoClient(result_cache_config.get_uri()) as results_cache_client: + results_cache_db = results_cache_client.get_default_database() + collection_names = results_cache_db.list_collection_names() + for job_id in collection_names: + if not job_id.isdigit(): + continue + + job_results_collection = results_cache_db.get_collection(job_id) + collection_timestamp = _get_latest_doc_timestamp(job_results_collection) + if collection_timestamp >= expiry_epoch: + continue + + _delete_result_metadata(results_cache_db, results_metadata_collection_name, job_id) + job_results_collection.drop() + deleted_job_ids.append(int(job_id)) + + if 
len(deleted_job_ids) != 0: + logger.debug(f"Deleted search results of job(s): {deleted_job_ids}.") + else: + logger.debug(f"No search results matched the expiry criteria.") + + +async def search_result_garbage_collector( + clp_config: CLPConfig, log_directory: pathlib.Path, logging_level: str +) -> None: + configure_logger(logger, logging_level, log_directory, SEARCH_RESULT_GARBAGE_COLLECTOR_NAME) + + sweep_interval_secs = clp_config.garbage_collector.sweep_interval.search_result * MIN_TO_SECONDS + + logger.info(f"{SEARCH_RESULT_GARBAGE_COLLECTOR_NAME} started.") + try: + while True: + _collect_and_sweep_expired_search_results( + clp_config.results_cache, clp_config.webui.results_metadata_collection_name + ) + await asyncio.sleep(sweep_interval_secs) + except Exception: + logger.exception(f"{SEARCH_RESULT_GARBAGE_COLLECTOR_NAME} exited with failure.") + raise diff --git a/components/job-orchestration/job_orchestration/garbage_collector/utils.py b/components/job-orchestration/job_orchestration/garbage_collector/utils.py new file mode 100644 index 0000000000..8816ee6e8b --- /dev/null +++ b/components/job-orchestration/job_orchestration/garbage_collector/utils.py @@ -0,0 +1,150 @@ +import logging +import os +import pathlib +import shutil +import time +from datetime import datetime, timezone +from typing import List, Set + +from bson import ObjectId +from clp_py_utils.clp_config import ( + ArchiveOutput, + FsStorage, + StorageEngine, + StorageType, +) +from clp_py_utils.clp_logging import get_logging_formatter, set_logging_level +from clp_py_utils.s3_utils import s3_delete_objects +from job_orchestration.garbage_collector.constants import MIN_TO_SECONDS + + +def configure_logger( + logger: logging.Logger, logging_level: str, log_directory: pathlib.Path, handler_name: str +): + log_file = log_directory / f"{handler_name}.log" + logging_file_handler = logging.FileHandler(filename=log_file, encoding="utf-8") + logging_file_handler.setFormatter(get_logging_formatter()) + 
logger.addHandler(logging_file_handler) + set_logging_level(logger, logging_level) + + +def validate_storage_type(output_config: ArchiveOutput, storage_engine: str) -> None: + storage_type = output_config.storage.type + if StorageType.S3 == storage_type: + if StorageEngine.CLP_S != storage_engine: + raise ValueError( + f"{storage_type} is not supported when using storage engine {storage_engine}" + ) + elif StorageType.FS != storage_type: + raise ValueError(f"Unsupported Storage type: {storage_type}") + + +def get_expiry_epoch_secs(retention_minutes: int) -> int: + """ + Returns a cutoff `expiry_epoch` based on the current timestamp and `retention_minutes`. Any + candidate with a timestamp (`ts`) less than `expiry_epoch` is considered expired. + The `expiry_epoch` is calculated as `expiry_epoch = cur_time - retention_secs`. + + :param retention_minutes: Retention period in minutes. + :return: The UTC epoch representing the expiry cutoff time. + """ + return int(time.time() - retention_minutes * MIN_TO_SECONDS) + + +def get_oid_with_expiry_time(expiry_epoch_secs: int) -> ObjectId: + return ObjectId.from_datetime(datetime.fromtimestamp(expiry_epoch_secs, tz=timezone.utc)) + + +def execute_fs_deletion(fs_storage_config: FsStorage, candidate: str) -> None: + """ + Deletes a candidate (either a directory or a file) from the filesystem storage. The full path + of the candidate is constructed as `fs_storage_config.directory / candidate`. The function + performs no action if the candidate does not exist. + + :param fs_storage_config: + :param candidate: Relative path of the file or directory to delete. 
+ """ + path_to_delete = fs_storage_config.directory / candidate + if not path_to_delete.exists(): + return + + if path_to_delete.is_dir(): + shutil.rmtree(path_to_delete) + else: + os.remove(path_to_delete) + + +def execute_deletion(output_config: ArchiveOutput, deletion_candidates: Set[str]) -> None: + storage_config = output_config.storage + storage_type = storage_config.type + + if StorageType.S3 == storage_type: + s3_delete_objects(storage_config.s3_config, deletion_candidates) + elif StorageType.FS == storage_type: + for candidate in deletion_candidates: + execute_fs_deletion(storage_config, candidate) + else: + raise ValueError(f"Unsupported Storage type: {storage_type}") + + +class DeletionCandidatesBuffer: + """ + Represents an in-memory buffer for deletion candidates with fault-tolerance support. + + This class supports recovering from a previous failure by reading previously persisted + candidates from a recovery file. The user is expected to explicitly call + `persist_new_candidates` to persist any new candidates on to the disk for fault-tolerance + purposes. + + :param recovery_file_path: Path to the file used for recovering and persisting candidates. + :raises ValueError: If the recovery path exists but is not a file. 
+ """ + + def __init__(self, recovery_file_path: pathlib.Path): + self._recovery_file_path: pathlib.Path = recovery_file_path + self._candidates: Set[str] = set() + self._candidates_to_persist: List[str] = list() + + if not self._recovery_file_path.exists(): + return + + if not self._recovery_file_path.is_file(): + raise ValueError(f"{self._recovery_file_path} does not resolve to a file") + + with open(self._recovery_file_path, "r") as f: + for line in f: + self._candidates.add(line.strip()) + + def add_candidate(self, candidate: str) -> None: + if candidate not in self._candidates: + self._candidates.add(candidate) + self._candidates_to_persist.append(candidate) + + def get_candidates(self) -> Set[str]: + return self._candidates + + def persist_new_candidates(self) -> None: + """ + Persists any new deletion candidates buffered in `_candidates_to_persist` by appending them + to the recovery file, then clears `_candidates_to_persist`. + This method returns immediately if `_candidates_to_persist` is empty. + """ + if len(self._candidates_to_persist) == 0: + return + + with open(self._recovery_file_path, "a") as f: + for candidate in self._candidates_to_persist: + f.write(f"{candidate}\n") + + self._candidates_to_persist.clear() + + def clear(self): + """ + Clears the in-memory buffer of candidates and removes the recovery file. + + This is intended to be called after the caller finished processing all candidates (i.e., + when recovery is no longer needed for the candidates.) 
+ """ + self._candidates.clear() + if self._recovery_file_path.exists(): + os.unlink(self._recovery_file_path) diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 785c7858d8..4973fa9339 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -48,6 +48,7 @@ from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.query.extract_stream_task import extract_stream from job_orchestration.executor.query.fs_search_task import search +from job_orchestration.garbage_collector.constants import MIN_TO_SECONDS, SECOND_TO_MILLISECOND from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType, QueryTaskStatus from job_orchestration.scheduler.job_config import ( ExtractIrJobConfig, @@ -247,7 +248,8 @@ def fetch_new_query_jobs(db_conn) -> list: f""" SELECT {QUERY_JOBS_TABLE_NAME}.id as job_id, {QUERY_JOBS_TABLE_NAME}.job_config, - {QUERY_JOBS_TABLE_NAME}.type + {QUERY_JOBS_TABLE_NAME}.type, + {QUERY_JOBS_TABLE_NAME}.creation_time FROM {QUERY_JOBS_TABLE_NAME} WHERE {QUERY_JOBS_TABLE_NAME}.status={QueryJobStatus.PENDING} """ @@ -389,6 +391,7 @@ def get_archives_for_search( db_conn, table_prefix: str, search_config: SearchJobConfig, + archive_end_ts_lower_bound: Optional[int], ): dataset = search_config.dataset query = f"""SELECT id as archive_id, end_timestamp @@ -399,6 +402,10 @@ def get_archives_for_search( filter_clauses.append(f"begin_timestamp <= {search_config.end_timestamp}") if search_config.begin_timestamp is not None: filter_clauses.append(f"end_timestamp >= {search_config.begin_timestamp}") + if archive_end_ts_lower_bound is not None: + filter_clauses.append( + f"(end_timestamp >= {archive_end_ts_lower_bound} OR end_timestamp = 0)" + ) if search_config.tags is not None: 
archive_tags_table_name = get_archive_tags_table_name(table_prefix, dataset) tags_table_name = get_tags_table_name(table_prefix, dataset) @@ -626,6 +633,7 @@ def handle_pending_query_jobs( stream_collection_name: str, num_archives_to_search_per_sub_job: int, existing_datasets: Set[str], + archive_retention_period: Optional[int], ) -> List[asyncio.Task]: global active_jobs @@ -644,6 +652,7 @@ def handle_pending_query_jobs( job_id = str(job["job_id"]) job_type = job["type"] job_config = msgpack.unpackb(job["job_config"]) + job_creation_time = job["creation_time"].timestamp() table_prefix = clp_metadata_db_conn_params["table_prefix"] dataset = QueryJobConfig.parse_obj(job_config).dataset @@ -670,7 +679,15 @@ def handle_pending_query_jobs( continue search_config = SearchJobConfig.parse_obj(job_config) - archives_for_search = get_archives_for_search(db_conn, table_prefix, search_config) + archive_end_ts_lower_bound: Optional[int] = None + if archive_retention_period is not None: + archive_end_ts_lower_bound = SECOND_TO_MILLISECOND * ( + job_creation_time - archive_retention_period * MIN_TO_SECONDS + ) + + archives_for_search = get_archives_for_search( + db_conn, table_prefix, search_config, archive_end_ts_lower_bound + ) if len(archives_for_search) == 0: if set_job_or_task_status( db_conn, @@ -1081,6 +1098,7 @@ async def handle_jobs( stream_collection_name: str, jobs_poll_delay: float, num_archives_to_search_per_sub_job: int, + archive_retention_period: Optional[int], ) -> None: handle_updating_task = asyncio.create_task( handle_job_updates(db_conn_pool, results_cache_uri, jobs_poll_delay) @@ -1096,6 +1114,7 @@ async def handle_jobs( stream_collection_name, num_archives_to_search_per_sub_job, existing_datasets, + archive_retention_period, ) if 0 == len(reducer_acquisition_tasks): tasks.append(asyncio.create_task(asyncio.sleep(jobs_poll_delay))) @@ -1181,6 +1200,7 @@ async def main(argv: List[str]) -> int: 
stream_collection_name=clp_config.results_cache.stream_collection_name, jobs_poll_delay=clp_config.query_scheduler.jobs_poll_delay, num_archives_to_search_per_sub_job=batch_size, + archive_retention_period=clp_config.archive_output.retention_period, ) ) reducer_handler = asyncio.create_task(reducer_handler.serve_forever()) diff --git a/components/package-template/src/etc/clp-config.yml b/components/package-template/src/etc/clp-config.yml index 7ea6076a5d..026c4a1ee7 100644 --- a/components/package-template/src/etc/clp-config.yml +++ b/components/package-template/src/etc/clp-config.yml @@ -53,6 +53,9 @@ # db_name: "clp-query-results" # stream_collection_name: "stream-files" # +# # Retention period for search results, in minutes. Set to null to disable automatic deletion. +# retention_period: null +# #compression_worker: # logging_level: "INFO" # @@ -72,6 +75,9 @@ # # error will be raised if so. # directory: "var/data/archives" # +# # Retention period for archives, in minutes. Set to null to disable automatic deletion. +# retention_period: null +# # # How much data CLP should try to compress into each archive # target_archive_size: 268435456 # 256 MB # @@ -100,6 +106,15 @@ # # How large each stream file should be before being split into a new stream file # target_uncompressed_size: 134217728 # 128 MB # +## Garbage collector config +#garbage_collector: +# logging_level: "INFO" +# +# # Interval (in minutes) at which garbage collector jobs run +# sweep_interval: +# archive: 60 +# search_result: 30 +# ## Location where other data (besides archives) are stored. It will be created if ## it doesn't exist. ## NOTE: This directory must not overlap with any path used in CLP's execution container. 
An error diff --git a/docs/src/user-guide/guides-using-object-storage/object-storage-config.md b/docs/src/user-guide/guides-using-object-storage/object-storage-config.md index ff44ef2464..b6110e9fdc 100644 --- a/docs/src/user-guide/guides-using-object-storage/object-storage-config.md +++ b/docs/src/user-guide/guides-using-object-storage/object-storage-config.md @@ -61,7 +61,8 @@ the fields in angle brackets (`<>`) with the appropriate values: "Effect": "Allow", "Action": [ "s3:GetObject", - "s3:PutObject" + "s3:PutObject", + "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:::<bucket>/<key-prefix>/*"