Skip to content
Merged
9 changes: 9 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
CLPConfig,
DB_COMPONENT_NAME,
QueryEngine,
QUEUE_COMPONENT_NAME,
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
Expand Down Expand Up @@ -582,6 +583,14 @@ def validate_dataset_name(clp_table_prefix: str, dataset_name: str) -> None:
)


def validate_retention_config(clp_config: CLPConfig) -> None:
clp_query_engine = clp_config.package.query_engine
if is_retention_period_configured(clp_config) and clp_query_engine == QueryEngine.PRESTO:
raise ValueError(
f"Retention control is not supported with query_engine `{clp_query_engine}`"
)


def is_retention_period_configured(clp_config: CLPConfig) -> bool:
if clp_config.archive_output.retention_period is not None:
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
COMPRESSION_WORKER_COMPONENT_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
GARBAGE_COLLECTOR_NAME,
GARBAGE_COLLECTOR_COMPONENT_NAME,
get_components_for_target,
QUERY_JOBS_TABLE_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
Expand Down Expand Up @@ -69,6 +69,7 @@
validate_redis_config,
validate_reducer_config,
validate_results_cache_config,
validate_retention_config,
validate_webui_config,
)

Expand Down Expand Up @@ -1067,7 +1068,7 @@ def start_garbage_collector(
container_clp_config: CLPConfig,
mounts: CLPDockerMounts,
):
component_name = GARBAGE_COLLECTOR_NAME
component_name = GARBAGE_COLLECTOR_COMPONENT_NAME

if not is_retention_period_configured(clp_config):
logger.info(f"Retention period is not configured, skipping {component_name} creation...")
Expand Down Expand Up @@ -1179,7 +1180,7 @@ def main(argv):
reducer_server_parser = component_args_parser.add_parser(REDUCER_COMPONENT_NAME)
add_num_workers_argument(reducer_server_parser)
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME)
component_args_parser.add_parser(GARBAGE_COLLECTOR_COMPONENT_NAME)

parsed_args = args_parser.parse_args(argv[1:])

Expand Down Expand Up @@ -1213,7 +1214,7 @@ def main(argv):
ALL_TARGET_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
GARBAGE_COLLECTOR_NAME,
GARBAGE_COLLECTOR_COMPONENT_NAME,
COMPRESSION_SCHEDULER_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
WEBUI_COMPONENT_NAME,
Expand Down Expand Up @@ -1248,9 +1249,15 @@ def main(argv):
ALL_TARGET_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
GARBAGE_COLLECTOR_NAME,
GARBAGE_COLLECTOR_COMPONENT_NAME,
):
validate_output_storage_config(clp_config)
if target in (
ALL_TARGET_NAME,
CONTROLLER_TARGET_NAME,
GARBAGE_COLLECTOR_COMPONENT_NAME,
):
validate_retention_config(clp_config)

clp_config.validate_data_dir()
clp_config.validate_logs_dir()
Expand Down Expand Up @@ -1330,7 +1337,8 @@ def main(argv):

if WEBUI_COMPONENT_NAME in components_to_start:
start_webui(instance_id, clp_config, container_clp_config, mounts)
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME):

if GARBAGE_COLLECTOR_COMPONENT_NAME in components_to_start:
start_garbage_collector(instance_id, clp_config, container_clp_config, mounts)
Comment on lines +1341 to 1342
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

GC start is correctly gated by components_to_start, but PRESTO currently filters GC out

Given the current implementation of get_runnable_components, PRESTO returns COMPRESSION_COMPONENTS | UI_COMPONENTS, excluding STORAGE_MANAGEMENT_COMPONENTS. That means GC won’t start for PRESTO even when targeted via CONTROLLER or ALL.

This is addressed by my earlier suggestion in clp_config.py to compute PRESTO_RUNNABLE_COMPONENTS = ALL_COMPONENTS - PRESTO_INCOMPATIBLE_COMPONENTS.


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "1) locate clp_config.py"
rg -n --hidden --no-ignore "clp_config.py" || true

echo
echo "2) search for get_runnable_components and PRESTO-related symbols"
rg -n -C3 --hidden --no-ignore "get_runnable_components" || true
rg -n -C3 --hidden --no-ignore "PRESTO_" || true
rg -n -C3 --hidden --no-ignore "STORAGE_MANAGEMENT_COMPONENTS" || true
rg -n -C3 --hidden --no-ignore "COMPRESSION_COMPONENTS" || true
rg -n -C3 --hidden --no-ignore "UI_COMPONENTS" || true
rg -n -C3 --hidden --no-ignore "GARBAGE_COLLECTOR_COMPONENT_NAME" || true

echo
echo "3) show files that define PRESTO runnable/component groups (if found)"
rg -n --hidden --no-ignore "PRESTO" -g '!**/node_modules/**' --files-with-matches | sort -u || true

Length of output: 42289


