Skip to content

Commit ef19fba

Browse files
junhaoliaoclaude
andcommitted
Merge branch 'main' into k8s-presto
Resolve conflicts: - Chart.yaml: Accept main's version (0.2.1-dev.0) - values.yaml: Keep prestoWorker section, drop storage.storageClassName (host-path PVs removed on main) - set-up-multi-{dedicated,shared}-test.sh: Combine Presto node support with main's get_image_helm_args - guides-k8s-deployment.md: Keep Presto link references - Accept deletion of all host-path PV/PVC files (query-scheduler-logs, query-worker-logs, query-worker-staged-streams, reducer-logs) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2 parents 00d7292 + 7306153 commit ef19fba

File tree

311 files changed

+9205
-3919
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

311 files changed

+9205
-3919
lines changed

Cargo.lock

Lines changed: 26 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

components/api-server/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,5 @@ thiserror = "2.0.18"
3636
tokio = { version = "1.49.0", features = ["full"] }
3737
tower-http = { version = "0.6.8", features = ["cors"] }
3838
tracing = "0.1.44"
39-
tracing-appender = "0.2.4"
40-
tracing-subscriber = { version = "0.3.22", features = ["json", "env-filter", "fmt", "std"] }
4139
utoipa = { version = "5.4.0", features = ["axum_extras"] }
4240
utoipa-axum = "0.2.0"

components/api-server/src/bin/api_server.rs

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
use anyhow::Context;
22
use clap::Parser;
33
use clp_rust_utils::{clp_config::package, serde::yaml};
4-
use tracing_appender::{
5-
non_blocking::WorkerGuard,
6-
rolling::{RollingFileAppender, Rotation},
7-
};
8-
use tracing_subscriber::{self, fmt::writer::MakeWriterExt};
94

105
#[derive(Parser)]
116
#[command(version, about = "API Server for CLP.")]
@@ -40,29 +35,6 @@ fn read_config_and_credentials(
4035
Ok((config, credentials))
4136
}
4237

