Skip to content

Commit 30e7f96

Browse files
authored
Query param for attaching fields to response json (#469)
This PR adds query parameters to format response output. Mainly duplicating existing fill null option to query param and adding a withFields param options. Attaching schema to output can benefit cases where client wants to display table but the table API wants all table columns to be defined first. Example: {{endpoint}}/api/v1/query?fields=true "query": "select min(p_timestamp), max(p_timestamp) from {{stream_name}}", ``` { "fields": [ "MIN(app.p_timestamp)", "MAX(app.p_timestamp)" ], "records": [ { "MAX(app.p_timestamp)": "2023-08-07T05:55:59.738", "MIN(app.p_timestamp)": "2023-08-07T05:55:20.335" } ] } ```
1 parent cdeb9e8 commit 30e7f96

File tree

2 files changed

+36
-22
lines changed

2 files changed

+36
-22
lines changed

server/src/handlers/http/query.rs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818

1919
use actix_web::error::ErrorUnauthorized;
2020
use actix_web::http::header::ContentType;
21-
use actix_web::web::Json;
21+
use actix_web::web::{self, Json};
2222
use actix_web::{FromRequest, HttpRequest, Responder};
2323
use actix_web_httpauth::extractors::basic::BasicAuth;
2424
use futures_util::Future;
2525
use http::StatusCode;
2626
use serde_json::Value;
27+
use std::collections::HashMap;
2728
use std::pin::Pin;
2829
use std::time::Instant;
2930

@@ -36,22 +37,37 @@ use crate::rbac::role::{Action, Permission};
3637
use crate::rbac::Users;
3738
use crate::response::QueryResponse;
3839

39-
pub async fn query(query: Query) -> Result<impl Responder, QueryError> {
40+
pub async fn query(
41+
query: Query,
42+
web::Query(params): web::Query<HashMap<String, bool>>,
43+
) -> Result<impl Responder, QueryError> {
4044
let time = Instant::now();
4145

46+
// format output json to include field names
47+
let with_fields = params.get("fields").cloned().unwrap_or(false);
48+
// Fill missing columns with null
49+
let fill_null = params
50+
.get("fillNull")
51+
.cloned()
52+
.or(Some(query.fill_null))
53+
.unwrap_or(false);
54+
4255
let storage = CONFIG.storage().get_object_store();
43-
let query_result = query.execute(storage).await;
44-
let query_result = query_result
45-
.map(|(records, fields)| QueryResponse::new(records, fields, query.fill_null))
46-
.map(|response| response.to_http())
47-
.map_err(|e| e.into());
56+
let (records, fields) = query.execute(storage).await?;
57+
let response = QueryResponse {
58+
records,
59+
fields,
60+
fill_null,
61+
with_fields,
62+
}
63+
.to_http();
4864

4965
let time = time.elapsed().as_secs_f64();
5066
QUERY_EXECUTE_TIME
5167
.with_label_values(&[query.stream_name.as_str()])
5268
.observe(time);
5369

54-
query_result
70+
Ok(response)
5571
}
5672

5773
impl FromRequest for Query {

server/src/response.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,20 @@
1616
*
1717
*/
1818

19-
use actix_web::http::StatusCode;
2019
use actix_web::{web, Responder};
2120
use datafusion::arrow::json::writer::record_batches_to_json_rows;
2221
use datafusion::arrow::record_batch::RecordBatch;
2322
use itertools::Itertools;
24-
use serde_json::Value;
23+
use serde_json::{json, Value};
2524

2625
pub struct QueryResponse {
27-
pub code: StatusCode,
2826
pub records: Vec<RecordBatch>,
2927
pub fields: Vec<String>,
3028
pub fill_null: bool,
29+
pub with_fields: bool,
3130
}
3231

3332
impl QueryResponse {
34-
pub fn new(records: Vec<RecordBatch>, fields: Vec<String>, fill_null: bool) -> Self {
35-
Self {
36-
code: StatusCode::OK,
37-
records,
38-
fields,
39-
fill_null,
40-
}
41-
}
42-
4333
pub fn to_http(&self) -> impl Responder {
4434
log::info!("{}", "Returning query results");
4535
let records: Vec<&RecordBatch> = self.records.iter().collect();
@@ -53,8 +43,16 @@ impl QueryResponse {
5343
}
5444
}
5545
}
56-
5746
let values = json_records.into_iter().map(Value::Object).collect_vec();
58-
web::Json(values)
47+
let response = if self.with_fields {
48+
json!({
49+
"fields": self.fields,
50+
"records": values
51+
})
52+
} else {
53+
Value::Array(values)
54+
};
55+
56+
web::Json(response)
5957
}
6058
}

0 commit comments

Comments
 (0)