Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 110 additions & 17 deletions components/clp-package-utils/clp_package_utils/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import subprocess
import uuid
from abc import ABC, abstractmethod
from enum import auto
from typing import Any, Optional

from clp_py_utils.clp_config import (
Expand All @@ -17,7 +18,6 @@
COMPRESSION_SCHEDULER_COMPONENT_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
DB_COMPONENT_NAME,
DeploymentType,
GARBAGE_COLLECTOR_COMPONENT_NAME,
MCP_SERVER_COMPONENT_NAME,
QUERY_JOBS_TABLE_NAME,
Expand All @@ -37,11 +37,13 @@
get_datasets_table_name,
get_files_table_name,
)
from strenum import KebabCaseStrEnum

from clp_package_utils.general import (
check_docker_dependencies,
CONTAINER_CLP_HOME,
DockerComposeProjectNotRunningError,
DockerComposeProjectAlreadyRunningError,
DockerDependencyError,
dump_shared_container_config,
generate_docker_compose_container_config,
Expand Down Expand Up @@ -74,6 +76,16 @@ def __ior__(self, other: "EnvVarsDict") -> "EnvVarsDict":
return self


class DeploymentTarget(KebabCaseStrEnum):
ALL = auto()
CONTROLLER = auto()
COMPRESSION_WORKER = auto()
QUERY_WORKER = auto()
REDUCER = auto()
WEBUI = auto()
MCP = auto()


class BaseController(ABC):
"""
Base controller for orchestrating CLP components. Derived classes should implement any
Expand Down Expand Up @@ -638,8 +650,74 @@ class DockerComposeController(BaseController):
Controller for orchestrating CLP components using Docker Compose.
"""

def __init__(self, clp_config: CLPConfig, instance_id: str) -> None:
def __init__(
self,
clp_config: CLPConfig,
instance_id: str,
target: DeploymentTarget,
num_workers: Optional[int]
) -> None:
self._project_name = f"clp-package-{instance_id}"

self._target = target
self._is_existing_project_allowed = target != DeploymentTarget.ALL
self._num_workers = num_workers or self._get_num_workers()

self._is_mcp_enabled = clp_config.mcp_server is not None
self._is_legacy_search_enabled = clp_config.package.query_engine != QueryEngine.PRESTO

if target in (
DeploymentTarget.QUERY_WORKER,
DeploymentTarget.REDUCER,
) and not self._is_legacy_search_enabled:
raise ValueError(
"Legacy search components (query-worker/reducer) cannot be launched when the "
"query engine is set to Presto."
)
if target == DeploymentTarget.MCP and not self._is_mcp_enabled:
raise ValueError(
"The MCP server is not configured in the CLP package configuration, so it cannot "
"be launched."
)

# Controllers
self._launch_database = target in (DeploymentTarget.ALL, DeploymentTarget.CONTROLLER)
self._launch_redis = target in (DeploymentTarget.ALL, DeploymentTarget.CONTROLLER)
self._launch_queue = target in (DeploymentTarget.ALL, DeploymentTarget.CONTROLLER)
self._launch_results_cache = target in (DeploymentTarget.ALL, DeploymentTarget.CONTROLLER)
self._launch_compression_scheduler = target in (DeploymentTarget.ALL, DeploymentTarget.CONTROLLER)
self._launch_query_scheduler = target in (DeploymentTarget.ALL, DeploymentTarget.CONTROLLER)
self._launch_garbage_collector = target in (DeploymentTarget.ALL,
DeploymentTarget.CONTROLLER) and is_retention_period_configured(clp_config)

# Workers
self._launch_compression_worker = target in (DeploymentTarget.ALL, DeploymentTarget.COMPRESSION_WORKER)
self._launch_query_worker = target in (DeploymentTarget.ALL, DeploymentTarget.QUERY_WORKER)
self._launch_reducer = target in (DeploymentTarget.ALL, DeploymentTarget.REDUCER)

# Clients
self._launch_webui = target in (DeploymentTarget.ALL, DeploymentTarget.WEBUI)
self._launch_mcp = (target in (DeploymentTarget.ALL,DeploymentTarget.MCP) and self._is_mcp_enabled)
Comment on lines +684 to +700
Copy link
Member Author

@junhaoliao junhaoliao Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some of (, if not most of) those computed flags are unused. i was trying to use those to control whether the setups methods should be run or not, but realized we shouldn't disable any setup functions as they may setup dependencies for the other components.

e.g. the db's setup method should not be disabled if we want to run the webui, or the webui might not be able to read the credential env vars


self._compose_profile: Optional[str] = None
self._launch_only_service: Optional[str] = None
if target == DeploymentTarget.CONTROLLER:
self._compose_profile = "controller"
elif target == DeploymentTarget.WEBUI:
self._launch_only_service = "webui"
elif target == DeploymentTarget.MCP:
self._launch_only_service = "mcp-server"
elif target == DeploymentTarget.COMPRESSION_WORKER:
self._launch_only_service = "compression-worker"
elif target == DeploymentTarget.QUERY_WORKER:
self._launch_only_service = "query-worker"
elif target == DeploymentTarget.REDUCER:
self._launch_only_service = "reducer"

self._compose_file_override = (
"docker-compose.base.yaml" if target == DeploymentTarget.CONTROLLER else None
)

super().__init__(clp_config)

def start(self) -> None:
Expand All @@ -649,20 +727,32 @@ def start(self) -> None:
:raise: Propagates `check_docker_dependencies`'s exceptions.
:raise: Propagates `subprocess.run`'s exceptions.
"""
check_docker_dependencies(
should_compose_project_be_running=False, project_name=self._project_name
)
try:
check_docker_dependencies(
should_compose_project_be_running=False, project_name=self._project_name
)
except DockerComposeProjectAlreadyRunningError:
if self._is_existing_project_allowed:
logger.info(
"Docker Compose project '%s' is already running. Adding requested services.",
self._project_name,
)
else:
raise
self._set_up_env()

deployment_type = self._clp_config.get_deployment_type()
logger.info(f"Starting CLP using Docker Compose ({deployment_type} deployment)...")

logger.info(
"Starting CLP using Docker Compose (target=%s)...",
self._target,
)
cmd = ["docker", "compose", "--project-name", self._project_name]
if deployment_type == DeploymentType.BASE:
cmd += ["--file", "docker-compose.base.yaml"]
if self._clp_config.mcp_server is not None:
cmd += ["--profile", "mcp"]
if self._compose_file_override is not None:
cmd += ["--file", self._compose_file_override]
if self._compose_profile is not None:
cmd += ["--profile", self._compose_profile]
cmd += ["up", "--detach", "--wait"]
if self._launch_only_service is not None:
cmd += ["--no-deps", self._launch_only_service]
subprocess.run(
cmd,
cwd=self._clp_home,
Expand Down Expand Up @@ -713,7 +803,6 @@ def _get_num_workers() -> int:
def _set_up_env(self) -> None:
# Generate container-specific config.
container_clp_config = generate_docker_compose_container_config(self._clp_config)
num_workers = self._get_num_workers()
dump_shared_container_config(container_clp_config, self._clp_config)

env_vars = EnvVarsDict()
Expand Down Expand Up @@ -782,12 +871,16 @@ def _set_up_env(self) -> None:
env_vars |= self._set_up_env_for_results_cache()
env_vars |= self._set_up_env_for_compression_scheduler()
env_vars |= self._set_up_env_for_query_scheduler()
env_vars |= self._set_up_env_for_compression_worker(num_workers)
env_vars |= self._set_up_env_for_query_worker(num_workers)
env_vars |= self._set_up_env_for_reducer(num_workers)
env_vars |= self._set_up_env_for_garbage_collector()
env_vars |= self._set_up_env_for_compression_worker(self._num_workers)
env_vars |= self._set_up_env_for_query_worker(self._num_workers)
env_vars |= self._set_up_env_for_reducer(self._num_workers)
env_vars |= self._set_up_env_for_webui(container_clp_config)
env_vars |= self._set_up_env_for_mcp_server()
env_vars |= self._set_up_env_for_garbage_collector()

if not self._launch_garbage_collector:
env_vars["CLP_GARBAGE_COLLECTOR_ENABLED"] = "0"
env_vars["CLP_LEGACY_SEARCH_ENABLED"] = "1" if self._is_legacy_search_enabled else "0"

# Write the environment variables to the `.env` file.
with open(f"{self._clp_home}/.env", "w") as env_file:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

from clp_py_utils.clp_config import CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

from clp_package_utils.controller import DockerComposeController, get_or_create_instance_id
from clp_package_utils.controller import (
DeploymentTarget,
DockerComposeController,
get_or_create_instance_id,
)
from clp_package_utils.general import (
get_clp_home,
load_config_file,
Expand Down Expand Up @@ -38,12 +42,28 @@ def main(argv):
help="Enable debug logging.",
)

parsed_args = args_parser.parse_args(argv[1:])
subparsers = args_parser.add_subparsers(dest="target", help="Deployment target to start.")
for target in DeploymentTarget:
sub = subparsers.add_parser(target.value)

if target in {
DeploymentTarget.COMPRESSION_WORKER,
DeploymentTarget.QUERY_WORKER,
DeploymentTarget.REDUCER,
}:
sub.add_argument(
"--num-workers",
type=int,
help="Set worker concurrency for this target.",
)

parsed_args = args_parser.parse_args(argv[1:])
if parsed_args.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
target = parsed_args.target or DeploymentTarget.ALL
num_workers = getattr(parsed_args, "num_workers", None)

try:
# Validate and load config file.
Expand Down Expand Up @@ -78,7 +98,12 @@ def main(argv):

try:
instance_id = get_or_create_instance_id(clp_config)
controller = DockerComposeController(clp_config, instance_id)
controller = DockerComposeController(
clp_config,
instance_id,
target,
num_workers,
)
controller.start()
except Exception as ex:
if type(ex) == ValueError:
Expand Down
11 changes: 0 additions & 11 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@
ZstdCompressionLevel = Annotated[int, Field(ge=1, le=19)]


class DeploymentType(KebabCaseStrEnum):
BASE = auto()
FULL = auto()


class StorageEngine(KebabCaseStrEnum):
CLP = auto()
CLP_S = auto()
Expand Down Expand Up @@ -767,12 +762,6 @@ def load_container_image_ref(self):
def get_shared_config_file_path(self) -> pathlib.Path:
return self.logs_directory / CLP_SHARED_CONFIG_FILENAME

def get_deployment_type(self) -> DeploymentType:
if QueryEngine.PRESTO == self.package.query_engine:
return DeploymentType.BASE
else:
return DeploymentType.FULL

def dump_to_primitive_dict(self):
custom_serialized_fields = {
"database",
Expand Down
44 changes: 33 additions & 11 deletions docs/src/dev-docs/design-deployment-orchestration.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ This section explains how we use Docker Compose to orchestrate the CLP package a
the following subsections:

* [Setting up the Docker Compose project's environment](#setting-up-the-environment)
* [Starting and stoping the Docker Compose project](#starting-and-stopping-the-project)
* [Deployment types](#deployment-types)
* [Starting and stopping the Docker Compose project](#starting-and-stopping-the-project)
* [Targets and profiles](#targets-and-profiles)
* [Implementation details](#implementation-details)
* [Troubleshooting](#troubleshooting)

Expand All @@ -199,19 +199,41 @@ as environment variables or command line arguments, as necessary.

### Starting and stopping the project

To start and stop the project, `DockerComposeController` simply invokes `docker compose up` or
To start and stop the project, `DockerComposeController` invokes `docker compose up` or
`docker compose down` as appropriate. However, to allow multiple CLP packages to be run on the same
host, we explicitly specify a project name for the project, where the name is based on the package's
instance ID.

### Deployment Types

CLP supports two deployment types determined by the `package.query_engine` configuration setting.

1. **BASE**: For deployments using [Presto][presto-integration] as the query engine. This deployment
only uses `docker-compose.base.yaml`.
2. **FULL**: For deployments using one of CLP's native query engines. This uses both
`docker-compose.base.yaml` and `docker-compose.yaml`.
### Targets and profiles

`start-clp.py` exposes a `--target` flag that determines which services are launched on the current
host. If no target is specified, the controller starts the full stack (`docker compose up` using
`docker-compose.yaml`) and, when MCP is configured, enables the `mcp` profile so the MCP server is
included.

For targeted launches, the controller switches to `docker-compose.base.yaml`, activates the relevant
profile, and, for single-service targets, specifies the service name with `--no-deps`. This allows
operators to run CLP across multiple hosts without bringing down existing containers:

| Target | Profile(s) | Services started |
|----------------------|------------|----------------------------------------------------|
| `controller` | `controller` | Core databases, schedulers, garbage collector |
| `ui` | `ui` | `webui` (no dependencies) |
| `mcp` | `mcp` | `mcp-server` (no dependencies) |
| `compression-worker` | `worker` | `compression-worker` (no dependencies) |
| `query-worker` | `worker` | `query-worker` (no dependencies) |
| `reducer` | `worker` | `reducer` (no dependencies) |

`--num-workers` can be supplied with the worker and reducer targets to override the default process
count inside those containers. The CLI can be invoked repeatedly to scale the deployment: each call
adds the requested services to the existing Compose project without interrupting running containers.
`stop-clp.py` continues to tear down the entire project regardless of the target that was used to
start individual services.

When the package's query engine is set to [Presto][presto-integration], legacy search components are
disabled automatically via the `CLP_ENABLE_LEGACY_SEARCH` environment variable. This prevents the
query scheduler, query workers, and reducer from starting unless they are explicitly requested when
legacy search is enabled.

### Implementation details

Expand Down
Loading
Loading