1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -121,6 +121,7 @@ static-files = "0.2"
thiserror = "2.0"
ulid = { version = "1.0", features = ["serde"] }
xxhash-rust = { version = "0.8", features = ["xxh3"] }
futures-core = "0.3.31"

[build-dependencies]
cargo_toml = "0.21"
189 changes: 147 additions & 42 deletions src/handlers/http/query.rs
@@ -16,40 +16,43 @@
*
*/

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use futures::stream::once;
use futures::{future, Stream, StreamExt};
use futures_util::Future;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use serde_json::json;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tracing::error;

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::Mode;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::query::error::ExecuteError;
use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{execute, execute_stream, CountsRequest, CountsResponse, Query as LogicalQuery};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::Users;
use crate::response::{QueryResponse, TIME_ELAPSED_HEADER};
use crate::response::QueryResponse;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::time::{TimeParseError, TimeRange};
use crate::utils::user_auth_for_datasets;

const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";
/// Query Request through http endpoint.
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
@@ -62,6 +65,8 @@ pub struct Query {
#[serde(skip)]
pub fields: bool,
#[serde(skip)]
pub streaming: bool,
#[serde(skip)]
pub filter_tags: Option<Vec<String>>,
}

@@ -73,7 +78,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
{
Ok(raw_logical_plan) => raw_logical_plan,
Err(_) => {
//if logical plan creation fails, create streams and try again
create_streams_for_querier().await;
session_state
.create_logical_plan(&query_request.query)
@@ -83,10 +87,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

// Create a visitor to extract the table names present in query
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let tables = visitor.into_inner();
update_schema_when_distributed(&tables).await?;
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
@@ -101,56 +103,154 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
user_auth_for_datasets(&permissions, &tables)?;

let time = Instant::now();
// Intercept `count(*)`` queries and use the counts API

if let Some(column_name) = query.is_logical_plan_count_without_filters() {
let counts_req = CountsRequest {
stream: table_name.clone(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let count_records = counts_req.get_bin_density().await?;
// NOTE: this should not panic, since there is atleast one bin, always
let count = count_records[0].count;
let response = if query_request.fields {
json!({
"fields": [&column_name],
"records": [json!({column_name: count})]
})
} else {
Value::Array(vec![json!({column_name: count})])
};
return handle_count_query(&query_request, &table_name, column_name, time).await;
}

let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();
if !query_request.streaming {
return handle_non_streaming_query(query, &table_name, &query_request, time).await;
}

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);
handle_streaming_query(query, &table_name, &query_request, time).await
}

return Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response));
}
async fn handle_count_query(
query_request: &Query,
table_name: &str,
column_name: &str,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let counts_req = CountsRequest {
stream: table_name.to_string(),
start_time: query_request.start_time.clone(),
end_time: query_request.end_time.clone(),
num_bins: 1,
};
let count_records = counts_req.get_bin_density().await?;
let count = count_records[0].count;
let response = if query_request.fields {
json!({
"fields": [column_name],
"records": [json!({column_name: count})]
})
} else {
serde_json::Value::Array(vec![json!({column_name: count})])
};

let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

let (records, fields) = execute(query, &table_name).await?;
QUERY_EXECUTE_TIME
.with_label_values(&[table_name])
.observe(time);

Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
}

async fn handle_non_streaming_query(
query: LogicalQuery,
table_name: &str,
query_request: &Query,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let (records, fields) = execute(query, table_name).await?;
let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[table_name])
.observe(time);
let response = QueryResponse {
records,
fields,
fill_null: query_request.send_null,
with_fields: query_request.fields,
total_time,
}
.to_http()?;
.to_json()?;
Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.json(response))
}

async fn handle_streaming_query(
query: LogicalQuery,
table_name: &str,
query_request: &Query,
time: Instant,
) -> Result<HttpResponse, QueryError> {
let (records_stream, fields) = execute_stream(query, table_name).await?;
let fields = fields.clone();
let total_time = format!("{:?}", time.elapsed());
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.with_label_values(&[table_name])
.observe(time);

Ok(response)
let send_null = query_request.send_null;
let with_fields = query_request.fields;

let stream = if with_fields {
// send the fields as an initial chunk
let fields_json = serde_json::json!({
"fields": fields
})
.to_string();

// stream the records without fields
let records_stream = records_stream.map(move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
records: vec![batch],
fields: Vec::new(),
fill_null: send_null,
with_fields: false,
}
.to_json()
.unwrap_or_else(|e| {
error!("Failed to parse record batch into JSON: {}", e);
json!({})
});
Ok(Bytes::from(format!("{}\n", response)))
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
});

// Combine the initial fields chunk with the records stream
let fields_chunk = once(future::ok::<_, actix_web::Error>(Bytes::from(format!(
"{}\n",
fields_json
))));
Box::pin(fields_chunk.chain(records_stream))
as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
} else {
let stream = records_stream.map(move |batch_result| match batch_result {
Ok(batch) => {
let response = QueryResponse {
records: vec![batch],
fields: fields.clone(),
fill_null: send_null,
with_fields,
}
.to_json()
.unwrap_or_else(|e| {
error!("Failed to parse record batch into JSON: {}", e);
json!({})
});
Ok(Bytes::from(format!("{}\n", response)))
}
Err(e) => Err(actix_web::error::ErrorInternalServerError(e)),
});

Box::pin(stream) as Pin<Box<dyn Stream<Item = Result<Bytes, actix_web::Error>>>>
};

Ok(HttpResponse::Ok()
.content_type("application/x-ndjson")
.insert_header((TIME_ELAPSED_HEADER, total_time.as_str()))
.streaming(stream))
}

pub async fn get_counts(
@@ -222,6 +322,10 @@ impl FromRequest for Query {
query.send_null = params.get("sendNull").cloned().unwrap_or(false);
}

if !query.streaming {
query.streaming = params.get("streaming").cloned().unwrap_or(false);
}

Ok(query)
};

@@ -285,6 +389,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
send_null: query.send_null,
start_time: start_time.to_rfc3339(),
end_time: end_time.to_rfc3339(),
streaming: query.streaming,
};

Some(q)
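For context (not part of this diff): a minimal sketch of how a client might consume the new NDJSON streaming response. The endpoint path, port, credentials, and request-body fields are assumptions based on the usual query API shape; only the `streaming` and `fields` query parameters come from this PR. It assumes `reqwest` with the `json` and `stream` features, `tokio`, `serde_json`, and `futures-util`.

```rust
use futures_util::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    // Hypothetical endpoint and credentials; the body fields mirror the Query struct
    // (query, startTime, endTime), while `streaming=true` opts into the NDJSON stream.
    let resp = client
        .post("http://localhost:8000/api/v1/query?streaming=true&fields=true")
        .basic_auth("admin", Some("admin"))
        .json(&serde_json::json!({
            "query": "select * from frontend",
            "startTime": "1h",
            "endTime": "now"
        }))
        .send()
        .await?;

    let mut stream = Box::pin(resp.bytes_stream());
    let mut buf: Vec<u8> = Vec::new();
    while let Some(chunk) = stream.next().await {
        buf.extend_from_slice(&chunk?);
        // Each newline-terminated line is one JSON chunk: the first line carries the
        // field list (when fields=true), later lines carry record batches.
        while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
            let line: Vec<u8> = buf.drain(..=pos).collect();
            if line.len() > 1 {
                let value: serde_json::Value = serde_json::from_slice(&line[..line.len() - 1])?;
                println!("{value}");
            }
        }
    }
    Ok(())
}
```

Because each line is parsed independently, a consumer can start rendering results before the full scan finishes instead of waiting for a single buffered response.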
33 changes: 32 additions & 1 deletion src/query/mod.rs
@@ -27,7 +27,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::error::DataFusionError;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::SessionStateBuilder;
use datafusion::execution::{SendableRecordBatchStream, SessionStateBuilder};
use datafusion::logical_expr::expr::Alias;
use datafusion::logical_expr::{
Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan,
@@ -78,6 +78,17 @@ pub async fn execute(
.expect("The Join should have been successful")
}

pub async fn execute_stream(
query: Query,
stream_name: &str,
) -> Result<(SendableRecordBatchStream, Vec<String>), ExecuteError> {
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
QUERY_RUNTIME
.spawn(async move { query.execute_stream(time_partition.as_ref()).await })
.await
.expect("The Join should have been successful")
}

// A query request by client
#[derive(Debug)]
pub struct Query {
@@ -182,6 +193,26 @@ impl Query {
Ok((results, fields))
}

// execute stream
pub async fn execute_stream(
&self,
time_partition: Option<&String>,
) -> Result<(SendableRecordBatchStream, Vec<String>), ExecuteError> {
let df = QUERY_SESSION
.execute_logical_plan(self.final_logical_plan(time_partition))
.await?;
let fields = df
.schema()
.fields()
.iter()
.map(|f| f.name())
.cloned()
.collect_vec();
let stream = df.execute_stream().await?;

Ok((stream, fields))
}

pub async fn get_dataframe(
&self,
time_partition: Option<&String>,
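For context (not part of this diff): a minimal sketch of how a caller could drain the `SendableRecordBatchStream` returned by `execute_stream`, using the same DataFusion types imported above. The `drain_stream` helper is illustrative, not part of the PR.

```rust
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
use futures::StreamExt;

// Counts rows as batches arrive; contrast with `execute`, which buffers the
// whole result set into a Vec<RecordBatch> before returning.
async fn drain_stream(mut stream: SendableRecordBatchStream) -> Result<usize, DataFusionError> {
    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```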
10 changes: 2 additions & 8 deletions src/response.rs
@@ -17,24 +17,20 @@
*/

use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use actix_web::HttpResponse;
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};
use tracing::info;

pub const TIME_ELAPSED_HEADER: &str = "p-time-elapsed";

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
pub fields: Vec<String>,
pub fill_null: bool,
pub with_fields: bool,
pub total_time: String,
}

impl QueryResponse {
pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
pub fn to_json(&self) -> Result<Value, QueryError> {
info!("{}", "Returning query results");
let mut json_records = record_batches_to_json(&self.records)?;

@@ -58,8 +54,6 @@ impl QueryResponse {
Value::Array(values)
};

Ok(HttpResponse::Ok()
.insert_header((TIME_ELAPSED_HEADER, self.total_time.as_str()))
.json(response))
Ok(response)
}
}
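For reference (not part of this diff): a sketch of the two JSON shapes `to_json` produces depending on `with_fields`. Field names and record values below are invented for illustration; the real records come from `record_batches_to_json`.

```rust
use serde_json::{json, Value};

// Illustrative only: shows the output shapes, not real query results.
fn example_shapes() -> (Value, Value) {
    // with_fields == true: an object carrying the field list alongside the records
    let with_fields = json!({
        "fields": ["p_timestamp", "level", "message"],
        "records": [{ "p_timestamp": "2024-01-01T00:00:00Z", "level": "info", "message": "ok" }]
    });
    // with_fields == false: a bare array of record objects
    let without_fields = json!([
        { "p_timestamp": "2024-01-01T00:00:00Z", "level": "info", "message": "ok" }
    ]);
    (with_fields, without_fields)
}
```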