Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/actions/run-on-image/action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ runs:
- run: >-
docker run
--user $(id -u):$(id -g)
--env npm_config_cache=/tmp/.npm
--volume "$GITHUB_WORKSPACE":/mnt/repo
--workdir /mnt/repo
${{steps.get_image_props.outputs.qualified_image_name}}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/clp-artifact-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ jobs:
|| (github.event_name != 'pull_request' && github.ref == 'refs/heads/main')}}
run_command: >-
CLP_CPP_MAX_PARALLELISM_PER_BUILD_TASK=$(getconf _NPROCESSORS_ONLN)
HOME=/tmp
task package-build-deps

- uses: "./.github/actions/clp-build-runtime-image"
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion components/api-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ futures = "0.3.31"
mongodb = "3.3.0"
num_enum = "0.7.5"
rmp-serde = "1.3.0"
secrecy = "0.10.3"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
sqlx = { version = "0.8.6", features = ["runtime-tokio", "mysql"] }
thiserror = "2.0.17"
tokio = { version = "1.48.0", features = ["full"] }
tracing = "0.1.41"
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3.20", features = ["json", "env-filter"] }
tracing-subscriber = { version = "0.3.20", features = ["json", "env-filter", "fmt", "std"] }
87 changes: 60 additions & 27 deletions components/api-server/src/bin/api_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,81 @@ use clp_rust_utils::{clp_config::package, serde::yaml};
use futures::{Stream, StreamExt};
use thiserror::Error;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{self};
use tracing_subscriber::{self, fmt::writer::MakeWriterExt};

