From 45baa37e2774f149825b59897eac34b7d10564cb Mon Sep 17 00:00:00 2001
From: Rakhman Asmatullayev
Date: Fri, 5 Dec 2025 18:11:19 +0000
Subject: [PATCH 1/4] ClickHouse routing for client-hosted data plane

---
 app-server/Cargo.lock                       |  40 ++++
 app-server/Cargo.toml                       |   1 +
 app-server/src/api/v1/sql.rs                |  46 ++++-
 app-server/src/db/projects.rs               |  50 +++++
 app-server/src/main.rs                      |   8 +
 app-server/src/sql/mod.rs                   | 175 +++++++++++++++---
 app-server/src/traces/consumer.rs           | 133 +++++++++++--
 app-server/src/utils/mod.rs                 |  84 +++++++++
 .../db/migrations/0062_deployment_mode.sql  |   2 +
 9 files changed, 494 insertions(+), 45 deletions(-)
 create mode 100644 frontend/lib/db/migrations/0062_deployment_mode.sql

diff --git a/app-server/Cargo.lock b/app-server/Cargo.lock
index 205d16152..02b7f2861 100644
--- a/app-server/Cargo.lock
+++ b/app-server/Cargo.lock
@@ -393,6 +393,7 @@ dependencies = [
  "futures-util",
  "indexmap",
  "itertools 0.14.0",
+ "jsonwebtoken",
  "lapin",
  "log",
  "moka",
@@ -2289,8 +2290,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
 dependencies = [
  "cfg-if",
+ "js-sys",
  "libc",
  "wasi 0.11.1+wasi-snapshot-preview1",
+ "wasm-bindgen",
 ]
 
 [[package]]
@@ -2897,6 +2900,21 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "jsonwebtoken"
+version = "9.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde"
+dependencies = [
+ "base64 0.22.1",
+ "js-sys",
+ "pem",
+ "ring",
+ "serde",
+ "serde_json",
+ "simple_asn1",
+]
+
 [[package]]
 name = "keccak"
 version = "0.1.5"
@@ -3486,6 +3504,16 @@ dependencies = [
  "hmac",
 ]
 
+[[package]]
+name = "pem"
+version = "3.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be"
+dependencies = [
+ "base64 0.22.1",
+ "serde_core",
+]
+
 [[package]]
 name = "pem-rfc7468"
 version = "0.7.0"
@@ -4645,6 +4673,18 @@ dependencies = [
  "rand_core 0.6.4",
 ]
 
+[[package]]
+name = "simple_asn1"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb"
+dependencies = [
+ "num-bigint",
+ "num-traits",
+ "thiserror 2.0.16",
+ "time",
+]
+
 [[package]]
 name = "slab"
 version = "0.4.11"
diff --git a/app-server/Cargo.toml b/app-server/Cargo.toml
index 167fa4335..64a4356ab 100644
--- a/app-server/Cargo.toml
+++ b/app-server/Cargo.toml
@@ -26,6 +26,7 @@ enum_dispatch = "0.3.13"
 futures-util = "0.3"
 indexmap = {version = "2.11.4", features = ["serde"]}
 itertools = "0.14.0"
+jsonwebtoken = "9"
 lapin = "3.0.0"
 log = "0.4.28"
 moka = {version = "0.12.10", features = ["sync", "future"]}
diff --git a/app-server/src/api/v1/sql.rs b/app-server/src/api/v1/sql.rs
index 30bedcfeb..e47fdbbca 100644
--- a/app-server/src/api/v1/sql.rs
+++ b/app-server/src/api/v1/sql.rs
@@ -8,7 +8,11 @@ use opentelemetry::{
 use serde::{Deserialize, Serialize};
 
 use crate::{
-    db::project_api_keys::ProjectApiKey,
+    db::{
+        DB,
+        project_api_keys::ProjectApiKey,
+        projects::{DeploymentMode, get_workspace_by_project_id},
+    },
     query_engine::QueryEngine,
     sql::{self, ClickhouseReadonlyClient},
 };
@@ -31,8 +35,10 @@ pub struct SqlQueryResponse {
 pub async fn execute_sql_query(
     req: web::Json<SqlQueryRequest>,
     project_api_key: ProjectApiKey,
+    db: web::Data<DB>,
     clickhouse_ro: web::Data<Option<Arc<ClickhouseReadonlyClient>>>,
     query_engine: web::Data<Arc<QueryEngine>>,
+    http_client: web::Data<Arc<reqwest::Client>>,
 ) -> ResponseResult {
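+    // Illustrative request/response bodies for this endpoint (hypothetical
+    // values; the shapes follow the camelCase serde structs above):
+    //   request:  {"query": "SELECT trace_id FROM spans LIMIT 5"}
+    //   response: {"data": [{"trace_id": "..."}]}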
     let project_id = project_api_key.project_id;
     let SqlQueryRequest { query } = req.into_inner();
@@ -41,13 +47,40 @@ pub async fn execute_sql_query(
     let span = tracer.start("api_sql_query");
     let _guard = mark_span_as_active(span);
 
-    match clickhouse_ro.as_ref() {
-        Some(ro_client) => {
-            match sql::execute_sql_query(
+    // Fetch workspace info for routing
+    let workspace = get_workspace_by_project_id(&db.pool, &project_id)
+        .await
+        .map_err(|e| anyhow::anyhow!("Failed to get workspace: {}", e))?;
+
+    match workspace.deployment_mode {
+        DeploymentMode::CLOUD | DeploymentMode::SELF_HOST => match clickhouse_ro.as_ref() {
+            Some(ro_client) => {
+                match sql::execute_sql_query(
+                    query,
+                    project_id,
+                    HashMap::new(),
+                    ro_client.clone(),
+                    query_engine.into_inner().as_ref().clone(),
+                )
+                .await
+                {
+                    Ok(result_json) => {
+                        Ok(HttpResponse::Ok().json(SqlQueryResponse { data: result_json }))
+                    }
+                    Err(e) => Err(e.into()),
+                }
+            }
+            None => Err(anyhow::anyhow!("ClickHouse read-only client is not configured.").into()),
+        },
+        DeploymentMode::HYBRID => {
+            match sql::execute_sql_query_on_data_plane(
                 query,
                 project_id,
-                HashMap::new(),
-                ro_client.clone(),
+                workspace.id,
+                workspace
+                    .data_plane_url
+                    .ok_or_else(|| anyhow::anyhow!("Data plane URL is not set"))?,
+                http_client.into_inner().as_ref().clone(),
                 query_engine.into_inner().as_ref().clone(),
             )
             .await
@@ -58,6 +91,5 @@ pub async fn execute_sql_query(
                 Err(e) => Err(e.into()),
             }
         }
-        None => Err(anyhow::anyhow!("ClickHouse read-only client is not configured.").into()),
     }
 }
diff --git a/app-server/src/db/projects.rs b/app-server/src/db/projects.rs
index da9f4c29c..be7424d93 100644
--- a/app-server/src/db/projects.rs
+++ b/app-server/src/db/projects.rs
@@ -4,6 +4,16 @@ use serde::{Deserialize, Serialize};
 use sqlx::{FromRow, PgPool};
 use uuid::Uuid;
 
+#[derive(sqlx::Type, Deserialize, Serialize, PartialEq, Clone, Debug, Default)]
+#[sqlx(type_name = "deployment_mode")]
+pub enum DeploymentMode {
+    #[default]
+    CLOUD,
+    HYBRID,
+    #[allow(non_camel_case_types)]
+    SELF_HOST,
+}
+
 #[derive(Deserialize, Serialize, FromRow, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct ProjectWithWorkspaceBillingInfo {
@@ -50,3 +60,43 @@ pub async fn get_project_and_workspace_billing_info(
 
     Ok(result)
 }
+
+#[derive(Deserialize, Serialize, FromRow, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct Workspace {
+    pub id: Uuid,
+    pub created_at: DateTime<Utc>,
+    pub name: String,
+    pub tier_id: i64,
+    pub subscription_id: Option<String>,
+    pub additional_seats: i64,
+    pub reset_time: DateTime<Utc>,
+    pub deployment_mode: DeploymentMode,
+    pub data_plane_url: Option<String>,
+}
+
+pub async fn get_workspace_by_project_id(pool: &PgPool, project_id: &Uuid) -> Result<Workspace> {
+    let result = sqlx::query_as::<_, Workspace>(
+        "
+    SELECT
+        workspaces.id,
+        workspaces.created_at,
+        workspaces.name,
+        workspaces.tier_id,
+        workspaces.subscription_id,
+        workspaces.additional_seats,
+        workspaces.reset_time,
+        workspaces.deployment_mode,
+        workspaces.data_plane_url
+    FROM
+        workspaces
+    JOIN projects ON projects.workspace_id = workspaces.id
+    WHERE
+        projects.id = $1",
+    )
+    .bind(project_id)
+    .fetch_one(pool)
+    .await?;
+
+    Ok(result)
+}
diff --git a/app-server/src/main.rs b/app-server/src/main.rs
index 93674c5be..a4a27a30c 100644
--- a/app-server/src/main.rs
+++ b/app-server/src/main.rs
@@ -612,9 +612,13 @@ fn main() -> anyhow::Result<()> {
         }
     };
 
+    // == HTTP client ==
+    let http_client = Arc::new(reqwest::Client::new());
+
     let clickhouse_for_http = clickhouse.clone();
     let storage_for_http =
storage.clone(); let sse_connections_for_http = sse_connections.clone(); + let http_client_for_http = http_client.clone(); if !enable_producer() && !enable_consumer() { log::error!( @@ -732,6 +736,7 @@ fn main() -> anyhow::Result<()> { let storage_for_consumer = storage.clone(); let quickwit_client_for_consumer = quickwit_client.clone(); let pubsub_for_consumer = pubsub.clone(); + let http_client_for_consumer = http_client.clone(); let consumer_handle = thread::Builder::new() .name("consumer".to_string()) .spawn(move || { @@ -761,6 +766,7 @@ fn main() -> anyhow::Result<()> { let ch_clone = clickhouse_for_consumer.clone(); let storage_clone = storage_for_consumer.clone(); let pubsub_clone = pubsub_for_consumer.clone(); + let http_client_clone = http_client_for_consumer.clone(); tokio::spawn(async move { let _handle = worker_handle; // Keep handle alive for the worker's lifetime @@ -771,6 +777,7 @@ fn main() -> anyhow::Result<()> { ch_clone, storage_clone, pubsub_clone, + http_client_clone, ) .await; }); @@ -936,6 +943,7 @@ fn main() -> anyhow::Result<()> { .app_data(web::Data::new(query_engine.clone())) .app_data(web::Data::new(sse_connections_for_http.clone())) .app_data(web::Data::new(quickwit_client.clone())) + .app_data(web::Data::new(http_client_for_http.clone())) // Ingestion endpoints allow both default and ingest-only keys .service( web::scope("/v1/browser-sessions").service( diff --git a/app-server/src/sql/mod.rs b/app-server/src/sql/mod.rs index 75e4c968e..cc89a2d4b 100644 --- a/app-server/src/sql/mod.rs +++ b/app-server/src/sql/mod.rs @@ -5,7 +5,7 @@ use opentelemetry::{ trace::{Span, Tracer}, }; use regex::Regex; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use std::{ collections::HashMap, @@ -14,7 +14,10 @@ use std::{ }; use uuid::Uuid; -use crate::query_engine::{QueryEngine, QueryEngineTrait, QueryEngineValidationResult}; +use crate::{ + query_engine::{QueryEngine, QueryEngineTrait, QueryEngineValidationResult}, + utils::generate_data_plane_jwt, +}; pub struct ClickhouseReadonlyClient(clickhouse::Client); @@ -93,32 +96,15 @@ pub async fn execute_sql_query( ) -> Result, SqlQueryError> { let tracer = global::tracer("app-server"); - let mut span = tracer.start("call_query_engine"); - span.set_attribute(KeyValue::new("sql.query", query.clone())); - span.set_attribute(KeyValue::new("project_id", project_id.to_string())); - let validation_result = query_engine.validate_query(query, project_id).await; - - let validated_query = match validation_result { - Ok(QueryEngineValidationResult::Success { validated_query }) => validated_query, - Ok(QueryEngineValidationResult::Error { error }) => { - span.record_error(&std::io::Error::new( - std::io::ErrorKind::Other, - error.clone(), - )); - span.end(); - return Err(SqlQueryError::ValidationError(error)); - } + // Validate query first + let validated_query = match validate_query(query, project_id, query_engine).await { + Ok(validated_query) => validated_query, Err(e) => { - span.record_error(&std::io::Error::new( - std::io::ErrorKind::Other, - e.to_string(), - )); - span.end(); - return Err(SqlQueryError::ValidationError(e.to_string())); + return Err(e); } }; - span.end(); + // Execute query let mut span = tracer.start("execute_sql_query"); span.set_attribute(KeyValue::new("sql.query", validated_query.clone())); span.set_attribute(KeyValue::new("project_id", project_id.to_string())); @@ -240,3 +226,144 @@ fn remove_query_from_error_message(error_message: &str) -> String { let without_settings = 
SETTING_REGEX.replace_all(&error_message, "").to_string(); VERSION_REGEX.replace_all(&without_settings, "").to_string() } + +#[derive(Serialize)] +struct DataPlaneReadRequest { + query: String, +} + +pub async fn execute_sql_query_on_data_plane( + query: String, + project_id: Uuid, + workspace_id: Uuid, + data_plane_url: String, + http_client: Arc, + query_engine: Arc, +) -> Result, SqlQueryError> { + let tracer = global::tracer("app-server"); + + // Validate query first + let validated_query = match validate_query(query, project_id, query_engine).await { + Ok(validated_query) => validated_query, + Err(e) => { + return Err(e); + } + }; + + // Execute on data plane + let mut span = tracer.start("execute_data_plane_sql_query"); + span.set_attribute(KeyValue::new("sql.query", validated_query.clone())); + span.set_attribute(KeyValue::new("data_plane_url", data_plane_url.clone())); + + // Generate JWT token + let jwt_token = generate_data_plane_jwt(workspace_id).map_err(|e| { + span.record_error(&std::io::Error::new(std::io::ErrorKind::Other, e.clone())); + span.end(); + SqlQueryError::InternalError(format!("Failed to generate JWT: {}", e)) + })?; + + let request_body = DataPlaneReadRequest { + query: validated_query, + }; + + // Send http request to data plane + let response = http_client + .post(format!("{}/clickhouse/read", data_plane_url)) + .header("Authorization", format!("Bearer {}", jwt_token)) + .header("Content-Type", "application/json") + .json(&request_body) + .send() + .await + .map_err(|e| { + span.record_error(&e); + span.end(); + SqlQueryError::InternalError(format!("Failed to send request to data plane: {}", e)) + })?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response.text().await.unwrap_or_default(); + span.record_error(&std::io::Error::new( + std::io::ErrorKind::Other, + error_text.clone(), + )); + span.end(); + return Err(SqlQueryError::InternalError(format!( + "Data plane returned error {}: {}", + status, error_text + ))); + } + + let data = response.bytes().await.map_err(|e| { + span.record_error(&e); + span.end(); + SqlQueryError::InternalError(format!("Failed to read response: {}", e)) + })?; + span.set_attribute(KeyValue::new("sql.response_bytes", data.len() as i64)); + span.end(); + + // Parse response + let mut processing_span = tracer.start("process_query_response"); + + let results: Value = serde_json::from_slice(&data).map_err(|e| { + log::error!("Failed to parse data plane response as JSON: {}", e); + processing_span.record_error(&e); + processing_span.end(); + SqlQueryError::InternalError(e.to_string()) + })?; + + let data_array = results + .get("data") + .ok_or_else(|| { + let msg = "Response missing 'data' field".to_string(); + processing_span.record_error(&SqlQueryError::InternalError(msg.clone())); + processing_span.end(); + SqlQueryError::InternalError(msg) + })? 
+ .as_array() + .ok_or_else(|| { + let msg = "Response 'data' field is not an array".to_string(); + processing_span.record_error(&SqlQueryError::InternalError(msg.clone())); + processing_span.end(); + SqlQueryError::InternalError(msg) + })?; + + processing_span.end(); + Ok(data_array.clone()) +} + +async fn validate_query( + query: String, + project_id: Uuid, + query_engine: Arc, +) -> Result { + let tracer = global::tracer("app-server"); + let mut span = tracer.start("validate_sql_query"); + span.set_attribute(KeyValue::new("sql.query", query.clone())); + span.set_attribute(KeyValue::new("project_id", project_id.to_string())); + + let validation_result = query_engine.validate_query(query, project_id).await; + + let validated_query = match validation_result { + Ok(QueryEngineValidationResult::Success { validated_query }) => validated_query, + Ok(QueryEngineValidationResult::Error { error }) => { + span.record_error(&std::io::Error::new( + std::io::ErrorKind::Other, + error.clone(), + )); + span.end(); + return Err(SqlQueryError::ValidationError(error)); + } + Err(e) => { + span.record_error(&std::io::Error::new( + std::io::ErrorKind::Other, + e.to_string(), + )); + span.end(); + return Err(SqlQueryError::ValidationError(e.to_string())); + } + }; + span.end(); + + Ok(validated_query) +} diff --git a/app-server/src/traces/consumer.rs b/app-server/src/traces/consumer.rs index 3257038ad..ed5b0cc9b 100644 --- a/app-server/src/traces/consumer.rs +++ b/app-server/src/traces/consumer.rs @@ -7,6 +7,7 @@ use futures_util::future::join_all; use itertools::Itertools; use opentelemetry::trace::FutureExt; use rayon::prelude::*; +use serde::Serialize; use serde_json::Value; use tracing::instrument; use uuid::Uuid; @@ -28,6 +29,7 @@ use crate::{ db::{ DB, events::Event, + projects::{DeploymentMode, get_workspace_by_project_id}, spans::Span, tags::{SpanTag, TagSource}, trace::upsert_trace_statistics_batch, @@ -51,6 +53,12 @@ use crate::{ }, }; +#[derive(Serialize)] +struct DataPlaneWriteRequest<'a> { + table: &'static str, + data: &'a [CHSpan], +} + pub async fn process_queue_spans( db: Arc, cache: Arc, @@ -58,6 +66,7 @@ pub async fn process_queue_spans( clickhouse: clickhouse::Client, storage: Arc, pubsub: Arc, + http_client: Arc, ) { loop { inner_process_queue_spans( @@ -67,6 +76,7 @@ pub async fn process_queue_spans( clickhouse.clone(), storage.clone(), pubsub.clone(), + http_client.clone(), ) .await; log::warn!("Span listener exited. Rebinding queue conneciton..."); @@ -80,6 +90,7 @@ async fn inner_process_queue_spans( clickhouse: clickhouse::Client, storage: Arc, pubsub: Arc, + http_client: Arc, ) { // Add retry logic with exponential backoff for connection failures let get_receiver = || async { @@ -142,6 +153,7 @@ async fn inner_process_queue_spans( acker, queue.clone(), pubsub.clone(), + http_client.clone(), ) .await; } @@ -149,7 +161,17 @@ async fn inner_process_queue_spans( log::warn!("Queue closed connection. 
Shutting down span listener"); } -#[instrument(skip(messages, db, clickhouse, cache, storage, acker, queue, pubsub))] +#[instrument(skip( + messages, + db, + clickhouse, + cache, + storage, + acker, + queue, + pubsub, + http_client +))] async fn process_spans_and_events_batch( messages: Vec, db: Arc, @@ -159,6 +181,7 @@ async fn process_spans_and_events_batch( acker: MessageQueueAcker, queue: Arc, pubsub: Arc, + http_client: Arc, ) { let mut all_spans = Vec::new(); let mut all_events = Vec::new(); @@ -223,6 +246,7 @@ async fn process_spans_and_events_batch( acker, queue, pubsub, + http_client, ) .with_current_context() .await; @@ -246,6 +270,7 @@ struct StrippedSpan { acker, queue, pubsub, + http_client, ))] async fn process_batch( mut spans: Vec, @@ -257,6 +282,7 @@ async fn process_batch( acker: MessageQueueAcker, queue: Arc, pubsub: Arc, + http_client: Arc, ) { let mut span_usage_vec = Vec::new(); let mut all_events = Vec::new(); @@ -357,19 +383,98 @@ async fn process_batch( }) .collect(); - if let Err(e) = ch::spans::insert_spans_batch(clickhouse.clone(), &ch_spans).await { - log::error!( - "Failed to record {} spans to clickhouse: {:?}", - ch_spans.len(), - e - ); - let _ = acker.reject(false).await.map_err(|e| { - log::error!( - "[Write to Clickhouse] Failed to reject MQ delivery (batch): {:?}", - e - ); - }); - return; + // TODO: add project id -> deployment mode to cache [?] + let (workspace_id, deployment_mode, data_plane_url) = + match get_workspace_by_project_id(&db.pool, &project_id).await { + Ok(workspace) => ( + workspace.id, + workspace.deployment_mode, + workspace.data_plane_url, + ), + Err(e) => { + log::error!("Failed to get workspace by project id: {:?}", e); + return; + } + }; + + match deployment_mode { + DeploymentMode::CLOUD | DeploymentMode::SELF_HOST => { + if let Err(e) = ch::spans::insert_spans_batch(clickhouse.clone(), &ch_spans).await { + log::error!( + "Failed to record {} spans to clickhouse: {:?}", + ch_spans.len(), + e + ); + let _ = acker.reject(false).await.map_err(|e| { + log::error!( + "[Write to Clickhouse] Failed to reject MQ delivery (batch): {:?}", + e + ); + }); + return; + } + } + DeploymentMode::HYBRID => { + // Route to data plane + let Some(data_plane_url) = data_plane_url else { + log::error!( + "HYBRID deployment mode requires data_plane_url to be set for project {}", + project_id + ); + let _ = acker.reject(false).await.map_err(|e| { + log::error!("[HYBRID] Failed to reject MQ delivery (batch): {:?}", e); + }); + return; + }; + + let jwt_token = match crate::utils::generate_data_plane_jwt(workspace_id) { + Ok(token) => token, + Err(e) => { + log::error!("Failed to generate data plane JWT: {}", e); + let _ = acker.reject(false).await.map_err(|e| { + log::error!("[HYBRID] Failed to reject MQ delivery (batch): {:?}", e); + }); + return; + } + }; + + let request_body = DataPlaneWriteRequest { + table: "spans", + data: &ch_spans, + }; + + let response = http_client + .post(format!("{}/clickhouse/write", data_plane_url)) + .header("Authorization", format!("Bearer {}", jwt_token)) + .header("Content-Type", "application/json") + .json(&request_body) + .send() + .await; + + match response { + Ok(resp) if resp.status().is_success() => { + // Success - continue with the rest of the processing + } + Ok(resp) => { + log::error!( + "Data plane returned non-success status {}: {:?}", + resp.status(), + resp.text().await.unwrap_or_default() + ); + let _ = acker.reject(false).await.map_err(|e| { + log::error!("[HYBRID] Failed to reject MQ delivery (batch): 
{:?}", e);
+                    });
+                    return;
+                }
+                Err(e) => {
+                    log::error!("Failed to send spans to data plane: {:?}", e);
+                    let _ = acker.reject(false).await.map_err(|e| {
+                        log::error!("[HYBRID] Failed to reject MQ delivery (batch): {:?}", e);
+                    });
+                    return;
+                }
+            }
+        }
     }
 
     // Send realtime span updates directly to SSE connections after successful ClickHouse writes
diff --git a/app-server/src/utils/mod.rs b/app-server/src/utils/mod.rs
index bee7eb866..9785c8660 100644
--- a/app-server/src/utils/mod.rs
+++ b/app-server/src/utils/mod.rs
@@ -1,4 +1,88 @@
+use std::sync::OnceLock;
+use std::time::Duration;
+
+use jsonwebtoken::{Algorithm, EncodingKey, Header, encode};
+use moka::sync::Cache;
+use serde::{Deserialize, Serialize};
 use serde_json::Value;
+use uuid::Uuid;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DataPlaneClaims {
+    /// Issuer
+    pub iss: String,
+    /// Subject (workspace_id)
+    pub sub: String,
+    /// Issued at (Unix timestamp)
+    pub iat: i64,
+    /// Expiration (Unix timestamp)
+    pub exp: i64,
+}
+
+/// JWT expiration time in seconds (5 minutes)
+const JWT_EXPIRATION_SECS: i64 = 300;
+
+/// Cache TTL - refresh token when 80% of lifetime has passed (4 minutes)
+const JWT_CACHE_TTL_SECS: u64 = 240;
+
+/// Cached encoding key - parsed once from PEM on first use
+static ENCODING_KEY: OnceLock<Result<EncodingKey, String>> = OnceLock::new();
+
+/// Cache of JWT tokens per workspace_id
+static JWT_CACHE: OnceLock<Cache<Uuid, String>> = OnceLock::new();
+
+fn get_encoding_key() -> Result<&'static EncodingKey, String> {
+    ENCODING_KEY
+        .get_or_init(|| {
+            let private_key_pem = std::env::var("DATA_PLANE_PRIVATE_KEY")
+                .map_err(|_| "DATA_PLANE_PRIVATE_KEY environment variable not set".to_string())?;
+
+            EncodingKey::from_rsa_pem(private_key_pem.as_bytes())
+                .map_err(|e| format!("Invalid RSA private key: {}", e))
+        })
+        .as_ref()
+        .map_err(|e| e.clone())
+}
+
+fn get_jwt_cache() -> &'static Cache<Uuid, String> {
+    JWT_CACHE.get_or_init(|| {
+        Cache::builder()
+            .time_to_live(Duration::from_secs(JWT_CACHE_TTL_SECS))
+            .max_capacity(10_000)
+            .build()
+    })
+}
+
+/// Generate a JWT token for data plane authentication.
+/// Uses RS256 algorithm with a private key from environment variable.
+/// Tokens are cached per workspace_id and reused until near expiration.
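+///
+/// Illustrative claims payload (values are hypothetical; the field set and
+/// the `iss` value come from `DataPlaneClaims` above):
+/// `{"iss": "laminar", "sub": "<workspace uuid>", "iat": 1733420000, "exp": 1733420300}`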
+pub fn generate_data_plane_jwt(workspace_id: Uuid) -> Result<String, String> {
+    let cache = get_jwt_cache();
+
+    // Return cached token if available
+    if let Some(token) = cache.get(&workspace_id) {
+        return Ok(token);
+    }
+
+    // Generate new token
+    let key = get_encoding_key()?;
+
+    let now = chrono::Utc::now().timestamp();
+    let claims = DataPlaneClaims {
+        iss: "laminar".to_string(),
+        sub: workspace_id.to_string(),
+        iat: now,
+        exp: now + JWT_EXPIRATION_SECS,
+    };
+
+    let token = encode(&Header::new(Algorithm::RS256), &claims, key)
+        .map_err(|e| format!("Failed to encode JWT: {}", e))?;
+
+    // Cache the token
+    cache.insert(workspace_id, token.clone());
+
+    Ok(token)
+}
 
 pub fn json_value_to_string(v: &Value) -> String {
     match v {
diff --git a/frontend/lib/db/migrations/0062_deployment_mode.sql b/frontend/lib/db/migrations/0062_deployment_mode.sql
new file mode 100644
index 000000000..d22f62886
--- /dev/null
+++ b/frontend/lib/db/migrations/0062_deployment_mode.sql
@@ -0,0 +1,2 @@
+ALTER TABLE workspaces ADD COLUMN deployment_mode TEXT NOT NULL DEFAULT 'CLOUD';
+ALTER TABLE workspaces ADD COLUMN data_plane_url TEXT;

From 61bc59f34ba1a7ae220386961d08a818be82dce5 Mon Sep 17 00:00:00 2001
From: Rakhman Asmatullayev
Date: Sat, 6 Dec 2025 00:03:51 +0000
Subject: [PATCH 2/4] chore: error handling, sql type

---
 app-server/src/db/projects.rs     | 2 +-
 app-server/src/traces/consumer.rs | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/app-server/src/db/projects.rs b/app-server/src/db/projects.rs
index be7424d93..bf9501e16 100644
--- a/app-server/src/db/projects.rs
+++ b/app-server/src/db/projects.rs
@@ -5,7 +5,7 @@ use sqlx::{FromRow, PgPool};
 use uuid::Uuid;
 
 #[derive(sqlx::Type, Deserialize, Serialize, PartialEq, Clone, Debug, Default)]
-#[sqlx(type_name = "deployment_mode")]
+#[sqlx(type_name = "TEXT", rename_all = "SCREAMING_SNAKE_CASE")]
 pub enum DeploymentMode {
     #[default]
     CLOUD,
diff --git a/app-server/src/traces/consumer.rs b/app-server/src/traces/consumer.rs
index ed5b0cc9b..14c18b32e 100644
--- a/app-server/src/traces/consumer.rs
+++ b/app-server/src/traces/consumer.rs
@@ -393,6 +393,12 @@ async fn process_batch(
             ),
             Err(e) => {
                 log::error!("Failed to get workspace by project id: {:?}", e);
+                let _ = acker.reject(false).await.map_err(|e| {
+                    log::error!(
+                        "[Get workspace] Failed to reject MQ delivery (batch): {:?}",
+                        e
+                    );
+                });
                 return;
             }
         };

From f5fbafebc9f65936b7a4ea032894911ebf60c427 Mon Sep 17 00:00:00 2001
From: Rakhman Asmatullayev
Date: Mon, 8 Dec 2025 16:16:29 +0000
Subject: [PATCH 3/4] move data plane logic into another dir, redo auth

---
 app-server/Cargo.lock                       |   1 +
 app-server/Cargo.toml                       |   1 +
 app-server/src/api/v1/sql.rs                |  83 ++----
 app-server/src/data_plane/auth.rs           |  97 +++++++
 app-server/src/data_plane/data_processor.rs | 281 ++++++++++++++++++++
 app-server/src/data_plane/mod.rs            |   4 +
 app-server/src/db/projects.rs               |   2 -
 app-server/src/main.rs                      |   1 +
 app-server/src/sql/mod.rs                   | 115 +-------
 app-server/src/traces/consumer.rs           | 119 +--------
 app-server/src/utils/mod.rs                 |  84 ------
 11 files changed, 422 insertions(+), 366 deletions(-)
 create mode 100644 app-server/src/data_plane/auth.rs
 create mode 100644 app-server/src/data_plane/data_processor.rs
 create mode 100644 app-server/src/data_plane/mod.rs

diff --git a/app-server/Cargo.lock b/app-server/Cargo.lock
index 02b7f2861..821e3e69c 100644
--- a/app-server/Cargo.lock
+++ b/app-server/Cargo.lock
@@ -391,6 +391,7 @@ dependencies = [
  "enum_delegate",
  "enum_dispatch",
  "futures-util",
+ "hex",
  "indexmap",
  "itertools 0.14.0",
"jsonwebtoken", diff --git a/app-server/Cargo.toml b/app-server/Cargo.toml index 64a4356ab..e644da815 100644 --- a/app-server/Cargo.toml +++ b/app-server/Cargo.toml @@ -24,6 +24,7 @@ dotenv = "0.15" enum_delegate = "0.2.0" enum_dispatch = "0.3.13" futures-util = "0.3" +hex = "0.4" indexmap = {version = "2.11.4", features = ["serde"]} itertools = "0.14.0" jsonwebtoken = "9" diff --git a/app-server/src/api/v1/sql.rs b/app-server/src/api/v1/sql.rs index e47fdbbca..419b8cffa 100644 --- a/app-server/src/api/v1/sql.rs +++ b/app-server/src/api/v1/sql.rs @@ -1,24 +1,16 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use actix_web::{HttpResponse, post, web}; -use opentelemetry::{ - global, - trace::{Tracer, mark_span_as_active}, -}; use serde::{Deserialize, Serialize}; use crate::{ - db::{ - DB, - project_api_keys::ProjectApiKey, - projects::{DeploymentMode, get_workspace_by_project_id}, - }, + data_plane, + db::{DB, project_api_keys::ProjectApiKey}, query_engine::QueryEngine, - sql::{self, ClickhouseReadonlyClient}, + routes::types::ResponseResult, + sql::ClickhouseReadonlyClient, }; -use crate::routes::types::ResponseResult; - #[derive(Deserialize)] #[serde(rename_all = "camelCase")] pub struct SqlQueryRequest { @@ -43,53 +35,22 @@ pub async fn execute_sql_query( let project_id = project_api_key.project_id; let SqlQueryRequest { query } = req.into_inner(); - let tracer = global::tracer("tracer"); - let span = tracer.start("api_sql_query"); - let _guard = mark_span_as_active(span); - - // Fetch workspace info for routing - let workspace = get_workspace_by_project_id(&db.pool, &project_id) - .await - .map_err(|e| anyhow::anyhow!("Failed to get workspace: {}", e))?; - - match workspace.deployment_mode { - DeploymentMode::CLOUD | DeploymentMode::SELF_HOST => match clickhouse_ro.as_ref() { - Some(ro_client) => { - match sql::execute_sql_query( - query, - project_id, - HashMap::new(), - ro_client.clone(), - query_engine.into_inner().as_ref().clone(), - ) - .await - { - Ok(result_json) => { - Ok(HttpResponse::Ok().json(SqlQueryResponse { data: result_json })) - } - Err(e) => Err(e.into()), - } - } - None => Err(anyhow::anyhow!("ClickHouse read-only client is not configured.").into()), - }, - DeploymentMode::HYBRID => { - match sql::execute_sql_query_on_data_plane( - query, - project_id, - workspace.id, - workspace - .data_plane_url - .ok_or_else(|| anyhow::anyhow!("Data plane URL is not set"))?, - http_client.into_inner().as_ref().clone(), - query_engine.into_inner().as_ref().clone(), - ) - .await - { - Ok(result_json) => { - Ok(HttpResponse::Ok().json(SqlQueryResponse { data: result_json })) - } - Err(e) => Err(e.into()), - } + let clickhouse = match clickhouse_ro.as_ref() { + Some(client) => client.clone(), + None => { + return Err(anyhow::anyhow!("ClickHouse read-only client is not configured.").into()); } - } + }; + + let data = data_plane::read( + &db.pool, + clickhouse, + http_client.get_ref().clone(), + query_engine.get_ref().clone(), + project_id, + query, + ) + .await?; + + Ok(HttpResponse::Ok().json(SqlQueryResponse { data })) } diff --git a/app-server/src/data_plane/auth.rs b/app-server/src/data_plane/auth.rs new file mode 100644 index 000000000..2d5a43638 --- /dev/null +++ b/app-server/src/data_plane/auth.rs @@ -0,0 +1,97 @@ +//! Authentication for data plane communication. +//! +//! Uses Ed25519 signatures (asymmetric) for token generation. +//! Token format: `base64(payload).base64(signature)` +//! +//! Payload: `workspace_id:issued_at:expires_at` +//! +//! 
Generate keys:
+//! ```bash
+//! # Generate keypair and print hex-encoded keys
+//! cargo run --bin generate-keys # or use the generate_keypair() function
+//! ```
+
+use std::sync::OnceLock;
+use std::time::Duration;
+
+use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
+use moka::sync::Cache;
+use sodiumoxide::crypto::sign;
+use uuid::Uuid;
+
+/// Token expiration time in seconds (15 minutes)
+const TOKEN_EXPIRATION_SECS: i64 = 900;
+
+/// Cache TTL - refresh token when 80% of lifetime has passed (12 minutes)
+const TOKEN_CACHE_TTL_SECS: u64 = 720;
+
+/// Cached signing key - parsed once on first use
+static SIGNING_KEY: OnceLock<Result<sign::SecretKey, String>> = OnceLock::new();
+
+/// Cache of tokens per workspace_id
+static TOKEN_CACHE: OnceLock<Cache<Uuid, String>> = OnceLock::new();
+
+fn get_signing_key() -> Result<&'static sign::SecretKey, String> {
+    SIGNING_KEY
+        .get_or_init(|| {
+            let key_hex = std::env::var("DATA_PLANE_SIGNING_KEY")
+                .map_err(|_| "DATA_PLANE_SIGNING_KEY environment variable not set".to_string())?;
+
+            let key_bytes = hex::decode(&key_hex)
+                .map_err(|e| format!("Invalid hex in DATA_PLANE_SIGNING_KEY: {}", e))?;
+
+            sign::SecretKey::from_slice(&key_bytes)
+                .ok_or_else(|| "Invalid Ed25519 secret key (expected 64 bytes)".to_string())
+        })
+        .as_ref()
+        .map_err(|e| e.clone())
+}
+
+fn get_token_cache() -> &'static Cache<Uuid, String> {
+    TOKEN_CACHE.get_or_init(|| {
+        Cache::builder()
+            .time_to_live(Duration::from_secs(TOKEN_CACHE_TTL_SECS))
+            .max_capacity(10_000)
+            .build()
+    })
+}
+
+/// Generate a signed token for data plane authentication.
+///
+/// Uses Ed25519 signatures with a private key from environment variable.
+/// Tokens are cached per workspace_id and reused until near expiration.
+///
+/// Token format: `base64url(payload).base64url(signature)`
+/// Payload format: `workspace_id:issued_at:expires_at`
+pub fn generate_auth_token(workspace_id: Uuid) -> Result<String, String> {
+    let cache = get_token_cache();
+
+    // Return cached token if available
+    if let Some(token) = cache.get(&workspace_id) {
+        return Ok(token);
+    }
+
+    let signing_key = get_signing_key()?;
+
+    let now = chrono::Utc::now().timestamp();
+    let expires_at = now + TOKEN_EXPIRATION_SECS;
+
+    // Create payload: workspace_id:issued_at:expires_at
+    let payload: String = format!("{}:{}:{}", workspace_id, now, expires_at);
+    let payload_bytes = payload.as_bytes();
+
+    // Sign the payload
+    let signature = sign::sign_detached(payload_bytes, signing_key);
+
+    // Encode as base64url: payload.signature
+    let token = format!(
+        "{}.{}",
+        URL_SAFE_NO_PAD.encode(payload_bytes),
+        URL_SAFE_NO_PAD.encode(signature.as_ref())
+    );
+
+    // Cache the token
+    cache.insert(workspace_id, token.clone());
+
+    Ok(token)
+}
diff --git a/app-server/src/data_plane/data_processor.rs b/app-server/src/data_plane/data_processor.rs
new file mode 100644
index 000000000..1b631a1fb
--- /dev/null
+++ b/app-server/src/data_plane/data_processor.rs
@@ -0,0 +1,281 @@
+//! Data processor handles routing reads/writes to the appropriate backend
+//! based on the workspace's deployment mode (CLOUD vs HYBRID).
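+//!
+//! Illustrative usage (a sketch: the calls match `write_spans` and `read`
+//! below, but the argument bindings and the query string are hypothetical):
+//! ```ignore
+//! data_plane::write_spans(&db.pool, &clickhouse, &http_client, project_id, &ch_spans).await?;
+//! let rows = data_plane::read(&db.pool, clickhouse_ro, http_client, query_engine,
+//!     project_id, "SELECT span_id FROM spans LIMIT 1".to_string()).await?;
+//! ```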
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::OnceLock;
+use std::time::Duration;
+
+use anyhow::{Result, anyhow};
+use moka::future::Cache;
+use opentelemetry::{
+    KeyValue, global,
+    trace::{Span, Tracer},
+};
+use serde::Serialize;
+use serde_json::Value;
+use sqlx::PgPool;
+use uuid::Uuid;
+
+use crate::query_engine::QueryEngine;
+use crate::sql::{ClickhouseReadonlyClient, SqlQueryError, execute_sql_query, validate_query};
+use crate::{
+    ch::{self, spans::CHSpan},
+    db::projects::{DeploymentMode, get_workspace_by_project_id},
+};
+
+use super::auth::generate_auth_token;
+
+const WORKSPACE_CONFIG_CACHE_TTL_SECS: u64 = 60 * 60; // 1 hour
+
+static WORKSPACE_CONFIG_CACHE: OnceLock<Cache<Uuid, WorkspaceConfig>> = OnceLock::new();
+
+fn get_cache() -> &'static Cache<Uuid, WorkspaceConfig> {
+    WORKSPACE_CONFIG_CACHE.get_or_init(|| {
+        Cache::builder()
+            .time_to_live(Duration::from_secs(WORKSPACE_CONFIG_CACHE_TTL_SECS))
+            .build()
+    })
+}
+
+#[derive(Clone, Debug)]
+struct WorkspaceConfig {
+    workspace_id: Uuid,
+    deployment_mode: DeploymentMode,
+    data_plane_url: Option<String>,
+}
+
+async fn get_workspace_config(pool: &PgPool, project_id: Uuid) -> Result<WorkspaceConfig> {
+    let cache = get_cache();
+
+    if let Some(config) = cache.get(&project_id).await {
+        return Ok(config);
+    }
+
+    let workspace = get_workspace_by_project_id(pool, &project_id).await?;
+
+    let config = WorkspaceConfig {
+        workspace_id: workspace.id,
+        deployment_mode: workspace.deployment_mode,
+        data_plane_url: workspace.data_plane_url,
+    };
+
+    cache.insert(project_id, config.clone()).await;
+
+    Ok(config)
+}
+
+#[derive(Serialize)]
+struct DataPlaneWriteRequest<'a> {
+    table: &'static str,
+    data: &'a [CHSpan],
+}
+
+#[derive(Serialize)]
+struct DataPlaneReadRequest {
+    query: String,
+}
+
+pub async fn write_spans(
+    pool: &PgPool,
+    clickhouse: &clickhouse::Client,
+    http_client: &reqwest::Client,
+    project_id: Uuid,
+    spans: &[CHSpan],
+) -> Result<()> {
+    if spans.is_empty() {
+        return Ok(());
+    }
+
+    let config = get_workspace_config(pool, project_id).await?;
+
+    match config.deployment_mode {
+        DeploymentMode::CLOUD => write_spans_to_clickhouse(clickhouse, spans).await,
+        DeploymentMode::HYBRID => {
+            write_spans_to_data_plane(http_client, project_id, &config, spans).await
+        }
+    }
+}
+
+async fn write_spans_to_clickhouse(
+    clickhouse: &clickhouse::Client,
+    spans: &[CHSpan],
+) -> Result<()> {
+    ch::spans::insert_spans_batch(clickhouse.clone(), spans).await
+}
+
+async fn write_spans_to_data_plane(
+    http_client: &reqwest::Client,
+    project_id: Uuid,
+    config: &WorkspaceConfig,
+    spans: &[CHSpan],
+) -> Result<()> {
+    let data_plane_url = config.data_plane_url.as_ref().ok_or_else(|| {
+        anyhow!(
+            "HYBRID deployment requires data_plane_url for project {}",
+            project_id
+        )
+    })?;
+
+    let auth_token = generate_auth_token(config.workspace_id)
+        .map_err(|e| anyhow!("Failed to generate auth token: {}", e))?;
+
+    let response = http_client
+        .post(format!("{}/clickhouse/write", data_plane_url))
+        .header("Authorization", format!("Bearer {}", auth_token))
+        .header("Content-Type", "application/json")
+        .json(&DataPlaneWriteRequest {
+            table: "spans",
+            data: spans,
+        })
+        .send()
+        .await?;
+
+    if response.status().is_success() {
+        Ok(())
+    } else {
+        Err(anyhow!(
+            "Data plane returned {}: {}",
+            response.status(),
+            response.text().await.unwrap_or_default()
+        ))
+    }
+}
+
+pub async fn read(
+    pool: &PgPool,
+    clickhouse_ro: Arc<ClickhouseReadonlyClient>,
+    http_client: Arc<reqwest::Client>,
+    query_engine: Arc<QueryEngine>,
+    project_id: Uuid,
+    query: String,
+) -> Result<Vec<Value>> {
+    let config = get_workspace_config(pool, project_id).await?;
+
+    match config.deployment_mode {
+        DeploymentMode::CLOUD => {
+            read_from_clickhouse(clickhouse_ro, query_engine, project_id, query).await
+        }
+        DeploymentMode::HYBRID => {
+            read_from_data_plane(&http_client, query_engine, project_id, &config, query).await
+        }
+    }
+}
+
+async fn read_from_clickhouse(
+    clickhouse_ro: Arc<ClickhouseReadonlyClient>,
+    query_engine: Arc<QueryEngine>,
+    project_id: Uuid,
+    query: String,
+) -> Result<Vec<Value>> {
+    let results = execute_sql_query(
+        query,
+        project_id,
+        HashMap::new(),
+        clickhouse_ro,
+        query_engine,
+    )
+    .await
+    .map_err(|e| anyhow!("Failed to execute query: {}", e))?;
+
+    Ok(results)
+}
+
+async fn read_from_data_plane(
+    http_client: &reqwest::Client,
+    query_engine: Arc<QueryEngine>,
+    project_id: Uuid,
+    config: &WorkspaceConfig,
+    query: String,
+) -> Result<Vec<Value>> {
+    let tracer = global::tracer("app-server");
+
+    // Validate query first
+    // TODO: move this function inside read() function above after all execute_sql_query() calls are done via data processor
+    let validated_query = match validate_query(query, project_id, query_engine).await {
+        Ok(validated_query) => validated_query,
+        Err(e) => {
+            return Err(e.into());
+        }
+    };
+
+    let data_plane_url = config.data_plane_url.as_ref().ok_or_else(|| {
+        anyhow!(
+            "HYBRID deployment requires data_plane_url for project {}",
+            project_id
+        )
+    })?;
+
+    let auth_token = generate_auth_token(config.workspace_id)
+        .map_err(|e| anyhow!("Failed to generate auth token: {}", e))?;
+
+    let mut span = tracer.start("execute_data_plane_sql_query");
+    span.set_attribute(KeyValue::new("sql.query", validated_query.clone()));
+    span.set_attribute(KeyValue::new("data_plane_url", data_plane_url.clone()));
+
+    let response = http_client
+        .post(format!("{}/clickhouse/read", data_plane_url))
+        .header("Authorization", format!("Bearer {}", auth_token))
+        .header("Content-Type", "application/json")
+        .json(&DataPlaneReadRequest {
+            query: validated_query,
+        })
+        .send()
+        .await
+        .map_err(|e| {
+            span.record_error(&e);
+            span.end();
+            SqlQueryError::InternalError(format!("Failed to send request to data plane: {}", e))
+        })?;
+
+    if !response.status().is_success() {
+        let status = response.status();
+        let error_text = response.text().await.unwrap_or_default();
+        span.record_error(&std::io::Error::new(
+            std::io::ErrorKind::Other,
+            error_text.clone(),
+        ));
+        span.end();
+        return Err(anyhow!(
+            "Data plane returned error {}: {}",
+            status,
+            error_text
+        ));
+    }
+
+    let data = response.bytes().await.map_err(|e| {
+        span.record_error(&e);
+        span.end();
+        SqlQueryError::InternalError(format!("Failed to read response: {}", e))
+    })?;
+    span.set_attribute(KeyValue::new("sql.response_bytes", data.len() as i64));
+    span.end();
+
+    let mut processing_span = tracer.start("process_query_response");
+
+    let results: Value = serde_json::from_slice(&data).map_err(|e| {
+        log::error!("Failed to parse data plane response as JSON: {}", e);
+        processing_span.record_error(&e);
+        processing_span.end();
+        SqlQueryError::InternalError(e.to_string())
+    })?;
+
+    let data_array = results
+        .get("data")
+        .ok_or_else(|| {
+            let msg = "Response missing 'data' field".to_string();
+            processing_span.record_error(&SqlQueryError::InternalError(msg.clone()));
+            processing_span.end();
+            SqlQueryError::InternalError(msg)
+        })?
+ .as_array() + .ok_or_else(|| { + let msg = "Response 'data' field is not an array".to_string(); + processing_span.record_error(&SqlQueryError::InternalError(msg.clone())); + processing_span.end(); + SqlQueryError::InternalError(msg) + })?; + + processing_span.end(); + Ok(data_array.clone()) +} diff --git a/app-server/src/data_plane/mod.rs b/app-server/src/data_plane/mod.rs new file mode 100644 index 000000000..00461f02e --- /dev/null +++ b/app-server/src/data_plane/mod.rs @@ -0,0 +1,4 @@ +pub mod auth; +pub mod data_processor; + +pub use data_processor::{read, write_spans}; diff --git a/app-server/src/db/projects.rs b/app-server/src/db/projects.rs index bf9501e16..4b0184003 100644 --- a/app-server/src/db/projects.rs +++ b/app-server/src/db/projects.rs @@ -10,8 +10,6 @@ pub enum DeploymentMode { #[default] CLOUD, HYBRID, - #[allow(non_camel_case_types)] - SELF_HOST, } #[derive(Deserialize, Serialize, FromRow, Clone)] diff --git a/app-server/src/main.rs b/app-server/src/main.rs index a4a27a30c..f6e20ed4d 100644 --- a/app-server/src/main.rs +++ b/app-server/src/main.rs @@ -65,6 +65,7 @@ mod auth; mod browser_events; mod cache; mod ch; +mod data_plane; mod datasets; mod db; mod evaluations; diff --git a/app-server/src/sql/mod.rs b/app-server/src/sql/mod.rs index cc89a2d4b..09562070f 100644 --- a/app-server/src/sql/mod.rs +++ b/app-server/src/sql/mod.rs @@ -1,11 +1,10 @@ pub mod queries; - use opentelemetry::{ KeyValue, global, trace::{Span, Tracer}, }; use regex::Regex; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use serde_json::{Value, json}; use std::{ collections::HashMap, @@ -14,10 +13,7 @@ use std::{ }; use uuid::Uuid; -use crate::{ - query_engine::{QueryEngine, QueryEngineTrait, QueryEngineValidationResult}, - utils::generate_data_plane_jwt, -}; +use crate::query_engine::{QueryEngine, QueryEngineTrait, QueryEngineValidationResult}; pub struct ClickhouseReadonlyClient(clickhouse::Client); @@ -227,112 +223,7 @@ fn remove_query_from_error_message(error_message: &str) -> String { VERSION_REGEX.replace_all(&without_settings, "").to_string() } -#[derive(Serialize)] -struct DataPlaneReadRequest { - query: String, -} - -pub async fn execute_sql_query_on_data_plane( - query: String, - project_id: Uuid, - workspace_id: Uuid, - data_plane_url: String, - http_client: Arc, - query_engine: Arc, -) -> Result, SqlQueryError> { - let tracer = global::tracer("app-server"); - - // Validate query first - let validated_query = match validate_query(query, project_id, query_engine).await { - Ok(validated_query) => validated_query, - Err(e) => { - return Err(e); - } - }; - - // Execute on data plane - let mut span = tracer.start("execute_data_plane_sql_query"); - span.set_attribute(KeyValue::new("sql.query", validated_query.clone())); - span.set_attribute(KeyValue::new("data_plane_url", data_plane_url.clone())); - - // Generate JWT token - let jwt_token = generate_data_plane_jwt(workspace_id).map_err(|e| { - span.record_error(&std::io::Error::new(std::io::ErrorKind::Other, e.clone())); - span.end(); - SqlQueryError::InternalError(format!("Failed to generate JWT: {}", e)) - })?; - - let request_body = DataPlaneReadRequest { - query: validated_query, - }; - - // Send http request to data plane - let response = http_client - .post(format!("{}/clickhouse/read", data_plane_url)) - .header("Authorization", format!("Bearer {}", jwt_token)) - .header("Content-Type", "application/json") - .json(&request_body) - .send() - .await - .map_err(|e| { - span.record_error(&e); - span.end(); - 
SqlQueryError::InternalError(format!("Failed to send request to data plane: {}", e)) - })?; - - if !response.status().is_success() { - let status = response.status(); - let error_text = response.text().await.unwrap_or_default(); - span.record_error(&std::io::Error::new( - std::io::ErrorKind::Other, - error_text.clone(), - )); - span.end(); - return Err(SqlQueryError::InternalError(format!( - "Data plane returned error {}: {}", - status, error_text - ))); - } - - let data = response.bytes().await.map_err(|e| { - span.record_error(&e); - span.end(); - SqlQueryError::InternalError(format!("Failed to read response: {}", e)) - })?; - span.set_attribute(KeyValue::new("sql.response_bytes", data.len() as i64)); - span.end(); - - // Parse response - let mut processing_span = tracer.start("process_query_response"); - - let results: Value = serde_json::from_slice(&data).map_err(|e| { - log::error!("Failed to parse data plane response as JSON: {}", e); - processing_span.record_error(&e); - processing_span.end(); - SqlQueryError::InternalError(e.to_string()) - })?; - - let data_array = results - .get("data") - .ok_or_else(|| { - let msg = "Response missing 'data' field".to_string(); - processing_span.record_error(&SqlQueryError::InternalError(msg.clone())); - processing_span.end(); - SqlQueryError::InternalError(msg) - })? - .as_array() - .ok_or_else(|| { - let msg = "Response 'data' field is not an array".to_string(); - processing_span.record_error(&SqlQueryError::InternalError(msg.clone())); - processing_span.end(); - SqlQueryError::InternalError(msg) - })?; - - processing_span.end(); - Ok(data_array.clone()) -} - -async fn validate_query( +pub async fn validate_query( query: String, project_id: Uuid, query_engine: Arc, diff --git a/app-server/src/traces/consumer.rs b/app-server/src/traces/consumer.rs index 14c18b32e..a451c2831 100644 --- a/app-server/src/traces/consumer.rs +++ b/app-server/src/traces/consumer.rs @@ -7,7 +7,6 @@ use futures_util::future::join_all; use itertools::Itertools; use opentelemetry::trace::FutureExt; use rayon::prelude::*; -use serde::Serialize; use serde_json::Value; use tracing::instrument; use uuid::Uuid; @@ -22,14 +21,13 @@ use crate::{ api::v1::traces::RabbitMqSpanMessage, cache::Cache, ch::{ - self, spans::CHSpan, traces::{CHTrace, TraceAggregation, upsert_traces_batch}, }, + data_plane, db::{ DB, events::Event, - projects::{DeploymentMode, get_workspace_by_project_id}, spans::Span, tags::{SpanTag, TagSource}, trace::upsert_trace_statistics_batch, @@ -53,12 +51,6 @@ use crate::{ }, }; -#[derive(Serialize)] -struct DataPlaneWriteRequest<'a> { - table: &'static str, - data: &'a [CHSpan], -} - pub async fn process_queue_spans( db: Arc, cache: Arc, @@ -383,104 +375,17 @@ async fn process_batch( }) .collect(); - // TODO: add project id -> deployment mode to cache [?] 
- let (workspace_id, deployment_mode, data_plane_url) = - match get_workspace_by_project_id(&db.pool, &project_id).await { - Ok(workspace) => ( - workspace.id, - workspace.deployment_mode, - workspace.data_plane_url, - ), - Err(e) => { - log::error!("Failed to get workspace by project id: {:?}", e); - let _ = acker.reject(false).await.map_err(|e| { - log::error!( - "[Get workspace] Failed to reject MQ delivery (batch): {:?}", - e - ); - }); - return; - } - }; - - match deployment_mode { - DeploymentMode::CLOUD | DeploymentMode::SELF_HOST => { - if let Err(e) = ch::spans::insert_spans_batch(clickhouse.clone(), &ch_spans).await { - log::error!( - "Failed to record {} spans to clickhouse: {:?}", - ch_spans.len(), - e - ); - let _ = acker.reject(false).await.map_err(|e| { - log::error!( - "[Write to Clickhouse] Failed to reject MQ delivery (batch): {:?}", - e - ); - }); - return; - } - } - DeploymentMode::HYBRID => { - // Route to data plane - let Some(data_plane_url) = data_plane_url else { - log::error!( - "HYBRID deployment mode requires data_plane_url to be set for project {}", - project_id - ); - let _ = acker.reject(false).await.map_err(|e| { - log::error!("[HYBRID] Failed to reject MQ delivery (batch): {:?}", e); - }); - return; - }; - - let jwt_token = match crate::utils::generate_data_plane_jwt(workspace_id) { - Ok(token) => token, - Err(e) => { - log::error!("Failed to generate data plane JWT: {}", e); - let _ = acker.reject(false).await.map_err(|e| { - log::error!("[HYBRID] Failed to reject MQ delivery (batch): {:?}", e); - }); - return; - } - }; - - let request_body = DataPlaneWriteRequest { - table: "spans", - data: &ch_spans, - }; - - let response = http_client - .post(format!("{}/clickhouse/write", data_plane_url)) - .header("Authorization", format!("Bearer {}", jwt_token)) - .header("Content-Type", "application/json") - .json(&request_body) - .send() - .await; - - match response { - Ok(resp) if resp.status().is_success() => { - // Success - continue with the rest of the processing - } - Ok(resp) => { - log::error!( - "Data plane returned non-success status {}: {:?}", - resp.status(), - resp.text().await.unwrap_or_default() - ); - let _ = acker.reject(false).await.map_err(|e| { - log::error!("[HYBRID] Failed to reject MQ delivery (batch): {:?}", e); - }); - return; - } - Err(e) => { - log::error!("Failed to send spans to data plane: {:?}", e); - let _ = acker.reject(false).await.map_err(|e| { - log::error!("[HYBRID] Failed to reject MQ delivery (batch): {:?}", e); - }); - return; - } - } - } + if let Err(e) = + data_plane::write_spans(&db.pool, &clickhouse, &http_client, project_id, &ch_spans).await + { + log::error!("Failed to write spans: {:?}", e); + let _ = acker.reject(false).await.map_err(|e| { + log::error!( + "[Write to data plane] Failed to reject MQ delivery (batch): {:?}", + e + ); + }); + return; } // Send realtime span updates directly to SSE connections after successful ClickHouse writes diff --git a/app-server/src/utils/mod.rs b/app-server/src/utils/mod.rs index 9785c8660..bee7eb866 100644 --- a/app-server/src/utils/mod.rs +++ b/app-server/src/utils/mod.rs @@ -1,88 +1,4 @@ -use std::sync::OnceLock; -use std::time::Duration; - -use jsonwebtoken::{Algorithm, EncodingKey, Header, encode}; -use moka::sync::Cache; -use serde::{Deserialize, Serialize}; use serde_json::Value; -use uuid::Uuid; - -#[derive(Debug, Serialize, Deserialize)] -pub struct DataPlaneClaims { - /// Issuer - pub iss: String, - /// Subject (workspace_id) - pub sub: String, - /// Issued at (Unix 
timestamp) - pub iat: i64, - /// Expiration (Unix timestamp) - pub exp: i64, -} - -/// JWT expiration time in seconds (5 minutes) -const JWT_EXPIRATION_SECS: i64 = 300; - -/// Cache TTL - refresh token when 80% of lifetime has passed (4 minutes) -const JWT_CACHE_TTL_SECS: u64 = 240; - -/// Cached encoding key - parsed once from PEM on first use -static ENCODING_KEY: OnceLock> = OnceLock::new(); - -/// Cache of JWT tokens per workspace_id -static JWT_CACHE: OnceLock> = OnceLock::new(); - -fn get_encoding_key() -> Result<&'static EncodingKey, String> { - ENCODING_KEY - .get_or_init(|| { - let private_key_pem = std::env::var("DATA_PLANE_PRIVATE_KEY") - .map_err(|_| "DATA_PLANE_PRIVATE_KEY environment variable not set".to_string())?; - - EncodingKey::from_rsa_pem(private_key_pem.as_bytes()) - .map_err(|e| format!("Invalid RSA private key: {}", e)) - }) - .as_ref() - .map_err(|e| e.clone()) -} - -fn get_jwt_cache() -> &'static Cache { - JWT_CACHE.get_or_init(|| { - Cache::builder() - .time_to_live(Duration::from_secs(JWT_CACHE_TTL_SECS)) - .max_capacity(10_000) - .build() - }) -} - -/// Generate a JWT token for data plane authentication. -/// Uses RS256 algorithm with a private key from environment variable. -/// Tokens are cached per workspace_id and reused until near expiration. -pub fn generate_data_plane_jwt(workspace_id: Uuid) -> Result { - let cache = get_jwt_cache(); - - // Return cached token if available - if let Some(token) = cache.get(&workspace_id) { - return Ok(token); - } - - // Generate new token - let key = get_encoding_key()?; - - let now = chrono::Utc::now().timestamp(); - let claims = DataPlaneClaims { - iss: "laminar".to_string(), - sub: workspace_id.to_string(), - iat: now, - exp: now + JWT_EXPIRATION_SECS, - }; - - let token = encode(&Header::new(Algorithm::RS256), &claims, key) - .map_err(|e| format!("Failed to encode JWT: {}", e))?; - - // Cache the token - cache.insert(workspace_id, token.clone()); - - Ok(token) -} pub fn json_value_to_string(v: &Value) -> String { match v { From 480f84ee17e1e833e2e9541447d8106245825639 Mon Sep 17 00:00:00 2001 From: Rakhman Asmatullayev Date: Mon, 8 Dec 2025 16:31:31 +0000 Subject: [PATCH 4/4] generic write to data plane payload --- app-server/src/data_plane/data_processor.rs | 24 +++++++++++++++++---- app-server/src/data_plane/mod.rs | 2 +- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/app-server/src/data_plane/data_processor.rs b/app-server/src/data_plane/data_processor.rs index 1b631a1fb..f89c1e579 100644 --- a/app-server/src/data_plane/data_processor.rs +++ b/app-server/src/data_plane/data_processor.rs @@ -65,10 +65,26 @@ async fn get_workspace_config(pool: &PgPool, project_id: Uuid) -> Result { + #[serde(skip_serializing_if = "Option::is_none")] + pub spans: Option<&'a [CHSpan]>, + // Add more fields here as needed (e.g., events, traces) +} + #[derive(Serialize)] struct DataPlaneWriteRequest<'a> { - table: &'static str, - data: &'a [CHSpan], + table: Table, + data: WriteData<'a>, } #[derive(Serialize)] @@ -125,8 +141,8 @@ async fn write_spans_to_data_plane( .header("Authorization", format!("Bearer {}", auth_token)) .header("Content-Type", "application/json") .json(&DataPlaneWriteRequest { - table: "spans", - data: spans, + table: Table::Spans, + data: WriteData { spans: Some(spans) }, }) .send() .await?; diff --git a/app-server/src/data_plane/mod.rs b/app-server/src/data_plane/mod.rs index 00461f02e..3606e5e23 100644 --- a/app-server/src/data_plane/mod.rs +++ b/app-server/src/data_plane/mod.rs @@ -1,4 +1,4 @@ pub 
mod auth; pub mod data_processor; -pub use data_processor::{read, write_spans}; +pub use data_processor::{Table, WriteData, read, write_spans};
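
A note on verification: a data plane endpoint would check these tokens with the Ed25519 public key matching DATA_PLANE_SIGNING_KEY. The sketch below is illustrative only; the data plane service is not part of this series, so the function name and error handling here are assumptions. The token and payload formats are the ones documented in data_plane/auth.rs.

use base64::{Engine, engine::general_purpose::URL_SAFE_NO_PAD};
use sodiumoxide::crypto::sign;

/// Returns the authenticated workspace id on success (hypothetical verifier).
fn verify_auth_token(token: &str, public_key: &sign::PublicKey) -> Result<String, String> {
    // Token format (per auth.rs): base64url(payload).base64url(signature)
    let (payload_b64, signature_b64) = token
        .split_once('.')
        .ok_or_else(|| "malformed token".to_string())?;

    let payload = URL_SAFE_NO_PAD
        .decode(payload_b64)
        .map_err(|e| format!("bad payload encoding: {}", e))?;
    let signature_bytes = URL_SAFE_NO_PAD
        .decode(signature_b64)
        .map_err(|e| format!("bad signature encoding: {}", e))?;
    let signature = sign::Signature::from_slice(&signature_bytes)
        .ok_or_else(|| "invalid signature length".to_string())?;

    if !sign::verify_detached(&signature, &payload, public_key) {
        return Err("signature verification failed".to_string());
    }

    // Payload format (per auth.rs): workspace_id:issued_at:expires_at
    let payload = String::from_utf8(payload).map_err(|e| e.to_string())?;
    let mut parts = payload.splitn(3, ':');
    let workspace_id = parts.next().ok_or("missing workspace_id")?.to_string();
    let _issued_at = parts.next().ok_or("missing issued_at")?;
    let expires_at: i64 = parts
        .next()
        .ok_or("missing expires_at")?
        .parse()
        .map_err(|_| "bad expires_at".to_string())?;

    if chrono::Utc::now().timestamp() > expires_at {
        return Err("token expired".to_string());
    }

    Ok(workspace_id)
}

Expiry is enforced on the receiving side from the signed payload itself, since the sender's token cache TTL only controls reuse, not validity.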