GC start is correctly gated by components_to_start, but PRESTO currently filters GC out

Verified: clp_config.get_runnable_components returns COMPRESSION_COMPONENTS | UI_COMPONENTS when QueryEngine.PRESTO, which excludes STORAGE_MANAGEMENT_COMPONENTS (garbage_collector). start_clp.py intersects target components with runnable_components before starting, so garbage_collector will not be started for PRESTO.

Please update to compute PRESTO runnable components from ALL_COMPONENTS (so GC can be included when appropriate):

  • Files to inspect/fix:

    • components/clp-py-utils/clp_py_utils/clp_config.py
      • STORAGE_MANAGEMENT_COMPONENTS / ALL_COMPONENTS (around lines ~51–54)
      • get_runnable_components (around lines ~876–880)
    • components/clp-package-utils/clp_package_utils/scripts/start_clp.py
      • intersection of components_to_start with runnable_components (around lines ~1203–1205)
      • GC start call (lines ~1341–1342) — confirmation that gating is correct here
  • Suggested change (example patch) to clp_config.py:

# define which components PRESTO cannot run (adjust the set as appropriate)
PRESTO_INCOMPATIBLE_COMPONENTS = {
    # e.g. QUERY_COMPONENTS or other components known to be incompatible
}

PRESTO_RUNNABLE_COMPONENTS = ALL_COMPONENTS - PRESTO_INCOMPATIBLE_COMPONENTS

def get_runnable_components(self) -> Set[str]:
    if QueryEngine.PRESTO == self.package.query_engine:
        return PRESTO_RUNNABLE_COMPONENTS
    else:
        return ALL_COMPONENTS

Reason: keep the source-of-truth as ALL_COMPONENTS and subtract explicit PRESTO-incompatible pieces to avoid accidentally excluding STORAGE_MANAGEMENT_COMPONENTS (garbage_collector).

🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/clp_config.py around lines ~51–54 and
~876–880, change the PRESTO handling so runnable components are computed from
ALL_COMPONENTS minus an explicit PRESTO_INCOMPATIBLE_COMPONENTS set (i.e.,
define PRESTO_INCOMPATIBLE_COMPONENTS and set PRESTO_RUNNABLE_COMPONENTS =
ALL_COMPONENTS - PRESTO_INCOMPATIBLE_COMPONENTS), and update
get_runnable_components to return PRESTO_RUNNABLE_COMPONENTS when query_engine
is PRESTO (otherwise ALL_COMPONENTS); also verify
STORAGE_MANAGEMENT_COMPONENTS/ALL_COMPONENTS definitions include
garbage_collector. In
components/clp-package-utils/clp_package_utils/scripts/start_clp.py around lines
~1203–1205 and confirm lines 1341–1342, keep the intersection of
components_to_start with runnable_components (no change) so that after the
config fix garbage_collector can be included, and ensure the existing gating at
the GC start call remains intact.


except Exception as ex:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
COMPRESSION_WORKER_COMPONENT_NAME,
CONTROLLER_TARGET_NAME,
DB_COMPONENT_NAME,
GARBAGE_COLLECTOR_NAME,
GARBAGE_COLLECTOR_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
QUEUE_COMPONENT_NAME,
Expand Down Expand Up @@ -85,7 +85,7 @@ def main(argv):
component_args_parser.add_parser(COMPRESSION_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(QUERY_WORKER_COMPONENT_NAME)
component_args_parser.add_parser(WEBUI_COMPONENT_NAME)
component_args_parser.add_parser(GARBAGE_COLLECTOR_NAME)
component_args_parser.add_parser(GARBAGE_COLLECTOR_COMPONENT_NAME)

parsed_args = args_parser.parse_args(argv[1:])

Expand Down Expand Up @@ -132,8 +132,8 @@ def main(argv):

already_exited_containers = []
force = parsed_args.force
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_NAME):
container_name = f"clp-{GARBAGE_COLLECTOR_NAME}-{instance_id}"
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_COMPONENT_NAME):
container_name = f"clp-{GARBAGE_COLLECTOR_COMPONENT_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force)
Comment on lines +135 to 137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Clean up the GC container config file after stopping (consistency with other components)

Other components (e.g., reducer, schedulers) remove their generated config files from logs_dir after stopping. Do the same for GC to avoid stale files.