43-
fn set_up_logging() -> anyhow::Result<WorkerGuard> {
44-
let logs_directory =
45-
std::env::var("CLP_LOGS_DIR").context("Expect `CLP_LOGS_DIR` environment variable.")?;
46-
let logs_directory = std::path::Path::new(logs_directory.as_str());
47-
let file_appender =
48-
RollingFileAppender::new(Rotation::HOURLY, logs_directory, "api_server.log");
49-
let (non_blocking_writer, guard) = tracing_appender::non_blocking(file_appender);
50-
tracing_subscriber::fmt()
51-
.event_format(
52-
tracing_subscriber::fmt::format()
53-
.with_level(true)
54-
.with_target(false)
55-
.with_file(true)
56-
.with_line_number(true)
57-
.json(),
58-
)
59-
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
60-
.with_ansi(false)
61-
.with_writer(std::io::stdout.and(non_blocking_writer))
62-
.init();
63-
Ok(guard)
64-
}
65-
6638
async fn shutdown_signal() {
6739
let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
6840
.expect("failed to listen for SIGTERM");
@@ -79,7 +51,7 @@ async fn main() -> anyhow::Result<()> {
7951
let args = Args::parse();
8052

8153
let (config, credentials) = read_config_and_credentials(&args)?;
82-
let _guard = set_up_logging()?;
54+
let _guard = clp_rust_utils::logging::set_up_logging("api_server.log");
8355

8456
let api_server_config = config
8557
.api_server

components/api-server/src/client.rs

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ pub struct QueryConfig {
2828
/// The search query as a KQL string.
2929
pub query_string: String,
3030

31-
/// The dataset to search within. If not provided, only `default` dataset will be searched.
31+
/// The datasets to search within. If not provided, only `default` dataset will be searched.
3232
#[serde(default)]
33-
pub dataset: Option<String>,
33+
pub datasets: Option<Vec<String>>,
3434

3535
/// The maximum number of results to return. Set to `0` for no limit.
3636
#[serde(default)]
@@ -58,7 +58,7 @@ pub struct QueryConfig {
5858
impl From<QueryConfig> for SearchJobConfig {
5959
fn from(value: QueryConfig) -> Self {
6060
Self {
61-
dataset: value.dataset,
61+
datasets: value.datasets,
6262
query_string: value.query_string,
6363
max_num_results: value.max_num_results,
6464
begin_timestamp: value.time_range_begin_millisecs,
@@ -128,10 +128,10 @@ impl Client {
128128
/// * Forwards [`sqlx::query::Query::execute`]'s return values on failure.
129129
pub async fn submit_query(&self, query_config: QueryConfig) -> Result<u64, ClientError> {
130130
let mut search_job_config: SearchJobConfig = query_config.into();
131-
if search_job_config.dataset.is_none() {
132-
search_job_config.dataset = match self.config.package.storage_engine {
131+
if search_job_config.datasets.is_none() {
132+
search_job_config.datasets = match self.config.package.storage_engine {
133133
StorageEngine::Clp => None,
134-
StorageEngine::ClpS => Some("default".to_owned()),
134+
StorageEngine::ClpS => Some(vec!["default".to_owned()]),
135135
}
136136
}
137137
if search_job_config.max_num_results == 0 {
@@ -483,6 +483,52 @@ impl Client {
483483
Ok(mapped)
484484
}
485485

486+
/// Retrieves timestamp column names for a given dataset.
487+
///
488+
/// # Returns
489+
///
490+
/// A vector of timestamp column name strings on success.
491+
///
492+
/// # Errors
493+
///
494+
/// Returns an error if:
495+
///
496+
/// * [`ClientError::InvalidDatasetName`] if the dataset name contains invalid characters.
497+
/// * [`ClientError::DatasetNotFound`] if the dataset's column metadata table doesn't exist.
498+
/// * Forwards [`sqlx::query::Query::fetch_all`]'s return values on failure.
499+
pub async fn get_timestamp_column_names(
500+
&self,
501+
dataset_name: &str,
502+
) -> Result<Vec<String>, ClientError> {
503+
// Must be kept in sync with `NodeType::Timestamp` in
504+
// `components/core/src/clp_s/SchemaTree.hpp`.
505+
const TIMESTAMP_NODE_TYPE: i8 = 14;
506+
// MySQL error number for "Table doesn't exist".
507+
const MYSQL_TABLE_NOT_FOUND: u16 = 1146;
508+
509+
if !clp_rust_utils::dataset::VALID_DATASET_NAME_REGEX.is_match(dataset_name) {
510+
return Err(ClientError::InvalidDatasetName);
511+
}
512+
let table_name = format!("clp_{dataset_name}_column_metadata");
513+
let names: Vec<String> =
514+
sqlx::query_scalar(&format!("SELECT name FROM `{table_name}` WHERE type = ?"))
515+
.bind(TIMESTAMP_NODE_TYPE)
516+
.fetch_all(&self.sql_pool)
517+
.await
518+
.map_err(|err| {
519+
if let sqlx::Error::Database(db_err) = &err
520+
&& let Some(mysql_err) =
521+
db_err.try_downcast_ref::<sqlx::mysql::MySqlDatabaseError>()
522+
&& mysql_err.number() == MYSQL_TABLE_NOT_FOUND
523+
{
524+
return ClientError::DatasetNotFound(dataset_name.to_owned());
525+
}
526+
err.into()
527+
})?;
528+
529+
Ok(names)
530+
}
531+
486532
/// # Returns
487533
///
488534
/// A reference to the API server configuration.

components/api-server/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ pub enum ClientError {
2727

2828
#[error("Search job not found: {0}")]
2929
SearchJobNotFound(u64),
30+
31+
#[error("Invalid dataset name")]
32+
InvalidDatasetName,
33+
34+
#[error("Dataset not found: {0}")]
35+
DatasetNotFound(String),
3036
}
3137

3238
/// Empty trait to mark errors that indicate malformed data.

components/api-server/src/routes.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ pub fn from_client(client: Client) -> Result<axum::Router, serde_json::Error> {
3636
.routes(routes!(query))
3737
.routes(routes!(query_results))
3838
.routes(routes!(cancel_query))
39+
.route(
40+
"/column_metadata/{dataset_name}/timestamp",
41+
get(get_timestamp_column_names),
42+
)
3943
.with_state(client)
4044
.split_for_parts();
4145
let api_json = api.to_json()?;
@@ -85,7 +89,7 @@ async fn health() -> String {
8589
content= QueryConfig,
8690
example = json!({
8791
"query_string": "*",
88-
"dataset": "default",
92+
"datasets": ["default"],
8993
"time_range_begin_millisecs": 0,
9094
"time_range_end_millisecs": 17_356_896,
9195
"ignore_case": true,
@@ -221,13 +225,33 @@ async fn cancel_query(
221225
}
222226
}
223227

228+
async fn get_timestamp_column_names(
229+
State(client): State<Client>,
230+
Path(dataset_name): Path<String>,
231+
) -> Result<Json<Vec<String>>, HandlerError> {
232+
let names = client
233+
.get_timestamp_column_names(&dataset_name)
234+
.await
235+
.map_err(|err| {
236+
tracing::error!(
237+
"Failed to get timestamp column names for dataset '{}': {:?}",
238+
dataset_name,
239+
err
240+
);
241+
HandlerError::from(err)
242+
})?;
243+
Ok(Json(names))
244+
}
245+
224246
/// Generic errors for request handlers.
225247
#[derive(Error, Debug)]
226248
enum HandlerError {
227249
#[error("Internal server error")]
228250
InternalServer,
229251
#[error("Not found")]
230252
NotFound,
253+
#[error("Bad request: {0}")]
254+
BadRequest(String),
231255
}
232256

233257
impl From<axum::Error> for HandlerError {
@@ -239,7 +263,8 @@ impl From<axum::Error> for HandlerError {
239263
impl From<ClientError> for HandlerError {
240264
fn from(err: ClientError) -> Self {
241265
match err {
242-
ClientError::SearchJobNotFound(_) => Self::NotFound,
266+
ClientError::SearchJobNotFound(_) | ClientError::DatasetNotFound(_) => Self::NotFound,
267+
ClientError::InvalidDatasetName => Self::BadRequest(format!("{err}")),
243268
_ => Self::InternalServer,
244269
}
245270
}
@@ -251,6 +276,7 @@ impl IntoResponse for HandlerError {
251276
match self {
252277
Self::NotFound => StatusCode::NOT_FOUND.into_response(),
253278
Self::InternalServer => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
279+
Self::BadRequest(msg) => (StatusCode::BAD_REQUEST, msg).into_response(),
254280
}
255281
}
256282
}

components/clp-mcp-server/clp_mcp_server/clp_connector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async def submit_query(
6060
job_config = msgpack.packb(
6161
{
6262
"begin_timestamp": begin_ts,
63-
"dataset": CLP_DEFAULT_DATASET_NAME,
63+
"datasets": [CLP_DEFAULT_DATASET_NAME],
6464
"end_timestamp": end_ts,
6565
"ignore_case": True,
6666
"max_num_results": SEARCH_MAX_NUM_RESULTS,

components/clp-mcp-server/clp_mcp_server/clp_mcp_server.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
"""CLP MCP Server entry point."""
22

33
import ipaddress
4-
import logging
5-
import os
64
import socket
75
import sys
86
from pathlib import Path
97

108
import click
119
from clp_py_utils.clp_config import ClpConfig, MCP_SERVER_COMPONENT_NAME
12-
from clp_py_utils.clp_logging import get_logger, get_logging_formatter, set_logging_level
10+
from clp_py_utils.clp_logging import configure_logging, get_logger
1311
from clp_py_utils.core import read_yaml_config_file
1412
from pydantic import ValidationError
1513

@@ -38,12 +36,8 @@ def main(host: str, port: int, config_path: Path) -> int:
3836
:param config_path: The path to server's configuration file.
3937
:return: Exit code (0 for success, non-zero for failure).
4038
"""
41-
# Setup logging to file
42-
log_file_path = Path(os.getenv("CLP_LOGS_DIR")) / "mcp_server.log"
43-
logging_file_handler = logging.FileHandler(filename=log_file_path, encoding="utf-8")
44-
logging_file_handler.setFormatter(get_logging_formatter())
45-
logger.addHandler(logging_file_handler)
46-
set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL"))
39+
# Setup optional file logging and logging level.
40+
configure_logging(logger, "mcp_server")
4741

4842
exit_code = 0
4943

components/clp-mcp-server/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "clp-mcp-server"
3-
version = "0.9.1-dev"
3+
version = "0.10.1-dev"
44
description = "MCP server for CLP"
55
authors = [{name = "YScope Inc.", email = "dev@yscope.com"}]
66
readme = "README.md"

components/clp-mcp-server/uv.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)