Skip to content

Commit 35efb17

Browse files
parmesantnikhilsinhaparseableDevdutt Shenoi
authored
feat: API for date_bin query (#1093)
This PR adds a new API to get `date_bin` from a given stream. This API can also be used to find the count(*) for the stream. --------- Co-authored-by: Nikhil Sinha <[email protected]> Co-authored-by: Devdutt Shenoi <[email protected]>
1 parent fadb1f1 commit 35efb17

File tree

6 files changed

+316
-15
lines changed

6 files changed

+316
-15
lines changed

src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ impl ParseableServer for QueryServer {
5252
config
5353
.service(
5454
web::scope(&base_path())
55+
.service(Server::get_date_bin())
5556
.service(Server::get_correlation_webscope())
5657
.service(Server::get_query_factory())
5758
.service(Server::get_liveness_factory())

src/handlers/http/modal/server.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ impl ParseableServer for Server {
8080
.service(Self::get_llm_webscope())
8181
.service(Self::get_oauth_webscope(oidc_client))
8282
.service(Self::get_user_role_webscope())
83+
.service(Self::get_date_bin())
8384
.service(Self::get_metrics_webscope()),
8485
)
8586
.service(Self::get_ingest_otel_factory())
@@ -266,6 +267,10 @@ impl Server {
266267
),
267268
)
268269
}
270+
pub fn get_date_bin() -> Resource {
271+
web::resource("/datebin")
272+
.route(web::post().to(query::get_date_bin).authorize(Action::Query))
273+
}
269274

270275
// get the query factory
271276
// POST "/query" ==> Get results of the SQL query passed in request body

src/handlers/http/query.rs

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
use actix_web::http::header::ContentType;
2020
use actix_web::web::{self, Json};
21-
use actix_web::{FromRequest, HttpRequest, Responder};
21+
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
2222
use chrono::{DateTime, Utc};
2323
use datafusion::common::tree_node::TreeNode;
2424
use datafusion::error::DataFusionError;
2525
use datafusion::execution::context::SessionState;
2626
use futures_util::Future;
2727
use http::StatusCode;
28+
use serde::{Deserialize, Serialize};
29+
use serde_json::{json, Value};
2830
use std::collections::HashMap;
2931
use std::pin::Pin;
3032
use std::sync::Arc;
@@ -39,7 +41,7 @@ use crate::event::commit_schema;
3941
use crate::metrics::QUERY_EXECUTE_TIME;
4042
use crate::option::{Mode, CONFIG};
4143
use crate::query::error::ExecuteError;
42-
use crate::query::Query as LogicalQuery;
44+
use crate::query::{DateBinRequest, DateBinResponse, Query as LogicalQuery};
4345
use crate::query::{TableScanVisitor, QUERY_SESSION};
4446
use crate::rbac::Users;
4547
use crate::response::QueryResponse;
@@ -52,7 +54,7 @@ use crate::utils::user_auth_for_query;
5254
use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
5355

5456
/// Query Request through http endpoint.
55-
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
57+
#[derive(Debug, Deserialize, Serialize, Clone)]
5658
#[serde(rename_all = "camelCase")]
5759
pub struct Query {
5860
pub query: String,
@@ -66,7 +68,7 @@ pub struct Query {
6668
pub filter_tags: Option<Vec<String>>,
6769
}
6870

69-
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
71+
pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
7072
let session_state = QUERY_SESSION.state();
7173
let raw_logical_plan = match session_state
7274
.create_logical_plan(&query_request.query)
@@ -81,11 +83,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
8183
.await?
8284
}
8385
};
84-
8586
let time_range =
8687
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
8788

88-
// create a visitor to extract the table names present in query
89+
// Create a visitor to extract the table names present in query
8990
let mut visitor = TableScanVisitor::default();
9091
let _ = raw_logical_plan.visit(&mut visitor);
9192

@@ -103,6 +104,31 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
103104
user_auth_for_query(&permissions, &tables)?;
104105

105106
let time = Instant::now();
107+
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
108+
let date_bin_request = DateBinRequest {
109+
stream: table_name.clone(),
110+
start_time: query_request.start_time.clone(),
111+
end_time: query_request.end_time.clone(),
112+
num_bins: 1,
113+
};
114+
let date_bin_records = date_bin_request.get_bin_density().await?;
115+
let response = if query_request.fields {
116+
json!({
117+
"fields": vec![&column_name],
118+
"records": vec![json!({column_name: date_bin_records[0].log_count})]
119+
})
120+
} else {
121+
Value::Array(vec![json!({column_name: date_bin_records[0].log_count})])
122+
};
123+
124+
let time = time.elapsed().as_secs_f64();
125+
126+
QUERY_EXECUTE_TIME
127+
.with_label_values(&[&table_name])
128+
.observe(time);
129+
130+
return Ok(HttpResponse::Ok().json(response));
131+
}
106132
let (records, fields) = query.execute(table_name.clone()).await?;
107133

108134
let response = QueryResponse {
@@ -122,6 +148,24 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
122148
Ok(response)
123149
}
124150

151+
pub async fn get_date_bin(
152+
req: HttpRequest,
153+
date_bin: Json<DateBinRequest>,
154+
) -> Result<impl Responder, QueryError> {
155+
let creds = extract_session_key_from_req(&req)?;
156+
let permissions = Users.get_permissions(&creds);
157+
158+
// does user have access to table?
159+
user_auth_for_query(&permissions, &[date_bin.stream.clone()])?;
160+
161+
let date_bin_records = date_bin.get_bin_density().await?;
162+
163+
Ok(web::Json(DateBinResponse {
164+
fields: vec!["date_bin_timestamp".into(), "log_count".into()],
165+
records: date_bin_records,
166+
}))
167+
}
168+
125169
pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), QueryError> {
126170
if CONFIG.options.mode == Mode::Query {
127171
for table in tables {

0 commit comments

Comments
 (0)