Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions app-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions app-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum_dispatch = "0.3.13"
futures-util = "0.3"
indexmap = {version = "2.11.4", features = ["serde"]}
itertools = "0.14.0"
jsonwebtoken = "9"
lapin = "3.0.0"
log = "0.4.28"
moka = {version = "0.12.10", features = ["sync", "future"]}
Expand Down
46 changes: 39 additions & 7 deletions app-server/src/api/v1/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use opentelemetry::{
use serde::{Deserialize, Serialize};

use crate::{
db::project_api_keys::ProjectApiKey,
db::{
DB,
project_api_keys::ProjectApiKey,
projects::{DeploymentMode, get_workspace_by_project_id},
},
query_engine::QueryEngine,
sql::{self, ClickhouseReadonlyClient},
};
Expand All @@ -31,8 +35,10 @@ pub struct SqlQueryResponse {
pub async fn execute_sql_query(
req: web::Json<SqlQueryRequest>,
project_api_key: ProjectApiKey,
db: web::Data<DB>,
clickhouse_ro: web::Data<Option<Arc<ClickhouseReadonlyClient>>>,
query_engine: web::Data<Arc<QueryEngine>>,
http_client: web::Data<Arc<reqwest::Client>>,
) -> ResponseResult {
let project_id = project_api_key.project_id;
let SqlQueryRequest { query } = req.into_inner();
Expand All @@ -41,13 +47,40 @@ pub async fn execute_sql_query(
let span = tracer.start("api_sql_query");
let _guard = mark_span_as_active(span);

match clickhouse_ro.as_ref() {
Some(ro_client) => {
match sql::execute_sql_query(
// Fetch workspace info for routing
let workspace = get_workspace_by_project_id(&db.pool, &project_id)
.await
.map_err(|e| anyhow::anyhow!("Failed to get workspace: {}", e))?;

match workspace.deployment_mode {
DeploymentMode::CLOUD | DeploymentMode::SELF_HOST => match clickhouse_ro.as_ref() {
Some(ro_client) => {
match sql::execute_sql_query(
query,
project_id,
HashMap::new(),
ro_client.clone(),
query_engine.into_inner().as_ref().clone(),
)
.await
{
Ok(result_json) => {
Ok(HttpResponse::Ok().json(SqlQueryResponse { data: result_json }))
}
Err(e) => Err(e.into()),
}
}
None => Err(anyhow::anyhow!("ClickHouse read-only client is not configured.").into()),
},
DeploymentMode::HYBRID => {
match sql::execute_sql_query_on_data_plane(
query,
project_id,
HashMap::new(),
ro_client.clone(),
workspace.id,
workspace
.data_plane_url
.ok_or_else(|| anyhow::anyhow!("Data plane URL is not set"))?,
http_client.into_inner().as_ref().clone(),
query_engine.into_inner().as_ref().clone(),
)
.await
Expand All @@ -58,6 +91,5 @@ pub async fn execute_sql_query(
Err(e) => Err(e.into()),
}
}
None => Err(anyhow::anyhow!("ClickHouse read-only client is not configured.").into()),
}
}
50 changes: 50 additions & 0 deletions app-server/src/db/projects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ use serde::{Deserialize, Serialize};
use sqlx::{FromRow, PgPool};
use uuid::Uuid;

#[derive(sqlx::Type, Deserialize, Serialize, PartialEq, Clone, Debug, Default)]
#[sqlx(type_name = "deployment_mode")]
pub enum DeploymentMode {
#[default]
CLOUD,
HYBRID,
#[allow(non_camel_case_types)]
SELF_HOST,
}

#[derive(Deserialize, Serialize, FromRow, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ProjectWithWorkspaceBillingInfo {
Expand Down Expand Up @@ -50,3 +60,43 @@ pub async fn get_project_and_workspace_billing_info(

Ok(result)
}

#[derive(Deserialize, Serialize, FromRow, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Workspace {
pub id: Uuid,
pub created_at: DateTime<Utc>,
pub name: String,
pub tier_id: i64,
pub subscription_id: Option<String>,
pub additional_seats: i64,
pub reset_time: DateTime<Utc>,
pub deployment_mode: DeploymentMode,
pub data_plane_url: Option<String>,
}

pub async fn get_workspace_by_project_id(pool: &PgPool, project_id: &Uuid) -> Result<Workspace> {
let result = sqlx::query_as::<_, Workspace>(
"
SELECT
workspaces.id,
workspaces.created_at,
workspaces.name,
workspaces.tier_id,
workspaces.subscription_id,
workspaces.additional_seats,
workspaces.reset_time,
workspaces.deployment_mode,
workspaces.data_plane_url
FROM
workspaces
JOIN projects ON projects.workspace_id = workspaces.id
WHERE
projects.id = $1",
)
.bind(project_id)
.fetch_one(pool)
.await?;

Ok(result)
}
8 changes: 8 additions & 0 deletions app-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,9 +612,13 @@ fn main() -> anyhow::Result<()> {
}
};

// == HTTP client ==
let http_client = Arc::new(reqwest::Client::new());

let clickhouse_for_http = clickhouse.clone();
let storage_for_http = storage.clone();
let sse_connections_for_http = sse_connections.clone();
let http_client_for_http = http_client.clone();

if !enable_producer() && !enable_consumer() {
log::error!(
Expand Down Expand Up @@ -732,6 +736,7 @@ fn main() -> anyhow::Result<()> {
let storage_for_consumer = storage.clone();
let quickwit_client_for_consumer = quickwit_client.clone();
let pubsub_for_consumer = pubsub.clone();
let http_client_for_consumer = http_client.clone();
let consumer_handle = thread::Builder::new()
.name("consumer".to_string())
.spawn(move || {
Expand Down Expand Up @@ -761,6 +766,7 @@ fn main() -> anyhow::Result<()> {
let ch_clone = clickhouse_for_consumer.clone();
let storage_clone = storage_for_consumer.clone();
let pubsub_clone = pubsub_for_consumer.clone();
let http_client_clone = http_client_for_consumer.clone();

tokio::spawn(async move {
let _handle = worker_handle; // Keep handle alive for the worker's lifetime
Expand All @@ -771,6 +777,7 @@ fn main() -> anyhow::Result<()> {
ch_clone,
storage_clone,
pubsub_clone,
http_client_clone,
)
.await;
});
Expand Down Expand Up @@ -936,6 +943,7 @@ fn main() -> anyhow::Result<()> {
.app_data(web::Data::new(query_engine.clone()))
.app_data(web::Data::new(sse_connections_for_http.clone()))
.app_data(web::Data::new(quickwit_client.clone()))
.app_data(web::Data::new(http_client_for_http.clone()))
// Ingestion endpoints allow both default and ingest-only keys
.service(
web::scope("/v1/browser-sessions").service(
Expand Down
Loading