
Commit 4d4a979

Devdutt Shenoi committed

refactor + test: query response serialization

1 parent a71ce87 · commit 4d4a979

File tree: 3 files changed, +189 −56 lines

src/handlers/http/query.rs

Lines changed: 8 additions & 7 deletions
@@ -19,6 +19,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
+use arrow_schema::ArrowError;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -308,22 +309,22 @@ pub enum QueryError {
     EventError(#[from] EventError),
     #[error("Error: {0}")]
     MalformedQuery(&'static str),
-    #[allow(unused)]
-    #[error(
-        r#"Error: Failed to Parse Record Batch into Json
-Description: {0}"#
-    )]
-    JsonParse(String),
     #[error("Error: {0}")]
     ActixError(#[from] actix_web::Error),
     #[error("Error: {0}")]
     Anyhow(#[from] anyhow::Error),
+    #[error("Arrow Error: {0}")]
+    Arrow(#[from] ArrowError),
+    #[error("Error: Failed to Parse Record Batch into Json: {0}")]
+    Json(#[from] serde_json::Error),
 }
 
 impl actix_web::ResponseError for QueryError {
     fn status_code(&self) -> http::StatusCode {
         match self {
-            QueryError::Execute(_) | QueryError::JsonParse(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            QueryError::Execute(_) | QueryError::Json(_) | QueryError::Arrow(_) => {
+                StatusCode::INTERNAL_SERVER_ERROR
+            }
             _ => StatusCode::BAD_REQUEST,
         }
     }
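
For context, the `#[from]` attributes above come from the thiserror derive, which generates `From` impls so that `?` converts the underlying errors into `QueryError` without explicit `map_err` calls. A minimal sketch of that pattern with a simplified enum (an illustration only, not the crate's full definition):

use thiserror::Error;

#[derive(Debug, Error)]
enum QueryError {
    #[error("Arrow Error: {0}")]
    Arrow(#[from] arrow_schema::ArrowError),
    #[error("Error: Failed to Parse Record Batch into Json: {0}")]
    Json(#[from] serde_json::Error),
}

// `?` promotes the `serde_json::Error` into `QueryError` through the generated `From` impl.
fn parse_rows(bytes: &[u8]) -> Result<Vec<serde_json::Value>, QueryError> {
    Ok(serde_json::from_slice(bytes)?)
}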

src/response.rs

Lines changed: 181 additions & 15 deletions
@@ -16,11 +16,10 @@
  *
  */
 
-use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
+use crate::handlers::http::query::QueryError;
 use actix_web::HttpResponse;
 use datafusion::arrow::record_batch::RecordBatch;
-use itertools::Itertools;
-use serde_json::{json, Value};
+use serde_json::{json, Map, Value};
 use tracing::info;
 
 pub struct QueryResponse {
@@ -31,31 +30,198 @@ pub struct QueryResponse {
 }
 
 impl QueryResponse {
+    /// TODO: maybe this can be futher optimized by directly converting `arrow` to `serde_json` instead of serializing to bytes
     pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
-        info!("{}", "Returning query results");
+        info!("Returning query results");
+        let response = self.to_json()?;
+
+        Ok(HttpResponse::Ok().json(response))
+    }
+
+    fn to_json(&self) -> Result<Value, QueryError> {
+        let buf = vec![];
+        let mut writer = arrow_json::ArrayWriter::new(buf);
         let records: Vec<&RecordBatch> = self.records.iter().collect();
-        let mut json_records = record_batches_to_json(&records)?;
+        writer.write_batches(&records)?;
+        writer.finish()?;
+
+        let mut json: Vec<Map<String, Value>> = serde_json::from_slice(&writer.into_inner())?;
 
         if self.fill_null {
-            for map in &mut json_records {
-                for field in &self.fields {
-                    if !map.contains_key(field) {
-                        map.insert(field.clone(), Value::Null);
-                    }
+            for object in json.iter_mut() {
+                for field in self.fields.iter() {
+                    object.entry(field).or_insert(Value::Null);
                 }
             }
         }
-        let values = json_records.into_iter().map(Value::Object).collect_vec();
 
-        let response = if self.with_fields {
+        let json = if self.with_fields {
             json!({
                 "fields": self.fields,
-                "records": values
+                "records": json
             })
         } else {
-            Value::Array(values)
+            json!(json)
         };
 
-        Ok(HttpResponse::Ok().json(response))
+        Ok(json)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow_array::{Array, Float64Array, Int64Array, RecordBatch, StringArray};
+    use arrow_schema::Schema;
+    use serde_json::{json, Value};
+
+    use crate::response::QueryResponse;
+
+    #[test]
+    fn check_empty_record_batches_to_json() {
+        let response = QueryResponse {
+            records: vec![RecordBatch::new_empty(Arc::new(Schema::empty()))],
+            fields: vec![],
+            fill_null: false,
+            with_fields: false,
+        };
+
+        assert_eq!(response.to_json().unwrap(), Value::Array(vec![]));
+    }
+
+    #[test]
+    fn check_record_batches_to_json() {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..3));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter(
+            (0..3).map(|x| Some(format!("str {x}"))),
+        ));
+
+        let record = RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap();
+        let response = QueryResponse {
+            records: vec![record],
+            fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()],
+            fill_null: false,
+            with_fields: false,
+        };
+
+        assert_eq!(
+            response.to_json().unwrap(),
+            json!([
+                {"a": 0, "b": 0.0, "c": "str 0"},
+                {"a": 1, "b": 1.0, "c": "str 1"},
+                {"a": 2, "b": 2.0, "c": "str 2"}
+            ])
+        );
+    }
+
+    #[test]
+    fn check_record_batches_to_json_with_fields() {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..3));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter(
+            (0..3).map(|x| Some(format!("str {x}"))),
+        ));
+
+        let record = RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap();
+        let response = QueryResponse {
+            records: vec![record],
+            fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()],
+            fill_null: false,
+            with_fields: true,
+        };
+
+        assert_eq!(
+            response.to_json().unwrap(),
+            json!({
+                "fields": ["a", "b", "c"],
+                "records": [
+                    {"a": 0, "b": 0.0, "c": "str 0"},
+                    {"a": 1, "b": 1.0, "c": "str 1"},
+                    {"a": 2, "b": 2.0, "c": "str 2"}
+                ]
+            })
+        );
+    }
+
+    #[test]
+    fn check_record_batches_to_json_without_nulls() {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..3));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter((0..3).map(|x| {
+            if x == 1 {
+                Some(format!("str {x}"))
+            } else {
+                None
+            }
+        })));
+
+        let record = RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap();
+        let response = QueryResponse {
+            records: vec![record],
+            fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()],
+            fill_null: false,
+            with_fields: false,
+        };
+
+        assert_eq!(
+            response.to_json().unwrap(),
+            json!([
+                {"a": 0, "b": 0.0},
+                {"a": 1, "b": 1.0, "c": "str 1"},
+                {"a": 2, "b": 2.0}
+            ])
+        );
+    }
+
+    #[test]
+    fn check_record_batches_to_json_with_nulls() {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..3));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter((0..3).map(|x| {
+            if x == 1 {
+                Some(format!("str {x}"))
+            } else {
+                None
+            }
+        })));
+
+        let record = RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap();
+        let response = QueryResponse {
+            records: vec![record],
+            fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()],
+            fill_null: true,
+            with_fields: false,
+        };
+
+        assert_eq!(
+            response.to_json().unwrap(),
+            json!([
+                {"a": 0, "b": 0.0, "c": null},
+                {"a": 1, "b": 1.0, "c": "str 1"},
+                {"a": 2, "b": 2.0, "c": null}
+            ])
+        );
     }
 }
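
For context, the TODO in the diff above notes that `to_json` currently serializes through an in-memory byte buffer rather than building `serde_json` values directly. A minimal standalone sketch of that round-trip, assuming the `arrow_array`, `arrow_json`, and `serde_json` crates (the helper name is hypothetical, not part of this commit):

use arrow_array::RecordBatch;
use serde_json::{Map, Value};

// RecordBatch -> JSON bytes via `arrow_json::ArrayWriter` -> one `serde_json` map per row.
fn record_batches_to_rows(
    batches: &[&RecordBatch],
) -> Result<Vec<Map<String, Value>>, Box<dyn std::error::Error>> {
    // `ArrayWriter` writes the batches as a single JSON array of row objects into the buffer.
    let mut writer = arrow_json::ArrayWriter::new(Vec::new());
    writer.write_batches(batches)?;
    writer.finish()?;
    // Parse the buffered bytes back into per-row maps, mirroring what `to_json` does.
    Ok(serde_json::from_slice(&writer.into_inner())?)
}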

src/utils/arrow/mod.rs

Lines changed: 0 additions & 34 deletions
@@ -51,9 +51,7 @@ use itertools::Itertools;
 pub mod batch_adapter;
 pub mod flight;
 
-use anyhow::Result;
 pub use batch_adapter::adapt_batch;
-use serde_json::{Map, Value};
 
 /// Replaces columns in a record batch with new arrays.
 ///
@@ -80,30 +78,6 @@ pub fn replace_columns(
     RecordBatch::try_new(schema, batch_arrays).unwrap()
 }
 
-/// Converts a slice of record batches to JSON.
-///
-/// # Arguments
-///
-/// * `records` - The record batches to convert.
-///
-/// # Returns
-/// * Result<Vec<Map<String, Value>>>
-///
-/// A vector of JSON objects representing the record batches.
-pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
-    let buf = vec![];
-    let mut writer = arrow_json::ArrayWriter::new(buf);
-    writer.write_batches(records)?;
-    writer.finish()?;
-
-    let buf = writer.into_inner();
-
-    let json_rows: Vec<Map<String, Value>> =
-        serde_json::from_reader(buf.as_slice()).unwrap_or_default();
-
-    Ok(json_rows)
-}
-
 /// Retrieves a field from a slice of fields by name.
 ///
 /// # Arguments
@@ -185,14 +159,6 @@ mod tests {
         assert_eq!(new_rb.num_rows(), 3)
     }
 
-    #[test]
-    fn check_empty_json_to_record_batches() {
-        let r = RecordBatch::new_empty(Arc::new(Schema::empty()));
-        let rb = vec![&r];
-        let batches = record_batches_to_json(&rb).unwrap();
-        assert_eq!(batches, vec![]);
-    }
-
     #[test]
     fn test_timestamp_array_has_correct_size_and_value() {
         let size = 5;
