Skip to content

Commit bb2652c

Browse files
authored
Merge null to fields that are undefined (#291)
1 parent 7a47739 commit bb2652c

File tree

3 files changed

+56
-24
lines changed

3 files changed

+56
-24
lines changed

server/src/handlers/event.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*
1717
*/
1818

19-
use actix_web::{web, HttpRequest, HttpResponse};
19+
use actix_web::{web, HttpRequest, HttpResponse, Responder};
2020
use serde_json::Value;
2121
use std::time::Instant;
2222

@@ -33,17 +33,28 @@ use self::error::{PostError, QueryError};
3333
const PREFIX_TAGS: &str = "x-p-tag-";
3434
const PREFIX_META: &str = "x-p-meta-";
3535
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
36+
const FILL_NULL_OPTION_KEY: &str = "fill_null";
3637
const SEPARATOR: char = '^';
3738

38-
pub async fn query(_req: HttpRequest, json: web::Json<Value>) -> Result<HttpResponse, QueryError> {
39+
pub async fn query(
40+
_req: HttpRequest,
41+
json: web::Json<Value>,
42+
) -> Result<impl Responder, QueryError> {
3943
let time = Instant::now();
4044
let json = json.into_inner();
45+
46+
let fill_null = json
47+
.as_object()
48+
.and_then(|map| map.get(FILL_NULL_OPTION_KEY))
49+
.and_then(|value| value.as_bool())
50+
.unwrap_or_default();
51+
4152
let query = Query::parse(json)?;
4253

4354
let storage = CONFIG.storage().get_object_store();
4455
let query_result = query.execute(storage).await;
4556
let query_result = query_result
46-
.map(Into::<QueryResponse>::into)
57+
.map(|(records, fields)| QueryResponse::new(records, fields, fill_null))
4758
.map(|response| response.to_http())
4859
.map_err(|e| e.into());
4960

server/src/query.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl Query {
8888
pub async fn execute(
8989
&self,
9090
storage: Arc<dyn ObjectStorage + Send>,
91-
) -> Result<Vec<RecordBatch>, ExecuteError> {
91+
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
9292
let dir = StorageDir::new(&self.stream_name);
9393
// take a look at local dir and figure out what local cache we could use for this query
9494
let staging_arrows = dir
@@ -128,8 +128,19 @@ impl Query {
128128
.map_err(ObjectStorageError::DataFusionError)?;
129129
// execute the query and collect results
130130
let df = ctx.sql(self.query.as_str()).await?;
131+
// dataframe qualifies name by adding table name before columns. \
132+
// For now this is just actual names
133+
let fields = df
134+
.schema()
135+
.fields()
136+
.iter()
137+
.map(|f| f.name())
138+
.cloned()
139+
.collect_vec();
140+
131141
let results = df.collect().await?;
132-
Ok(results)
142+
143+
Ok((results, fields))
133144
}
134145
}
135146

server/src/response.rs

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,44 @@
1717
*/
1818

1919
use actix_web::http::StatusCode;
20-
use actix_web::{HttpResponse, HttpResponseBuilder};
21-
use datafusion::arrow::json;
20+
use actix_web::{web, Responder};
21+
use datafusion::arrow::json::writer::record_batches_to_json_rows;
2222
use datafusion::arrow::record_batch::RecordBatch;
23+
use itertools::Itertools;
24+
use serde_json::Value;
2325

2426
pub struct QueryResponse {
2527
pub code: StatusCode,
26-
pub body: Vec<RecordBatch>,
28+
pub records: Vec<RecordBatch>,
29+
pub fields: Vec<String>,
30+
pub fill_null: bool,
2731
}
2832

2933
impl QueryResponse {
30-
pub fn to_http(&self) -> HttpResponse {
31-
log::info!("{}", "Returning query results");
32-
let buf = Vec::new();
33-
let mut writer = json::ArrayWriter::new(buf);
34-
writer.write_batches(&self.body).unwrap();
35-
writer.finish().unwrap();
36-
37-
HttpResponseBuilder::new(self.code)
38-
.content_type("json")
39-
.body(writer.into_inner())
40-
}
41-
}
42-
43-
impl From<Vec<RecordBatch>> for QueryResponse {
44-
fn from(body: Vec<RecordBatch>) -> Self {
34+
pub fn new(records: Vec<RecordBatch>, fields: Vec<String>, fill_null: bool) -> Self {
4535
Self {
4636
code: StatusCode::OK,
47-
body,
37+
records,
38+
fields,
39+
fill_null,
4840
}
4941
}
42+
43+
pub fn to_http(&self) -> impl Responder {
44+
log::info!("{}", "Returning query results");
45+
let mut json_records = record_batches_to_json_rows(&self.records).unwrap();
46+
47+
if self.fill_null {
48+
for map in &mut json_records {
49+
for field in &self.fields {
50+
if !map.contains_key(field) {
51+
map.insert(field.clone(), Value::Null);
52+
}
53+
}
54+
}
55+
}
56+
57+
let values = json_records.into_iter().map(Value::Object).collect_vec();
58+
web::Json(values)
59+
}
5060
}

0 commit comments

Comments
 (0)