Merged
Changes from 119 commits
Commits
129 commits
73f0d69
Barebone
haiqi96 Jun 17, 2025
bc4c464
backup of progress
haiqi96 Jun 18, 2025
e6238a2
Backup for initial handling for streams
haiqi96 Jun 18, 2025
653f3dc
Backup for initial handling for streams
haiqi96 Jun 18, 2025
a269a97
fix
haiqi96 Jun 18, 2025
1742ff0
small update to use fancier syntax
haiqi96 Jun 18, 2025
efbb47b
fixes
haiqi96 Jun 18, 2025
5714945
linter yeah
haiqi96 Jun 18, 2025
b1fe0d4
Adding simple handler for reusing
haiqi96 Jun 19, 2025
a5f5a3b
commit to propogate change
haiqi96 Jun 19, 2025
53d7417
Add handler
haiqi96 Jun 19, 2025
ccd23dc
Fix mistakes in the handler logic
haiqi96 Jun 19, 2025
5bb3685
Update scheduler to handle logs of time.
haiqi96 Jun 19, 2025
bc688b9
Merge branch 'main' into retension_period
haiqi96 Jun 19, 2025
0c8c6f6
Refactor dataset related code
haiqi96 Jun 20, 2025
0d186e6
Refactor dataset related code
haiqi96 Jun 20, 2025
75ac0ff
further refactor
haiqi96 Jun 20, 2025
bb1e5f4
Linter
haiqi96 Jun 20, 2025
ba7cfe1
A few more fixes
haiqi96 Jun 20, 2025
68454c6
Linter fixes
haiqi96 Jun 20, 2025
5eccaaf
Merge branch 'DatasetRefactor' into retension_period
haiqi96 Jun 20, 2025
c1de746
missing fixes
haiqi96 Jun 20, 2025
f08802b
Merge remote-tracking branch 'origin/DatasetRefactor' into retension_…
haiqi96 Jun 20, 2025
d797198
Fix mistake
haiqi96 Jun 20, 2025
c5dc9b9
Merge remote-tracking branch 'origin/DatasetRefactor' into retension_…
haiqi96 Jun 20, 2025
8c39e77
actually fixing
haiqi96 Jun 20, 2025
ea4318e
Merge remote-tracking branch 'origin/DatasetRefactor' into retension_…
haiqi96 Jun 20, 2025
2c97441
Intermediate backup for archive retention
haiqi96 Jun 20, 2025
2eff448
Update
haiqi96 Jun 20, 2025
d570ab6
Linter again
haiqi96 Jun 20, 2025
06332f4
Merge remote-tracking branch 'origin/DatasetRefactor' into retension_…
haiqi96 Jun 20, 2025
d5e8e28
some renaming
haiqi96 Jun 20, 2025
8a79b9b
adding reminder for myself
haiqi96 Jun 23, 2025
8c77119
Fixing permissions
haiqi96 Jun 23, 2025
3a1afb2
Add batch deletion support
haiqi96 Jun 24, 2025
73d76ac
Linter + code clean up
haiqi96 Jun 24, 2025
5745e65
More refactor
haiqi96 Jun 24, 2025
f3ba8b0
renaming
haiqi96 Jun 24, 2025
e566e74
Prepare for rearrangement
haiqi96 Jun 24, 2025
db9a508
Optimize logger
haiqi96 Jun 24, 2025
2f0f95a
Further refactor
haiqi96 Jun 25, 2025
e310102
Use asyncio
haiqi96 Jun 25, 2025
a332799
Refactoring
haiqi96 Jun 25, 2025
cb53857
Refactoring
haiqi96 Jun 25, 2025
85b7823
Update clp-config
haiqi96 Jun 25, 2025
398ab5e
Merge branch 'main' into DatasetRefactor
haiqi96 Jun 25, 2025
3209ddd
Merge branch 'DatasetRefactor' into retension_period
haiqi96 Jun 25, 2025
3c5b0e4
New line at eof
haiqi96 Jun 25, 2025
fb41607
Refactor retention cleaner name
haiqi96 Jun 25, 2025
386453b
Clean up
haiqi96 Jun 25, 2025
945c97b
linter
haiqi96 Jun 25, 2025
1845462
Adding more docstrings
haiqi96 Jun 25, 2025
bed13df
Temporarily remove stream retention
haiqi96 Jun 25, 2025
0d8d679
Linter
haiqi96 Jun 25, 2025
d40e773
Revert change for stream
haiqi96 Jun 25, 2025
7759a7a
Merge remote-tracking branch 'origin/main' into DatasetRefactor
haiqi96 Jun 27, 2025
271b8b3
Merge branch 'DatasetRefactor' into retension_period
haiqi96 Jun 27, 2025
e6b8cc7
Linter
haiqi96 Jun 27, 2025
c0b8563
Merge branch 'DatasetRefactor' into retension_period
haiqi96 Jun 27, 2025
7a468c3
Merge branch 'main' into DatasetRefactor
Bill-hbrhbr Jun 29, 2025
1dd1cea
Move default dataset metadata table creation to start_clp
Bill-hbrhbr Jun 29, 2025
a0c3c29
Remove unused import
Bill-hbrhbr Jun 29, 2025
a9bf615
Address review comments
Bill-hbrhbr Jun 30, 2025
fe05f5f
Replace the missing SUFFIX
Bill-hbrhbr Jun 30, 2025
39a9278
Move suffix constants from clp_config to clp_metadata_db_utils local …
Bill-hbrhbr Jun 30, 2025
7124828
Refactor archive_manager.py.
kirkrodrigues Jun 30, 2025
eb80992
Refactor s3_utils.py.
kirkrodrigues Jun 30, 2025
5ed44e7
compression_task.py: Fix typing errors and minor refactoring.
kirkrodrigues Jun 30, 2025
af6b508
compression_scheduler.py: Remove exception swallow which will hide un…
kirkrodrigues Jun 30, 2025
67fb01f
Refactor query_scheduler.py.
kirkrodrigues Jun 30, 2025
d6ad4de
clp_metadata_db_utils.py: Minor refactoring.
kirkrodrigues Jun 30, 2025
ff7d700
clp_metadata_db_utils.py: Rename _generic_get_table_name -> _get_tabl…
kirkrodrigues Jun 30, 2025
7ffc77c
clp_metadata_db_utils.py: Alphabetize new public functions.
kirkrodrigues Jun 30, 2025
0255cbd
clp_metadata_db_utils.py: Reorder public and private functions for co…
kirkrodrigues Jun 30, 2025
1076a3f
initialize-clp-metadata-db.py: Remove changes unrelated to PR.
kirkrodrigues Jun 30, 2025
71c4d82
Move default dataset creation into compression_scheduler so that it r…
kirkrodrigues Jun 30, 2025
6bd9372
Apply suggestions from code review
kirkrodrigues Jul 1, 2025
84df2e2
Merge branch 'main' into DatasetRefactor
kirkrodrigues Jul 1, 2025
983bea1
Remove bug fix that's no longer necessary.
kirkrodrigues Jul 1, 2025
bdb7817
Fix bug where dataset has a default value instead of None when using …
Bill-hbrhbr Jul 1, 2025
a82a267
Correctly feed in the input config dataset names
Bill-hbrhbr Jul 1, 2025
f699496
Remove unnecessary changes
Bill-hbrhbr Jul 1, 2025
94e8ca1
Merge branch 'DatasetRefactor' into retension_period
haiqi96 Jul 2, 2025
90ce0a4
Update the webui to pass the dataset name in the clp-json code path (…
kirkrodrigues Jul 2, 2025
d6f9e5a
Move dataset into the user function
haiqi96 Jul 2, 2025
dc6a706
Merge branch 'DatasetRefactor' of https://github.com/haiqi96/clp_fork…
haiqi96 Jul 2, 2025
76bcb4a
Remove unnecessary f string specifier
haiqi96 Jul 2, 2025
a4e6f83
Apply suggestions from code review
haiqi96 Jul 2, 2025
3c53cb0
Merge branch 'DatasetRefactor' into retension_period
haiqi96 Jul 2, 2025
66eba87
Polishing
haiqi96 Jul 2, 2025
7b42568
Add import type.
kirkrodrigues Jul 2, 2025
097e47c
Polishing more
haiqi96 Jul 3, 2025
8dc8e26
try adding query job handling
haiqi96 Jul 3, 2025
afe43ce
Merge branch 'main' into DatasetRefactor
haiqi96 Jul 3, 2025
85a3164
Merge remote-tracking branch 'origin/DatasetRefactor' into retension_…
haiqi96 Jul 3, 2025
af75118
Merge remote-tracking branch 'origin/main' into retension_period
haiqi96 Jul 3, 2025
e5e90f7
Fix wrong order
haiqi96 Jul 3, 2025
bac6767
Linter
haiqi96 Jul 3, 2025
de1c334
submit not-fully-tested-code
haiqi96 Jul 3, 2025
9fdb3d5
Apply suggestions from code review
haiqi96 Jul 4, 2025
2245244
Update components/job-orchestration/job_orchestration/retention/archi…
haiqi96 Jul 4, 2025
b1e5a2c
Apply suggestions from code review
haiqi96 Jul 4, 2025
4e93a30
Fix
haiqi96 Jul 4, 2025
6719872
Merge remote-tracking branch 'origin/main' into retension_period
haiqi96 Jul 4, 2025
f9fa626
nit fixes
haiqi96 Jul 4, 2025
450e16a
Update the logic to consider all running query jobs
haiqi96 Jul 17, 2025
ade2e27
Merge remote-tracking branch 'origin/main' into retension_period
haiqi96 Jul 17, 2025
f1584ff
linter
haiqi96 Jul 30, 2025
2c57dd6
Apply suggestions from code review
haiqi96 Aug 1, 2025
5f479c5
address code review concern
haiqi96 Aug 1, 2025
8c5fb89
Batch renaming
haiqi96 Aug 1, 2025
11e695f
Linter
haiqi96 Aug 1, 2025
1291c3f
Further refactor
haiqi96 Aug 1, 2025
f8c7369
Linter
haiqi96 Aug 1, 2025
9b48c9b
Apply suggestions from code review
haiqi96 Aug 4, 2025
6cff24d
Merge remote-tracking branch 'origin/main' into retension_period
haiqi96 Aug 4, 2025
c367c15
address review concern
haiqi96 Aug 4, 2025
e282020
Update logging
haiqi96 Aug 4, 2025
b93bb4b
Update components/job-orchestration/job_orchestration/garbage_collect…
haiqi96 Aug 4, 2025
390333f
Address review comments
haiqi96 Aug 5, 2025
a4546cf
Fix timezone
haiqi96 Aug 5, 2025
2c4821a
Apply suggestions from code review
haiqi96 Aug 7, 2025
9d5d087
Address code review comments and slight improved logging.
haiqi96 Aug 7, 2025
74af600
Linter
haiqi96 Aug 7, 2025
34a06a5
Merge remote-tracking branch 'origin/main' into retension_period
haiqi96 Aug 12, 2025
7633a6c
Update components/job-orchestration/job_orchestration/garbage_collect…
haiqi96 Aug 13, 2025
bd48ca5
Merge remote-tracking branch 'origin/main' into retension_period
haiqi96 Aug 13, 2025
8ca206e
Merge branch 'main' into retension_period
LinZhihao-723 Aug 13, 2025
ee9dd59
Merge branch 'main' into retension_period
haiqi96 Aug 13, 2025
72 changes: 36 additions & 36 deletions components/clp-package-utils/clp_package_utils/general.py
@@ -96,6 +96,13 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
self.aws_config_dir: typing.Optional[DockerMount] = None


def _validate_data_directory(data_dir: pathlib.Path, component_name: str) -> None:
try:
validate_path_could_be_dir(data_dir)
except ValueError as ex:
raise ValueError(f"{component_name} data directory is invalid: {ex}")


def get_clp_home():
# Determine CLP_HOME from an environment variable or this script's path
clp_home = None
@@ -175,6 +182,13 @@ def is_container_exited(container_name):
return False


def validate_log_directory(logs_dir: pathlib.Path, component_name: str) -> None:
try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"{component_name} logs directory is invalid: {ex}")


def validate_port(port_name: str, hostname: str, port: int):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -431,40 +445,23 @@ def validate_and_load_redis_credentials_file(


def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path):
try:
validate_path_could_be_dir(data_dir)
except ValueError as ex:
raise ValueError(f"{DB_COMPONENT_NAME} data directory is invalid: {ex}")

try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"{DB_COMPONENT_NAME} logs directory is invalid: {ex}")
_validate_data_directory(data_dir, DB_COMPONENT_NAME)
validate_log_directory(logs_dir, DB_COMPONENT_NAME)

validate_port(f"{DB_COMPONENT_NAME}.port", clp_config.database.host, clp_config.database.port)


def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path):
try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"{QUEUE_COMPONENT_NAME} logs directory is invalid: {ex}")
validate_log_directory(logs_dir, QUEUE_COMPONENT_NAME)

validate_port(f"{QUEUE_COMPONENT_NAME}.port", clp_config.queue.host, clp_config.queue.port)


def validate_redis_config(
clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path, base_config: pathlib.Path
):
try:
validate_path_could_be_dir(data_dir)
except ValueError as ex:
raise ValueError(f"{REDIS_COMPONENT_NAME} data directory is invalid {ex}")

try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"{REDIS_COMPONENT_NAME} logs directory is invalid: {ex}")
_validate_data_directory(data_dir, REDIS_COMPONENT_NAME)
validate_log_directory(logs_dir, REDIS_COMPONENT_NAME)

if not base_config.exists():
raise ValueError(
@@ -475,10 +472,7 @@ def validate_redis_config(


def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_workers: int):
try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"{REDUCER_COMPONENT_NAME} logs directory is invalid: {ex}")
validate_log_directory(logs_dir, REDUCER_COMPONENT_NAME)

for i in range(0, num_workers):
validate_port(
@@ -491,15 +485,8 @@ def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_w
def validate_results_cache_config(
clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path
):
try:
validate_path_could_be_dir(data_dir)
except ValueError as ex:
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} data directory is invalid: {ex}")

try:
validate_path_could_be_dir(logs_dir)
except ValueError as ex:
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} logs directory is invalid: {ex}")
_validate_data_directory(data_dir, RESULTS_CACHE_COMPONENT_NAME)
validate_log_directory(logs_dir, RESULTS_CACHE_COMPONENT_NAME)

validate_port(
f"{RESULTS_CACHE_COMPONENT_NAME}.port",
@@ -508,8 +495,11 @@
)


def validate_worker_config(clp_config: CLPConfig):
def validate_logs_input_config(clp_config: CLPConfig):
clp_config.validate_logs_input_config()


def validate_output_storage_config(clp_config: CLPConfig):
clp_config.validate_archive_output_config()
clp_config.validate_stream_output_config()

@@ -590,3 +580,13 @@ def validate_dataset_name(clp_table_prefix: str, dataset_name: str) -> None:
f"Invalid dataset name: `{dataset_name}`. Names can only be a maximum of"
f" {dataset_name_max_len} characters long."
)


def is_retention_configured(clp_config: CLPConfig) -> bool:
if clp_config.archive_output.retention_period is not None:
return True

if clp_config.results_cache.retention_period is not None:
return True

return False
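The new `_validate_data_directory` and `validate_log_directory` helpers above collapse the repeated try/except blocks around `validate_path_could_be_dir` into single calls. A minimal, self-contained sketch of that pattern — using a stand-in `validate_path_could_be_dir`, since the real helper lives elsewhere in `clp_package_utils`:

```python
import pathlib


def validate_path_could_be_dir(path: pathlib.Path) -> None:
    # Stand-in for the real helper: reject a path that exists but is not a directory.
    if path.exists() and not path.is_dir():
        raise ValueError(f"{path} exists but is not a directory.")


def validate_log_directory(logs_dir: pathlib.Path, component_name: str) -> None:
    # Wrap the generic check so every component reports a uniform error message.
    try:
        validate_path_could_be_dir(logs_dir)
    except ValueError as ex:
        raise ValueError(f"{component_name} logs directory is invalid: {ex}")


# Per-component validators then shrink to one line each:
validate_log_directory(pathlib.Path("var/log/queue"), "queue")
```

With this in place, each `validate_*_config` function only names its component once, which is exactly the simplification the diff applies to the DB, queue, Redis, reducer, and results-cache validators.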
components/clp-package-utils/clp_package_utils/scripts/native/archive_manager.py
@@ -9,9 +9,8 @@

from clp_py_utils.clp_config import Database
from clp_py_utils.clp_metadata_db_utils import (
get_archive_tags_table_name,
delete_archives_from_metadata_db,
get_archives_table_name,
get_files_table_name,
)
from clp_py_utils.sql_adapter import SQL_Adapter

@@ -325,13 +324,13 @@ def _delete_archives(

archive_ids: typing.List[str]
logger.info("Starting to delete archives from the database.")
try:
sql_adapter: SQL_Adapter = SQL_Adapter(database_config)
clp_db_connection_params: dict[str, any] = (
database_config.get_clp_connection_params_and_type(True)
)
table_prefix = clp_db_connection_params["table_prefix"]
sql_adapter: SQL_Adapter = SQL_Adapter(database_config)
clp_db_connection_params: dict[str, any] = database_config.get_clp_connection_params_and_type(
True
)
table_prefix = clp_db_connection_params["table_prefix"]

try:
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
@@ -343,9 +342,8 @@

db_cursor.execute(
f"""
DELETE FROM `{get_archives_table_name(table_prefix, dataset)}`
SELECT id FROM `{get_archives_table_name(table_prefix, dataset)}`
WHERE {query_criteria}
RETURNING id
""",
query_params,
)
@@ -358,21 +356,7 @@
archive_ids: typing.List[str] = [result["id"] for result in results]
delete_handler.validate_results(archive_ids)

ids_list_string: str = ", ".join(["'%s'"] * len(archive_ids))

db_cursor.execute(
f"""
DELETE FROM `{get_files_table_name(table_prefix, dataset)}`
WHERE archive_id in ({ids_list_string})
"""
)

db_cursor.execute(
f"""
DELETE FROM `{get_archive_tags_table_name(table_prefix, dataset)}`
WHERE archive_id in ({ids_list_string})
"""
)
delete_archives_from_metadata_db(db_cursor, archive_ids, table_prefix, dataset)
for archive_id in archive_ids:
logger.info(f"Deleted archive {archive_id} from the database.")

Comment on lines 360 to 362

coderabbitai bot (Contributor), Aug 13, 2025:

🧹 Nitpick (assertive)

Consider batching the deletion logging for better performance.

When deleting many archives, logging each deletion individually could impact performance. Consider logging a summary instead.

-            for archive_id in archive_ids:
-                logger.info(f"Deleted archive {archive_id} from the database.")
+            logger.info(f"Deleted {len(archive_ids)} archives from the database.")
+            logger.debug(f"Deleted archive IDs: {', '.join(archive_ids)}")

haiqi96 (Contributor, Author):

This is unrelated to this PR. will skip.

coderabbitai bot (Contributor):

@haiqi96 Understood! You're absolutely right that logging performance optimization is outside the scope of this retention and garbage collection PR. I'll skip this suggestion to keep the focus on the core functionality being added.

🧠 Learnings used

Learnt from: haiqi96
PR: y-scope/clp#594
File: components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py:104-110
Timestamp: 2024-11-15T16:21:52.122Z
Learning: In `clp_package_utils/scripts/native/del_archives.py`, when deleting archives, the `archive` variable retrieved from the database is controlled and is always a single string without path components. Therefore, it's acceptable to skip additional validation checks for directory traversal in this context.
@@ -385,6 +369,8 @@ def _delete_archives(

except Exception:
logger.exception("Failed to delete archives from the database. Aborting deletion.")
if "db_conn" in locals() and db_conn.is_connected():
db_conn.rollback()
return -1

logger.info(f"Finished deleting archives from the database.")
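This hunk replaces three inlined DELETE statements (against the archives, files, and archive-tags tables) with a single call to `delete_archives_from_metadata_db`. The PR doesn't show that helper's body here, so the following is only a hypothetical sketch of what such a consolidated deletion could look like — it uses sqlite3 `?` placeholders for a runnable illustration (the real code targets MySQL via `SQL_Adapter`, uses the table-name helpers from `clp_metadata_db_utils`, and the `{prefix}{dataset}_*` naming below is an assumption):

```python
import sqlite3
from typing import List


def delete_archives_from_metadata_db(
    db_cursor: sqlite3.Cursor,
    archive_ids: List[str],
    table_prefix: str,
    dataset: str,
) -> None:
    # Hypothetical sketch: remove the archives plus their dependent rows
    # (files and archive tags) in one place instead of three inlined statements.
    placeholders = ", ".join(["?"] * len(archive_ids))
    base = f"{table_prefix}{dataset}_"  # assumed table-naming scheme
    db_cursor.execute(
        f"DELETE FROM `{base}files` WHERE archive_id IN ({placeholders})", archive_ids
    )
    db_cursor.execute(
        f"DELETE FROM `{base}archive_tags` WHERE archive_id IN ({placeholders})", archive_ids
    )
    db_cursor.execute(
        f"DELETE FROM `{base}archives` WHERE id IN ({placeholders})", archive_ids
    )
```

Centralizing the three statements also removes the string-interpolated `ids_list_string` from the old code in favor of parameterized placeholders, which is the safer idiom for ID lists.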
components/clp-package-utils/clp_package_utils/scripts/start_clp.py
@@ -22,6 +22,7 @@
COMPRESSION_WORKER_COMPONENT_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
GARBAGE_COLLECTOR_NAME,
QUERY_JOBS_TABLE_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
@@ -54,17 +55,20 @@
get_clp_home,
is_container_exited,
is_container_running,
is_retention_configured,
load_config_file,
validate_and_load_db_credentials_file,
validate_and_load_queue_credentials_file,
validate_and_load_redis_credentials_file,
validate_db_config,
validate_log_directory,
validate_logs_input_config,
validate_output_storage_config,
validate_queue_config,
validate_redis_config,
validate_reducer_config,
validate_results_cache_config,
validate_webui_config,
validate_worker_config,
)

logger = logging.getLogger(__file__)
@@ -1051,6 +1055,88 @@ def start_reducer(
logger.info(f"Started {component_name}.")


def start_garbage_collector(
instance_id: str,
clp_config: CLPConfig,
container_clp_config: CLPConfig,
mounts: CLPDockerMounts,
):
component_name = GARBAGE_COLLECTOR_NAME
Comment on lines +1064 to +1070

coderabbitai bot (Contributor):

🧹 Nitpick (assertive)

Add a concise docstring to start_garbage_collector.

A brief description of purpose and parameters will help future maintainers.

Example:

"""
Start the garbage-collector container if any retention period is configured.

Args:
    instance_id: Short instance identifier used in container names.
    clp_config: Host-side CLPConfig with absolute host paths/credentials.
    container_clp_config: Container-side CLPConfig with container paths.
    mounts: Precomputed Docker mounts for CLP directories and optional AWS config.
"""


if not is_retention_configured(clp_config):
logger.info(f"Retention period is not configured, skipping {component_name} creation...")
return

logger.info(f"Starting {component_name}...")

container_name = f"clp-{component_name}-{instance_id}"
if container_exists(container_name):
return

container_config_filename = f"{container_name}.yml"
container_config_file_path = clp_config.logs_directory / container_config_filename
with open(container_config_file_path, "w") as f:
yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f)

logs_dir = clp_config.logs_directory / component_name
validate_log_directory(logs_dir, component_name)
# Create logs directory if necessary
logs_dir.mkdir(parents=True, exist_ok=True)
container_logs_dir = container_clp_config.logs_directory / component_name

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"

# fmt: off
container_start_cmd = [
"docker", "run",
"-di",
"--network", "host",
"-w", str(CONTAINER_CLP_HOME),
"--name", container_name,
"--log-driver", "local",
"-u", f"{os.getuid()}:{os.getgid()}",
]
# fmt: on

necessary_env_vars = [
f"PYTHONPATH={clp_site_packages_dir}",
f"CLP_HOME={CONTAINER_CLP_HOME}",
f"CLP_LOGS_DIR={container_logs_dir}",
f"CLP_LOGGING_LEVEL={clp_config.garbage_collector.logging_level}",
]
necessary_mounts = [
mounts.clp_home,
mounts.logs_dir,
]

# Add necessary mounts for archives and streams.
if StorageType.FS == clp_config.archive_output.storage.type:
necessary_mounts.append(mounts.archives_output_dir)
if StorageType.FS == clp_config.stream_output.storage.type:
necessary_mounts.append(mounts.stream_output_dir)

aws_mount, aws_env_vars = generate_container_auth_options(clp_config, component_name)
if aws_mount:
necessary_mounts.append(mounts.aws_config_dir)
if aws_env_vars:
necessary_env_vars.extend(aws_env_vars)

append_docker_options(container_start_cmd, necessary_mounts, necessary_env_vars)
container_start_cmd.append(clp_config.execution_container)

# fmt: off
garbage_collector_cmd = [
"python3", "-u",
"-m", "job_orchestration.garbage_collector.garbage_collector",
"--config", str(container_clp_config.logs_directory / container_config_filename),
]
# fmt: on
cmd = container_start_cmd + garbage_collector_cmd
subprocess.run(cmd, stdout=subprocess.DEVNULL, check=True)

logger.info(f"Started {component_name}.")
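`start_garbage_collector` follows the same launch recipe as the other component starters in this file: assemble a base `docker run` prefix, append environment variables and mounts, then append the Python module command. A condensed, docker-free sketch of that command assembly (the helper name and mount formatting below are illustrative, not the package's actual API — the real code delegates to `append_docker_options` and `generate_container_auth_options`):

```python
from typing import List, Tuple


def build_container_cmd(
    container_name: str,
    env_vars: List[str],
    mounts: List[Tuple[str, str]],
    image: str,
    module_cmd: List[str],
) -> List[str]:
    # Base docker-run options mirroring the pattern above:
    # detached, host networking, local log driver.
    cmd = [
        "docker", "run", "-di",
        "--network", "host",
        "--name", container_name,
        "--log-driver", "local",
    ]
    for var in env_vars:
        cmd += ["-e", var]  # one -e flag per environment variable
    for host_path, container_path in mounts:
        cmd += ["--mount", f"type=bind,src={host_path},dst={container_path}"]
    cmd.append(image)       # image comes after all options...
    cmd += module_cmd       # ...and the in-container command comes last
    return cmd


cmd = build_container_cmd(
    "clp-garbage-collector-1",
    ["CLP_LOGGING_LEVEL=INFO"],
    [("/var/clp/logs", "/opt/clp/logs")],
    "clp-execution-image",
    ["python3", "-u", "-m", "job_orchestration.garbage_collector.garbage_collector"],
)
```

Building the command as a list and passing it to `subprocess.run` (as the diff does) avoids shell quoting issues with paths and env-var values.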


def add_num_workers_argument(parser):
parser.add_argument(
"--num-workers",
@@ -1087,6 +1173,7 @@ def main(argv):
reducer_server_parser = component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
add_num_workers_argument(reducer_server_parser)
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME)

parsed_args = args_parser.parse_args(argv[1:])

@@ -1111,6 +1198,7 @@
ALL_TARGET_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
GARBAGE_COLLECTOR_NAME,
COMPRESSION_SCHEDULER_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
WEBUI_COMPONENT_NAME,
@@ -1136,12 +1224,18 @@
QUERY_WORKER_COMPONENT_NAME,
):
validate_and_load_redis_credentials_file(clp_config, clp_home, True)
if target in (
ALL_TARGET_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
):
validate_logs_input_config(clp_config)
if target in (
ALL_TARGET_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
GARBAGE_COLLECTOR_NAME,
):
validate_worker_config(clp_config)
validate_output_storage_config(clp_config)

clp_config.validate_data_dir()
clp_config.validate_logs_dir()
@@ -1210,6 +1304,8 @@
start_reducer(instance_id, clp_config, container_clp_config, num_workers, mounts)
if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME):
start_webui(instance_id, clp_config, container_clp_config, mounts)
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME):
start_garbage_collector(instance_id, clp_config, container_clp_config, mounts)

except Exception as ex:
if type(ex) == ValueError:
Changes to the container stop script:
@@ -11,6 +11,7 @@
COMPRESSION_WORKER_COMPONENT_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
GARBAGE_COLLECTOR_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
@@ -84,6 +85,7 @@ def main(argv):
component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME)

parsed_args = args_parser.parse_args(argv[1:])

@@ -130,6 +132,9 @@

already_exited_containers = []
force = parsed_args.force
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME):
container_name = f"clp-{GARBAGE_COLLECTOR_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force)
if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME):
container_name = f"clp-{WEBUI_COMPONENT_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force)