#[derive(Parser)]
#[command(version, about = "API Server for CLP.")]
struct Args {}
struct Args {
#[arg(long)]
config: String,

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _ = Args::parse();
let home = std::env::var("CLP_HOME").context("Expect `CLP_HOME` env variable")?;
let home = std::path::Path::new(&home);
#[arg(long)]
host: Option<String>,

let config_path = home.join(package::DEFAULT_CONFIG_FILE_PATH);
let config: package::config::Config = yaml::from_path(&config_path).context(format!(
#[arg(long)]
port: Option<u16>,
}

fn read_config_and_credentials(
args: &Args,
) -> anyhow::Result<(package::config::Config, package::credentials::Credentials)> {
let config_path = std::path::Path::new(args.config.as_str());
let config: package::config::Config = yaml::from_path(config_path).context(format!(
"Config file {} does not exist",
config_path.display()
))?;

let file_appender = RollingFileAppender::new(
Rotation::HOURLY,
home.join(&config.logs_directory).join("api_server"),
"api_server.log",
);
let credentials = package::credentials::Credentials {
database: package::credentials::Database {
password: secrecy::SecretString::new(
std::env::var("CLP_DB_PASS")
.context("Expect `CLP_DB_PASS` env variable")?
.into_boxed_str(),
),
user: std::env::var("CLP_DB_USER").context("Expect `CLP_DB_USER` env variable")?,
},
};
Ok((config, credentials))
}

fn set_up_logging() -> anyhow::Result<()> {
let logs_directory =
std::env::var("CLP_LOGS_DIR").context("Expect `CLP_LOGS_DIR` environment variable.")?;
let logs_directory = std::path::Path::new(logs_directory.as_str());
let file_appender =
RollingFileAppender::new(Rotation::HOURLY, logs_directory, "api_server.log");
let (non_blocking_writer, _guard) = tracing_appender::non_blocking(file_appender);
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_ansi(false)
.with_writer(non_blocking_writer)
.with_writer(std::io::stdout.and(non_blocking_writer))
.init();
Ok(())
}

let credentials_path = home.join(package::DEFAULT_CREDENTIALS_FILE_PATH);
let credentials: package::credentials::Credentials = yaml::from_path(&credentials_path)
.context(format!(
"Credentials file {} does not exist",
credentials_path.display()
))?;
async fn shutdown_signal() {
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to listen for SIGTERM");
tokio::select! {
_ = sigterm.recv() => {
}
_ = tokio::signal::ctrl_c()=> {
}
}
}

let addr = format!("{}:{}", &config.api_server.host, &config.api_server.port);
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();

let (config, credentials) = read_config_and_credentials(&args)?;
set_up_logging()?;

let addr = format!(
"{}:{}",
args.host.unwrap_or_else(|| config.api_server.host.clone()),
args.port.unwrap_or(config.api_server.port)
);
let listener = tokio::net::TcpListener::bind(&addr)
.await
.context(format!("Cannot listen to {addr}"))?;
Expand All @@ -71,11 +108,7 @@ async fn main() -> anyhow::Result<()> {

tracing::info!("Server started at {addr}");
axum::serve(listener, app)
.with_graceful_shutdown(async {
tokio::signal::ctrl_c()
.await
.expect("failed to listen for event");
})
.with_graceful_shutdown(shutdown_signal())
.await?;
Ok(())
}
Expand Down
26 changes: 26 additions & 0 deletions components/clp-package-utils/clp_package_utils/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, Optional

from clp_py_utils.clp_config import (
API_SERVER_COMPONENT_NAME,
AwsAuthType,
CLPConfig,
COMPRESSION_JOBS_TABLE_NAME,
Expand Down Expand Up @@ -425,6 +426,30 @@ def _set_up_env_for_reducer(self, num_workers: int) -> EnvVarsDict:

return env_vars

def _set_up_env_for_api_server(self) -> EnvVarsDict:
"""
Sets up environment variables and directories for the API server component.

:return: Dictionary of environment variables necessary to launch the component.
"""
component_name = API_SERVER_COMPONENT_NAME

logger.info(f"Setting up environment for {component_name}...")

logs_dir = self._clp_config.logs_directory / component_name
resolved_logs_dir = resolve_host_path_in_container(logs_dir)
resolved_logs_dir.mkdir(parents=True, exist_ok=True)

env_vars = EnvVarsDict()

# Connection config
env_vars |= {
"CLP_API_SERVER_HOST": _get_ip_from_hostname(self._clp_config.api_server.host),
"CLP_API_SERVER_PORT": str(self._clp_config.api_server.port),
}

return env_vars

def _set_up_env_for_webui(self, container_clp_config: CLPConfig) -> EnvVarsDict:
"""
Sets up environment variables and settings for the Web UI component.
Expand Down Expand Up @@ -747,6 +772,7 @@ def set_up_env(self) -> None:
env_vars |= self._set_up_env_for_webui(container_clp_config)
env_vars |= self._set_up_env_for_mcp_server()
env_vars |= self._set_up_env_for_garbage_collector()
env_vars |= self._set_up_env_for_api_server()

# Write the environment variables to the `.env` file.
with open(f"{self._clp_home}/.env", "w") as env_file:
Expand Down
15 changes: 15 additions & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
WEBUI_COMPONENT_NAME = "webui"
MCP_SERVER_COMPONENT_NAME = "mcp_server"
GARBAGE_COLLECTOR_COMPONENT_NAME = "garbage_collector"
API_SERVER_COMPONENT_NAME = "api_server"

# Action names
ARCHIVE_MANAGER_ACTION_NAME = "archive_manager"
Expand Down Expand Up @@ -592,6 +593,19 @@ class GarbageCollector(BaseModel):
sweep_interval: SweepInterval = SweepInterval()


class QueryJobPollingConfig(BaseModel):
initial_backoff_ms: int = Field(default=100, alias="initial_backoff")

max_backoff_ms: int = Field(default=5000, alias="max_backoff")


class ApiServer(BaseModel):
host: DomainStr = "localhost"
port: Port = 3001
query_job_polling: QueryJobPollingConfig = QueryJobPollingConfig()
default_max_num_query_results: int = 1000


class Presto(BaseModel):
DEFAULT_PORT: ClassVar[int] = 8080

Expand Down Expand Up @@ -627,6 +641,7 @@ class CLPConfig(BaseModel):
query_worker: QueryWorker = QueryWorker()
webui: WebUi = WebUi()
garbage_collector: GarbageCollector = GarbageCollector()
api_server: ApiServer = ApiServer()
credentials_file_path: SerializablePath = CLP_DEFAULT_CREDENTIALS_FILE_PATH

mcp_server: Optional[McpServer] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ impl Default for ApiServer {
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct QueryJobPollingConfig {
#[serde(rename = "initial_backoff")]
pub initial_backoff_ms: u64,
Expand Down
9 changes: 9 additions & 0 deletions components/package-template/src/etc/clp-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@
# archive: 60
# search_result: 30
#
## API server config
#api_server:
# host: "localhost"
# port: 3001
# default_max_num_query_results: 1000
# query_job_polling:
# initial_backoff_ms: 100
# max_backoff_ms: 5000
#
## Presto client config
#presto: null
#
Expand Down
12 changes: 8 additions & 4 deletions docs/src/dev-docs/design-deployment-orchestration.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ graph LR
compression_worker["compression-worker"]
query_worker["query-worker"]
reducer["reducer"]
api_server["api-server"]
garbage_collector["garbage-collector"]
Copy link
Contributor Author

@hoophalab hoophalab Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is to make the nodes in Management & UI section follow the order api_server, garbage_collector, webui. api_server and garbage_collector are management nodes.

webui["webui"]
mcp_server["mcp-server"]

garbage_collector["garbage-collector"]

%% One-time jobs
db_table_creator["db-table-creator"]
results_cache_indices_creator["results-cache-indices-creator"]
Expand All @@ -64,6 +64,8 @@ graph LR
redis -->|healthy| query_scheduler
query_scheduler -->|healthy| reducer
results_cache_indices_creator -->|completed_successfully| reducer
db_table_creator -->|completed_successfully| api_server
results_cache_indices_creator -->|completed_successfully| api_server
db_table_creator -->|completed_successfully| webui
results_cache_indices_creator -->|completed_successfully| webui
db_table_creator -->|completed_successfully| mcp_server
Expand Down Expand Up @@ -94,9 +96,10 @@ graph LR
reducer
end

subgraph UI & Management
webui
subgraph Management & UI
api_server
garbage_collector
webui
end

subgraph AI
Expand Down Expand Up @@ -125,6 +128,7 @@ graph LR
| compression_worker | Worker processes for compression jobs |
| query_worker | Worker processes for search/aggregation jobs |
| reducer | Reducers for performing the final stages of aggregation jobs |
| api_server | API server for submitting queries |
| webui | Web server for the UI |
| mcp_server | MCP server for AI agent to access CLP functionalities |
| garbage_collector | Process to manage data retention |
Expand Down
6 changes: 6 additions & 0 deletions docs/src/user-docs/guides-multi-host.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ docker compose \
up query-scheduler \
--no-deps --wait

# Start API server
docker compose \
--project-name "clp-package-$(cat var/log/instance-id)" \
up webui \
--no-deps --wait

# Start webui
docker compose \
--project-name "clp-package-$(cat var/log/instance-id)" \
Expand Down
18 changes: 17 additions & 1 deletion taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ vars:
G_PACKAGE_BUILD_DIR: "{{.G_BUILD_DIR}}/clp-package"
G_PACKAGE_VENV_DIR: "{{.G_BUILD_DIR}}/package-venv"
G_PYTHON_LIBS_DIR: "{{.G_BUILD_DIR}}/python-libs"
G_WEBUI_BUILD_DIR: "{{.G_BUILD_DIR}}/webui"
G_RUST_BUILD_DIR: "{{.G_BUILD_DIR}}/rust"
G_SPIDER_BUILD_DIR: "{{.G_BUILD_DIR}}/spider"
G_WEBUI_BUILD_DIR: "{{.G_BUILD_DIR}}/webui"

# Taskfile paths
G_UTILS_TASKFILE: "{{.ROOT_DIR}}/tools/yscope-dev-utils/exports/taskfiles/utils/utils.yaml"
Expand Down Expand Up @@ -72,6 +73,7 @@ tasks:
- task: "clean-python-component"
vars:
COMPONENT: "job-orchestration"
- task: "clean-rust"
- task: "clean-webui"
- task: "tests:integration:cache-clear"

Expand All @@ -95,6 +97,10 @@ tasks:
FLAVOUR: "text"
STORAGE_ENGINE: "clp"

clean-rust:
cmds:
- "rm -rf '{{.G_RUST_BUILD_DIR}}'"

clean-webui:
cmds:
- "rm -rf '{{.G_WEBUI_SRC_DIR}}/client/node_modules'"
Expand Down Expand Up @@ -191,6 +197,15 @@ tasks:
JOBS: "{{.G_CPP_MAX_PARALLELISM_PER_BUILD_TASK}}"
TARGETS: ["clg", "clo", "clp", "clp-s", "indexer", "log-converter", "reducer-server"]

# NOTE: We rely on cargo for checksumming. It automatically rebuilds when source files change or
# when a binary is missing.
rust:
deps: ["toolchains:rust"]
dir: "{{.ROOT_DIR}}"
cmd: |-
. "$HOME/.cargo/env"
cargo build --release --bins --target-dir {{.G_RUST_BUILD_DIR}}

clp-mcp-server:
- task: "uv-component"
vars:
Expand All @@ -217,6 +232,7 @@ tasks:
- "deps:spider"
- "init"
- "python-libs"
- "rust"
- "webui"

webui:
Expand Down
Loading
Loading