1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -121,6 +121,7 @@ static-files = "0.2"
thiserror = "2.0"
ulid = { version = "1.0", features = ["serde"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
futures-core = "0.3.31"

[build-dependencies]
cargo_toml = "0.21"
7 changes: 2 additions & 5 deletions src/catalog/mod.rs
@@ -16,7 +16,7 @@
*
*/

use std::{io::ErrorKind, sync::Arc};
use std::sync::Arc;

use chrono::{DateTime, Local, NaiveTime, Utc};
use column::Column;
@@ -259,10 +259,7 @@ async fn create_manifest(
.date_naive()
.and_time(
NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999)
.ok_or(IOError::new(
ErrorKind::Other,
"Failed to create upper bound for manifest",
))?,
.ok_or(IOError::other("Failed to create upper bound for manifest"))?,
)
.and_utc();

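`IOError::new(ErrorKind::Other, ...)` is collapsed to `IOError::other(...)`, the shorthand constructor stabilized in Rust 1.74. A minimal sketch of the equivalence (the message text is just illustrative):

```rust
use std::io::{Error, ErrorKind};

fn main() {
    // Long form: spell out ErrorKind::Other explicitly.
    let old = Error::new(ErrorKind::Other, "Failed to create upper bound for manifest");
    // Short form used by this change: Error::other (stable since Rust 1.74).
    let new = Error::other("Failed to create upper bound for manifest");
    // Both produce the same kind and message.
    assert_eq!(old.kind(), new.kind());
    assert_eq!(old.to_string(), new.to_string());
}
```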
8 changes: 5 additions & 3 deletions src/handlers/airplane.rs
@@ -126,9 +126,11 @@ impl FlightService for AirServiceImpl {
}

async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let key = extract_session_key(req.metadata())?;
let key = extract_session_key(req.metadata())
.map_err(|e| Status::unauthenticated(e.to_string()))?;

let ticket = get_query_from_ticket(&req)?;
let ticket =
get_query_from_ticket(&req).map_err(|e| Status::invalid_argument(e.to_string()))?;

info!("query requested to airplane: {:?}", ticket);

@@ -246,7 +248,7 @@ impl FlightService for AirServiceImpl {
.observe(time);

// Airplane takes off 🛫
out
out.map_err(|e| *e)
}

async fn do_put(
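The flight handler now converts extractor failures into explicit gRPC codes instead of relying on `?` conversion, and unboxes the boxed `Status` on the way out (`out.map_err(|e| *e)`). A minimal sketch of both patterns, with a dummy fallible step standing in for the real extractors:

```rust
use tonic::Status;

// Stand-in for a helper that returns a boxed Status to keep the Err variant
// small (see the livetail.rs signature changes below).
fn check_auth(authorized: bool) -> Result<(), Box<Status>> {
    if authorized {
        Ok(())
    } else {
        Err(Box::new(Status::unauthenticated("no credentials supplied")))
    }
}

fn do_get(authorized: bool) -> Result<&'static str, Status> {
    // Re-wrap the boxed error with the gRPC code that fits the failure...
    check_auth(authorized).map_err(|e| Status::unauthenticated(e.to_string()))?;
    // ...or simply move the Status out of the Box when it is already correct.
    check_auth(authorized).map_err(|e| *e)?;
    Ok("record batch stream")
}
```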
189 changes: 147 additions & 42 deletions src/handlers/http/query.rs
@@ -16,40 +16,43 @@
*
*/

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures::stream::once;
use futures::{future, Stream, StreamExt};
use futures_util::Future;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use serde_json::json;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tracing::error;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::Mode;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::query::error::ExecuteError;
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
use crate::response::QueryResponse;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::time::{TimeParseError, TimeRange};
use crate::utils::user_auth_for_datasets;

const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
/// Query Request through http endpoint.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
@@ -62,6 +65,8 @@ pub struct Query {
#[serde(skip)]
pub fields: bool,
#[serde(skip)]
pub streaming: bool,
#[serde(skip)]
pub filter_tags: Option<Vec<String>>,
}

@@ -73,7 +78,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
{
Ok(raw_logical_plan) => raw_logical_plan,
Err(_) => {
//if logical plan creation fails, create streams and try again
create_streams_for_querier().await;
session_state
.create_logical_plan(&query_request.query)
@@ -83,10 +87,8 @@
let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

// Create a visitor to extract the table names present in query
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let tables = visitor.into_inner();
update_schema_when_distributed(&tables).await?;
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
@@ -101,56 +103,154 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
user_auth_for_datasets(&permissions, &tables)?;

let time = Instant::now();
// Intercept `count(*)`` queries and use the counts API

if let Some(column_name) = query.is_logical_plan_count_without_filters() {
let counts_req = CountsRequest {
stream: table_name.clone(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let count_records = counts_req.get_bin_density().await?;
// NOTE: this should not panic, since there is atleast one bin, always
let count = count_records[0].count;
let response = if query_request.fields {
json!({
"fields": [&column_name],
"records": [json!({column_name: count})]
})
} else {
Value::Array(vec![json!({column_name: count})])
};
return handle_count_query(&query_request, &table_name, column_name, time).await;
}

let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();
if !query_request.streaming {
return handle_non_streaming_query(query, &table_name, &query_request, time).await;
}

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);
handle_streaming_query(query, &table_name, &query_request, time).await
}

return Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response));
}
async fn handle_count_query(
query_request: &Query,
table_name: &str,
column_name: &str,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let counts_req = CountsRequest {
stream: table_name.to_string(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let count_records = counts_req.get_bin_density().await?;
let count = count_records[0].count;
let response = if query_request.fields {
json!({
"fields": [column_name],
"records": [json!({column_name: count})]
})
} else {
serde_json::Value::Array(vec![json!({column_name: count})])
};

let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

let (records, fields) = execute(query, &table_name).await?;
QUERY_EXECUTE_TIME
.with_label_values(&[table_name])
.observe(time);

Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
}

async fn handle_non_streaming_query(
query: LogicalQuery,
table_name: &str,
query_request: &Query,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let (records, fields) = execute(query, table_name).await?;
let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[table_name])
.observe(time);
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
total_time,
}
.to_http()?;
.to_json()?;
Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
}

async fn handle_streaming_query(
query: LogicalQuery,
table_name: &str,
query_request: &Query,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let (records_stream, fields) = execute_stream(query, table_name).await?;
let fields = fields.clone();
let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.with_label_values(&[table_name])
.observe(time);

Ok(response)
let send_null = query_request.send_null;
let with_fields = query_request.fields;

let stream = if with_fields {
// send the fields as an initial chunk
let fields_json = serde_json::json!({
"fields": fields
})
.to_string();

// stream the records without fields
let records_stream = records_stream.map(move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
records: vec![batch],
fields: Vec::new(),
fill_null: send_null,
with_fields: false,
}
.to_json()
.unwrap_or_else(|e| {
error!("Failed to parse record batch into JSON: {}", e);
json!({})
});
Ok(Bytes::from(format!("{}\n", response)))
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
});

// Combine the initial fields chunk with the records stream
let fields_chunk = once(future::ok::<_, actix_web::Error>(Bytes::from(format!(
"{}\n",
fields_json
))));
Box::pin(fields_chunk.chain(records_stream))
as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
} else {
let stream = records_stream.map(move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
records: vec![batch],
fields: fields.clone(),
fill_null: send_null,
with_fields,
}
.to_json()
.unwrap_or_else(|e| {
error!("Failed to parse record batch into JSON: {}", e);
json!({})
});
Ok(Bytes::from(format!("{}\n", response)))
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
});

Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
};

Ok(HttpResponse::Ok()
.content_type("application/x-ndjson")
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.streaming(stream))
}

pub async fn get_counts(
@@ -222,6 +322,10 @@ impl FromRequest for Query {
query.send_null = params.get("sendNull").cloned().unwrap_or(false);
}

if !query.streaming {
query.streaming = params.get("streaming").cloned().unwrap_or(false);
}

Ok(query)
};

Expand Down Expand Up @@ -285,6 +389,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
send_null: query.send_null,
start_time: start_time.to_rfc3339(),
end_time: end_time.to_rfc3339(),
streaming: query.streaming,
};

Some(q)
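When `streaming=true`, the handler now emits NDJSON: an optional first line with the field names, then one JSON line per record batch, composed with `once(...).chain(...)` and boxed into a `Pin<Box<dyn Stream>>`. A self-contained sketch of that composition, with plain strings standing in for record batches (assumes the `futures`, `bytes`, `serde_json`, and `tokio` crates):

```rust
use std::pin::Pin;

use bytes::Bytes;
use futures::{future, stream, Stream, StreamExt};

type ByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>>>>;

fn ndjson_stream(fields: Vec<&'static str>, batches: Vec<String>) -> ByteStream {
    // First chunk: the schema, as its own NDJSON line.
    let fields_json = serde_json::json!({ "fields": fields }).to_string();
    let head = stream::once(future::ok::<_, std::io::Error>(Bytes::from(format!(
        "{fields_json}\n"
    ))));

    // Remaining chunks: one JSON document per line, one line per batch.
    let body = stream::iter(batches).map(|b| Ok(Bytes::from(format!("{b}\n"))));

    // Chain them and erase the concrete type, as the handler does before
    // handing the stream to HttpResponse::streaming.
    Box::pin(head.chain(body))
}

#[tokio::main]
async fn main() {
    let mut s = ndjson_stream(vec!["level"], vec![r#"{"level":"info"}"#.to_string()]);
    while let Some(chunk) = s.next().await {
        print!("{}", String::from_utf8_lossy(&chunk.unwrap()));
    }
}
```

Clients then read the response line by line: with `fields=true` the first line carries the schema, and each subsequent line is one batch of records.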
18 changes: 10 additions & 8 deletions src/handlers/livetail.rs
@@ -98,10 +98,10 @@ impl FlightService for FlightServiceImpl {
}

async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let key = extract_session_key(req.metadata())?;
let key = extract_session_key(req.metadata()).map_err(|e| *e)?;
let ticket: serde_json::Value = serde_json::from_slice(&req.into_inner().ticket)
.map_err(|err| Status::internal(err.to_string()))?;
let stream = extract_stream(&ticket)?;
let stream = extract_stream(&ticket).map_err(|e| *e)?;
info!("livetail requested for stream {}", stream);
match Users.authorize(key, rbac::role::Action::Query, Some(stream), None) {
rbac::Response::Authorized => (),
@@ -232,16 +232,16 @@ pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + S
}
}

pub fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
pub fn extract_stream(body: &serde_json::Value) -> Result<&str, Box<Status>> {
body.as_object()
.ok_or(Status::invalid_argument("expected object in request body"))?
.ok_or_else(|| Box::new(Status::invalid_argument("expected object in request body")))?
.get("stream")
.ok_or(Status::invalid_argument("stream key value is not provided"))?
.ok_or_else(|| Box::new(Status::invalid_argument("stream key value is not provided")))?
.as_str()
.ok_or(Status::invalid_argument("stream key value is invalid"))
.ok_or_else(|| Box::new(Status::invalid_argument("stream key value is invalid")))
}

pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Box<Status>> {
// Extract username and password from the request using basic auth extractor.
let basic = extract_basic_auth(headers).map(|creds| SessionKey::BasicAuth {
username: creds.user_id,
@@ -261,7 +261,9 @@ pub fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status>
return Ok(SessionKey::SessionId(session));
}

Err(Status::unauthenticated("No authentication method supplied"))
Err(Box::new(Status::unauthenticated(
"No authentication method supplied",
)))
}

fn extract_basic_auth(header: &MetadataMap) -> Option<Credentials> {
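Returning `Box<Status>` rather than `Status` keeps the `Err` variant pointer-sized; `tonic::Status` is a large struct, and boxing is the usual remedy for clippy's `result_large_err` lint. A small sketch of the size effect, using a stand-in type since the exact size of `Status` is an implementation detail:

```rust
use std::mem::size_of;

// Stand-in for a large error type such as tonic::Status.
struct BigError {
    _payload: [u8; 128],
}

fn main() {
    // Unboxed: the Result must be large enough to hold the error inline.
    assert!(size_of::<Result<(), BigError>>() > 128);
    // Boxed: the Result is pointer-sized (the null niche encodes Ok).
    assert_eq!(size_of::<Result<(), Box<BigError>>>(), size_of::<usize>());
    println!(
        "unboxed: {} bytes, boxed: {} bytes",
        size_of::<Result<(), BigError>>(),
        size_of::<Result<(), Box<BigError>>>()
    );
}
```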