-        if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_COMPONENT_NAME):
-            container_name = f"clp-{GARBAGE_COLLECTOR_COMPONENT_NAME}-{instance_id}"
-            stop_running_container(container_name, already_exited_containers, force)
+        if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_COMPONENT_NAME):
+            container_name = f"clp-{GARBAGE_COLLECTOR_COMPONENT_NAME}-{instance_id}"
+            stop_running_container(container_name, already_exited_containers, force)
+
+            container_config_file_path = logs_dir / f"{container_name}.yml"
+            if container_config_file_path.exists():
+                container_config_file_path.unlink()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_COMPONENT_NAME):
container_name = f"clp-{GARBAGE_COLLECTOR_COMPONENT_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force)
if target in (ALL_TARGET_NAME, GARBAGE_COLLECTOR_COMPONENT_NAME):
container_name = f"clp-{GARBAGE_COLLECTOR_COMPONENT_NAME}-{instance_id}"
stop_running_container(container_name, already_exited_containers, force)
container_config_file_path = logs_dir / f"{container_name}.yml"
if container_config_file_path.exists():
container_config_file_path.unlink()
🤖 Prompt for AI Agents
In components/clp-package-utils/clp_package_utils/scripts/stop_clp.py around
lines 135 to 137, after stopping the GC container you need to also remove its
generated config file from logs_dir to match other components; locate the GC
config filename using the same naming convention used elsewhere (component name
+ instance id, e.g. GARBAGE_COLLECTOR_COMPONENT_NAME and instance_id) or derive
it from container_name, build the full path under logs_dir, check if the file
exists and unlink it, and wrap the remove in a try/except that logs a
debug/error on failure so stale GC config files are cleaned up safely.

if target in (ALL_TARGET_NAME, WEBUI_COMPONENT_NAME):
container_name = f"clp-{WEBUI_COMPONENT_NAME}-{instance_id}"
Expand Down
10 changes: 7 additions & 3 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
COMPRESSION_WORKER_COMPONENT_NAME = "compression_worker"
QUERY_WORKER_COMPONENT_NAME = "query_worker"
WEBUI_COMPONENT_NAME = "webui"
GARBAGE_COLLECTOR_NAME = "garbage_collector"
GARBAGE_COLLECTOR_COMPONENT_NAME = "garbage_collector"

# Component groups
GENERAL_SCHEDULING_COMPONENTS = {
Expand All @@ -48,7 +48,10 @@
RESULTS_CACHE_COMPONENT_NAME,
WEBUI_COMPONENT_NAME,
}
ALL_COMPONENTS = COMPRESSION_COMPONENTS | QUERY_COMPONENTS | UI_COMPONENTS
STORAGE_MANAGEMENT_COMPONENTS = {GARBAGE_COLLECTOR_COMPONENT_NAME}
ALL_COMPONENTS = (
COMPRESSION_COMPONENTS | QUERY_COMPONENTS | UI_COMPONENTS | STORAGE_MANAGEMENT_COMPONENTS
)

# Target names
ALL_TARGET_NAME = ""
Expand All @@ -60,7 +63,8 @@
| {
COMPRESSION_SCHEDULER_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
},
}
| STORAGE_MANAGEMENT_COMPONENTS,
}

QUERY_JOBS_TABLE_NAME = "query_jobs"
Expand Down
4 changes: 2 additions & 2 deletions components/clp-py-utils/clp_py_utils/s3_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
COMPRESSION_SCHEDULER_COMPONENT_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
FsStorage,
GARBAGE_COLLECTOR_NAME,
GARBAGE_COLLECTOR_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
S3Config,
Expand Down Expand Up @@ -119,7 +119,7 @@ def generate_container_auth_options(
elif component_type in (WEBUI_COMPONENT_NAME,):
output_storages_by_component_type = [clp_config.stream_output.storage]
elif component_type in (
GARBAGE_COLLECTOR_NAME,
GARBAGE_COLLECTOR_COMPONENT_NAME,
QUERY_SCHEDULER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from clp_py_utils.clp_config import (
CLPConfig,
GARBAGE_COLLECTOR_NAME,
GARBAGE_COLLECTOR_COMPONENT_NAME,
)
from clp_py_utils.clp_logging import get_logger
from clp_py_utils.core import read_yaml_config_file
Expand All @@ -24,18 +24,20 @@
from job_orchestration.garbage_collector.utils import configure_logger
from pydantic import ValidationError

logger = get_logger(GARBAGE_COLLECTOR_NAME)
logger = get_logger(GARBAGE_COLLECTOR_COMPONENT_NAME)


async def main(argv: List[str]) -> int:
args_parser = argparse.ArgumentParser(description=f"Spin up the {GARBAGE_COLLECTOR_NAME}.")
args_parser = argparse.ArgumentParser(
description=f"Spin up the {GARBAGE_COLLECTOR_COMPONENT_NAME}."
)
args_parser.add_argument("--config", "-c", required=True, help="CLP configuration file.")
parsed_args = args_parser.parse_args(argv[1:])

# Setup logging to file
logs_directory = Path(os.getenv("CLP_LOGS_DIR"))
logging_level = os.getenv("CLP_LOGGING_LEVEL")
configure_logger(logger, logging_level, logs_directory, GARBAGE_COLLECTOR_NAME)
configure_logger(logger, logging_level, logs_directory, GARBAGE_COLLECTOR_COMPONENT_NAME)

# Load configuration
config_path = Path(parsed_args.config)
Expand Down