diff --git a/src/cli.rs b/src/cli.rs index 82cd27d5e..aab90eb1e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -102,13 +102,6 @@ pub struct Cli { pub ms_clarity_tag: Option, - // Trino vars - pub trino_endpoint: Option, - pub trino_username: Option, - pub trino_auth: Option, - pub trino_schema: Option, - pub trino_catalog: Option, - // Kafka specific env vars pub kafka_topics: Option, pub kafka_host: Option, @@ -155,13 +148,6 @@ impl Cli { pub const MAX_DISK_USAGE: &'static str = "max-disk-usage"; pub const MS_CLARITY_TAG: &'static str = "ms-clarity-tag"; - // Trino specific env vars - pub const TRINO_ENDPOINT: &'static str = "p-trino-end-point"; - pub const TRINO_CATALOG_NAME: &'static str = "p-trino-catalog-name"; - pub const TRINO_USER_NAME: &'static str = "p-trino-user-name"; - pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization"; - pub const TRINO_SCHEMA: &'static str = "p-trino-schema"; - // Kafka specific env vars pub const KAFKA_TOPICS: &'static str = "kafka-topics"; pub const KAFKA_HOST: &'static str = "kafka-host"; @@ -252,41 +238,6 @@ impl Cli { .value_name("STRING") .help("Audit logger password"), ) - .arg( - Arg::new(Self::TRINO_ENDPOINT) - .long(Self::TRINO_ENDPOINT) - .env("P_TRINO_ENDPOINT") - .value_name("STRING") - .help("Address and port for Trino HTTP(s) server"), - ) - .arg( - Arg::new(Self::TRINO_CATALOG_NAME) - .long(Self::TRINO_CATALOG_NAME) - .env("P_TRINO_CATALOG_NAME") - .value_name("STRING") - .help("Name of the catalog to be queried (Translates to X-Trino-Catalog)"), - ) - .arg( - Arg::new(Self::TRINO_SCHEMA) - .long(Self::TRINO_SCHEMA) - .env("P_TRINO_SCHEMA") - .value_name("STRING") - .help("Name of schema to be queried (Translates to X-Trino-Schema)"), - ) - .arg( - Arg::new(Self::TRINO_USER_NAME) - .long(Self::TRINO_USER_NAME) - .env("P_TRINO_USER_NAME") - .value_name("STRING") - .help("Name of Trino user (Translates to X-Trino-User)"), - ) - .arg( - Arg::new(Self::TRINO_AUTHORIZATION) - .long(Self::TRINO_AUTHORIZATION) - .env("P_TRINO_AUTHORIZATION") - .value_name("STRING") - .help("Base 64 encoded in the format username:password"), - ) .arg( Arg::new(Self::TLS_CERT) .long(Self::TLS_CERT) @@ -551,12 +502,6 @@ impl FromArgMatches for Cli { } fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { - self.trino_catalog = m.get_one::(Self::TRINO_CATALOG_NAME).cloned(); - self.trino_endpoint = m.get_one::(Self::TRINO_ENDPOINT).cloned(); - self.trino_auth = m.get_one::(Self::TRINO_AUTHORIZATION).cloned(); - self.trino_schema = m.get_one::(Self::TRINO_SCHEMA).cloned(); - self.trino_username = m.get_one::(Self::TRINO_USER_NAME).cloned(); - self.kafka_topics = m.get_one::(Self::KAFKA_TOPICS).cloned(); self.kafka_security_protocol = m .get_one::(Self::KAFKA_SECURITY_PROTOCOL) diff --git a/src/handlers/http/about.rs b/src/handlers/http/about.rs index 40113f885..79f9c22b2 100644 --- a/src/handlers/http/about.rs +++ b/src/handlers/http/about.rs @@ -17,7 +17,7 @@ */ use actix_web::web::Json; -use serde_json::json; +use serde_json::{json, Value}; use crate::{ about::{self, get_latest_release}, @@ -45,7 +45,7 @@ use std::path::PathBuf; /// "path": store_endpoint /// } /// } -pub async fn about() -> Json { +pub async fn about() -> Json { let meta = StorageMetadata::global(); let current_release = about::current(); @@ -86,16 +86,6 @@ pub async fn about() -> Json { }; let ms_clarity_tag = &CONFIG.parseable.ms_clarity_tag; - let mut query_engine = "Parseable".to_string(); - if let (Some(_), Some(_), Some(_), Some(_)) = ( - CONFIG.parseable.trino_endpoint.as_ref(), - CONFIG.parseable.trino_catalog.as_ref(), - CONFIG.parseable.trino_schema.as_ref(), - CONFIG.parseable.trino_username.as_ref(), - ) { - // Trino is enabled - query_engine = "Trino".to_string(); - } Json(json!({ "version": current_version, @@ -119,7 +109,5 @@ pub async fn about() -> Json { "analytics": { "clarityTag": ms_clarity_tag }, - "queryEngine": query_engine - })) } diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 99ab350c1..9a95aa26d 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -42,7 +42,6 @@ pub mod oidc; pub mod query; pub mod rbac; pub mod role; -pub mod trino; pub mod users; pub const MAX_EVENT_PAYLOAD_SIZE: usize = 10485760; pub const API_BASE_PATH: &str = "api"; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 1c95bd7ee..e6b853afc 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -53,7 +53,6 @@ impl ParseableServer for QueryServer { web::scope(&base_path()) .service(Server::get_correlation_webscope()) .service(Server::get_query_factory()) - .service(Server::get_trino_factory()) .service(Server::get_liveness_factory()) .service(Server::get_readiness_factory()) .service(Server::get_about_factory()) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index e1302c66f..73301006a 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -23,7 +23,6 @@ use crate::handlers::http::about; use crate::handlers::http::base_path; use crate::handlers::http::health_check; use crate::handlers::http::query; -use crate::handlers::http::trino; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; use crate::hottier::HotTierManager; @@ -69,7 +68,6 @@ impl ParseableServer for Server { web::scope(&base_path()) .service(Self::get_correlation_webscope()) .service(Self::get_query_factory()) - .service(Self::get_trino_factory()) .service(Self::get_ingest_factory()) .service(Self::get_liveness_factory()) .service(Self::get_readiness_factory()) @@ -163,12 +161,6 @@ impl ParseableServer for Server { } impl Server { - // get the trino factory - pub fn get_trino_factory() -> Resource { - web::resource("/trinoquery") - .route(web::post().to(trino::trino_query).authorize(Action::Query)) - } - pub fn get_metrics_webscope() -> Scope { web::scope("/metrics").service( web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)), diff --git a/src/handlers/http/trino.rs b/src/handlers/http/trino.rs deleted file mode 100644 index f764842bb..000000000 --- a/src/handlers/http/trino.rs +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use std::{collections::HashMap, future::Future, pin::Pin}; - -use actix_web::{ - web::{self, Json}, - FromRequest, HttpRequest, Responder, -}; -use http::HeaderMap; -use serde_json::Value; -use tracing::warn; -use trino_response::QueryResponse; - -use crate::{ - handlers::{AUTHORIZATION_KEY, TRINO_CATALOG, TRINO_SCHEMA, TRINO_USER}, - option::CONFIG, -}; - -use super::query::QueryError; - -#[derive(Debug, serde::Deserialize, serde::Serialize, Default)] -#[serde(rename_all = "camelCase")] -pub struct QueryResultsTrino { - pub id: String, - pub next_uri: Option, - pub stats: Value, - pub error: Option, - pub warnings: Option, - pub columns: Option, - pub data: Option, -} - -/// Query Request through http endpoint. -#[derive(Debug, serde::Deserialize, serde::Serialize)] -#[serde(rename_all = "camelCase")] -pub struct TrinoQuery { - pub query: String, - #[serde(skip)] - pub fields: bool, -} - -impl FromRequest for TrinoQuery { - type Error = actix_web::Error; - type Future = Pin>>>; - - fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future { - let query = Json::::from_request(req, payload); - let params = web::Query::>::from_request(req, payload) - .into_inner() - .map(|x| x.0) - .unwrap_or_default(); - - let fut = async move { - let mut query = query.await?.into_inner(); - // format output json to include field names - query.fields = params.get("fields").cloned().unwrap_or(false); - - Ok(query) - }; - - Box::pin(fut) - } -} - -pub async fn trino_query( - _req: HttpRequest, - query_request: TrinoQuery, -) -> Result { - let sql = query_request.query; - - let (endpoint, catalog, schema, username) = - if let (Some(endpoint), Some(catalog), Some(schema), Some(username)) = ( - CONFIG.parseable.trino_endpoint.as_ref(), - CONFIG.parseable.trino_catalog.as_ref(), - CONFIG.parseable.trino_schema.as_ref(), - CONFIG.parseable.trino_username.as_ref(), - ) { - let endpoint = if endpoint.ends_with('/') { - &endpoint[0..endpoint.len() - 1] - } else { - endpoint - }; - ( - endpoint.to_string(), - catalog.to_string(), - schema.to_string(), - username.to_string(), - ) - } else { - return Err(QueryError::Anyhow(anyhow::Error::msg( - "Trino endpoint, catalog, schema, or username not set in config", - ))); - }; - let auth = &CONFIG.parseable.trino_auth; - - trino_init( - &sql, - query_request.fields, - &endpoint, - &catalog, - &schema, - &username, - auth, - ) - .await? - .to_http() -} - -pub async fn trino_get( - with_fields: bool, - query_results: QueryResultsTrino, -) -> Result { - // initial check for nextUri - if let Some(mut next_uri) = query_results.next_uri { - let mut records: Vec = Vec::new(); - let mut fields: Vec = Vec::new(); - - let client = reqwest::Client::new(); - - // loop will handle batches being sent by server - loop { - let res: QueryResultsTrino = client.get(next_uri.clone()).send().await?.json().await?; - - // check if columns and data present, collate - // if len of fields is not 0, then don't overwrite - if fields.is_empty() { - if let Some(columns) = res.columns { - columns.as_array().unwrap().iter().for_each(|row| { - let name = row - .as_object() - .unwrap() - .get("name") - .unwrap() - .as_str() - .unwrap() - .to_string(); - fields.push(name); - }); - } - } - - if let Some(data) = res.data { - if let Some(data) = data.as_array() { - data.iter().for_each(|d| records.push(d.to_owned())); - } - } - - // check if more data present - if res.next_uri.is_some() { - // more data to process - next_uri = res.next_uri.unwrap().to_string(); - } else { - // check if state is FINISHED or FAILED, then return - let state = res - .stats - .as_object() - .unwrap() - .get("state") - .unwrap() - .as_str() - .unwrap(); - - match state { - "FAILED" => { - // extract error - if res.error.is_some() { - let message = res - .error - .unwrap() - .as_object() - .unwrap() - .get("message") - .unwrap() - .to_string(); - return Err(QueryError::Anyhow(anyhow::Error::msg(message))); - } else { - return Err(QueryError::Anyhow(anyhow::Error::msg("FAILED"))); - } - } - "FINISHED" => { - // break - break; - } - _ => { - warn!("state '{state}' not covered"); - break; - } - } - } - } - - Ok(QueryResponse { - trino_records: Some(records), - fields, - with_fields, - }) - } else { - // initial check for nex_uri retuned None - // check for error messages - Err(QueryError::Anyhow(anyhow::Error::msg( - "Did not receive nexUri for initial QueryResults", - ))) - } -} - -#[allow(clippy::too_many_arguments)] -/// This is the entry point for a trino bound request -/// The first POST request will happen here and the subsequent GET requests will happen in `trino_get()` -pub async fn trino_init( - query: &str, - fields: bool, - endpoint: &str, - catalog: &str, - schema: &str, - user: &str, - auth: &Option, -) -> Result { - let mut headers = HeaderMap::new(); - headers.insert(TRINO_SCHEMA, schema.parse().unwrap()); - headers.insert(TRINO_CATALOG, catalog.parse().unwrap()); - headers.insert(TRINO_USER, user.parse().unwrap()); - - // add password if present - if let Some(auth) = auth { - headers.insert(AUTHORIZATION_KEY, format!("Basic {auth}").parse().unwrap()); - } - - let response: QueryResultsTrino = match reqwest::Client::new() - .post(format!("{endpoint}/v1/statement")) - .body(query.to_owned()) - .headers(headers) - .send() - .await - { - Ok(r) => r.json().await?, - Err(e) => return Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))), - }; - - trino_get(fields, response).await -} - -mod trino_response { - use actix_web::{web, Responder}; - use itertools::Itertools; - use serde_json::{json, Map, Value}; - use tracing::info; - - use crate::handlers::http::query::QueryError; - - pub struct QueryResponse { - pub trino_records: Option>, - pub fields: Vec, - pub with_fields: bool, - } - - impl QueryResponse { - pub fn to_http(&self) -> Result { - info!("{}", "Returning query results"); - let values = if let Some(trino_records) = self.trino_records.clone() { - // trino_records = Vec - let mut json_records: Vec> = Vec::new(); - for array in trino_records.into_iter() { - let mut m: Map = Map::new(); - for (key, val) in self - .fields - .clone() - .into_iter() - .zip(array.as_array().unwrap()) - { - m.insert(key, val.clone()); - } - json_records.push(m); - } - - json_records.into_iter().map(Value::Object).collect_vec() - } else { - return Err(QueryError::Anyhow(anyhow::Error::msg( - "QueryResponse made improperly", - ))); - }; - - let response = if self.with_fields { - json!({ - "fields": self.fields, - "records": values - }) - } else { - Value::Array(values) - }; - - Ok(web::Json(response)) - } - } -} diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index 656a71daa..2f0ae762f 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -34,11 +34,6 @@ const COOKIE_AGE_DAYS: usize = 7; const SESSION_COOKIE_NAME: &str = "session"; const USER_COOKIE_NAME: &str = "username"; -//constants for trino -const TRINO_SCHEMA: &str = "x-trino-schema"; -const TRINO_CATALOG: &str = "x-trino-catalog"; -const TRINO_USER: &str = "x-trino-user"; - // constants for log Source values for known sources and formats const LOG_SOURCE_KINESIS: &str = "kinesis";