
Commit d6043b3

rewrite all load api calls to return RecordBatch
1 parent 5af0e7a commit d6043b3
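The substance of the change: the load path no longer hands back the row-oriented JSON response (TransportLoadResponse / V1LoadResult) for every caller to transform; transports now return Arrow RecordBatches directly. The sketch below is only a rough, self-contained illustration of that row-to-columnar step, not the cubesql implementation: it uses the standalone arrow crate, a hard-coded two-column schema, and a hypothetical helper rows_to_batch, whereas the real code goes through transform_response / convert_transport_response and MemberField mapping (see the diffs below).

use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Turn row-oriented (city, maxPrice) pairs into a columnar RecordBatch.
fn rows_to_batch(rows: &[(Option<&str>, Option<f64>)]) -> Result<RecordBatch, ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("KibanaSampleDataEcommerce.city", DataType::Utf8, true),
        Field::new("KibanaSampleDataEcommerce.maxPrice", DataType::Float64, true),
    ]));

    // One Arrow array per column, built from the row-oriented input.
    let cities: ArrayRef = Arc::new(StringArray::from(
        rows.iter().map(|(c, _)| *c).collect::<Vec<_>>(),
    ));
    let prices: ArrayRef = Arc::new(Float64Array::from(
        rows.iter().map(|(_, p)| *p).collect::<Vec<_>>(),
    ));

    RecordBatch::try_new(schema, vec![cities, prices])
}

fn main() -> Result<(), ArrowError> {
    let batch = rows_to_batch(&[(Some("City 1"), Some(10.0)), (None, None)])?;
    println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    Ok(())
}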

File tree: 5 files changed, +188 −95 lines

packages/cubejs-backend-native/src/orchestrator.rs
Lines changed: 74 additions & 5 deletions

@@ -2,9 +2,10 @@ use crate::node_obj_deserializer::JsValueDeserializer;
 use crate::transport::MapCubeErrExt;
 use cubeorchestrator::query_message_parser::QueryResult;
 use cubeorchestrator::query_result_transform::{
-    RequestResultData, RequestResultDataMulti, TransformedData,
+    DBResponsePrimitive, RequestResultData, RequestResultDataMulti, TransformedData,
 };
 use cubeorchestrator::transport::{JsRawData, TransformDataRequest};
+use cubesql::compile::engine::df::scan::{FieldValue, ValueObject};
 use cubesql::CubeError;
 use neon::context::{Context, FunctionContext, ModuleContext};
 use neon::handle::Handle;
@@ -15,6 +16,7 @@ use neon::prelude::{
 };
 use neon::types::buffer::TypedArray;
 use serde::Deserialize;
+use std::borrow::Cow;
 use std::sync::Arc;
 
 pub fn register_module(cx: &mut ModuleContext) -> NeonResult<()> {
@@ -39,6 +41,7 @@ pub fn register_module(cx: &mut ModuleContext) -> NeonResult<()> {
 pub struct ResultWrapper {
     transform_data: TransformDataRequest,
     data: Arc<QueryResult>,
+    transformed_data: Option<TransformedData>,
 }
 
 impl ResultWrapper {
@@ -115,14 +118,80 @@ impl ResultWrapper {
         Ok(Self {
             transform_data: transform_request,
             data: query_result,
+            transformed_data: None,
         })
     }
 
-    pub fn transform_result(&self) -> Result<TransformedData, CubeError> {
-        let transformed = TransformedData::transform(&self.transform_data, &self.data)
-            .map_cube_err("Can't prepare transformed data")?;
+    pub fn transform_result(&mut self) -> Result<(), CubeError> {
+        self.transformed_data = Some(
+            TransformedData::transform(&self.transform_data, &self.data)
+                .map_cube_err("Can't prepare transformed data")?,
+        );
 
-        Ok(transformed)
+        Ok(())
+    }
+}
+
+impl ValueObject for ResultWrapper {
+    fn len(&mut self) -> Result<usize, CubeError> {
+        if self.transformed_data.is_none() {
+            self.transform_result()?;
+        }
+
+        let data = self.transformed_data.as_ref().unwrap();
+
+        match data {
+            TransformedData::Compact {
+                members: _members,
+                dataset,
+            } => Ok(dataset.len()),
+            TransformedData::Vanilla(dataset) => Ok(dataset.len()),
+        }
+    }
+
+    fn get(&mut self, index: usize, field_name: &str) -> Result<FieldValue, CubeError> {
+        if self.transformed_data.is_none() {
+            self.transform_result()?;
+        }
+
+        let data = self.transformed_data.as_ref().unwrap();
+
+        let value = match data {
+            TransformedData::Compact { members, dataset } => {
+                let Some(row) = dataset.get(index) else {
+                    return Err(CubeError::user(format!(
+                        "Unexpected response from Cube, can't get {} row",
+                        index
+                    )));
+                };
+
+                let Some(member_index) = members.iter().position(|m| m == field_name) else {
+                    return Err(CubeError::user(format!(
+                        "Field name '{}' not found in members",
+                        field_name
+                    )));
+                };
+
+                row.get(member_index).unwrap_or(&DBResponsePrimitive::Null)
+            }
+            TransformedData::Vanilla(dataset) => {
+                let Some(row) = dataset.get(index) else {
+                    return Err(CubeError::user(format!(
+                        "Unexpected response from Cube, can't get {} row",
+                        index
+                    )));
+                };
+
+                row.get(field_name).unwrap_or(&DBResponsePrimitive::Null)
+            }
+        };
+
+        Ok(match value {
+            DBResponsePrimitive::String(s) => FieldValue::String(Cow::Borrowed(s)),
+            DBResponsePrimitive::Number(n) => FieldValue::Number(*n),
+            DBResponsePrimitive::Boolean(b) => FieldValue::Bool(*b),
+            DBResponsePrimitive::Null => FieldValue::Null,
+        })
     }
 }
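The ValueObject impl added above materializes transformed_data lazily on first access and then serves len() and get() from the cached value, falling back to Null for missing fields. Below is a minimal stand-alone sketch of that lazy-transform accessor pattern; Wrapper, Primitive, and the field names are simplified stand-ins, not the real ResultWrapper, TransformedData, or DBResponsePrimitive types.

use std::collections::HashMap;

#[derive(Clone, Debug)]
enum Primitive {
    String(String),
    Number(f64),
    Null,
}

// Stand-in for the Vanilla shape: one field -> value map per row.
struct Wrapper {
    raw: Vec<HashMap<String, Primitive>>,                 // untransformed payload
    transformed: Option<Vec<HashMap<String, Primitive>>>, // filled on first access
}

impl Wrapper {
    fn transform(&mut self) {
        // The real code calls TransformedData::transform(); here we just copy.
        if self.transformed.is_none() {
            self.transformed = Some(self.raw.clone());
        }
    }

    fn len(&mut self) -> usize {
        self.transform();
        self.transformed.as_ref().unwrap().len()
    }

    fn get(&mut self, index: usize, field: &str) -> Primitive {
        self.transform();
        self.transformed.as_ref().unwrap()[index]
            .get(field)
            .cloned()
            .unwrap_or(Primitive::Null)
    }
}

fn main() {
    let mut w = Wrapper {
        raw: vec![HashMap::from([
            ("orders.count".to_string(), Primitive::Number(42.0)),
            ("orders.status".to_string(), Primitive::String("shipped".to_string())),
        ])],
        transformed: None,
    };
    println!("rows = {}", w.len());
    println!("count = {:?}", w.get(0, "orders.count"));
    println!("missing = {:?}", w.get(0, "orders.city")); // -> Null
}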

packages/cubejs-backend-native/src/transport.rs
Lines changed: 20 additions & 15 deletions

@@ -12,7 +12,9 @@ use crate::{
     stream::call_js_with_stream_as_callback,
 };
 use async_trait::async_trait;
-use cubesql::compile::engine::df::scan::{MemberField, SchemaRef};
+use cubesql::compile::engine::df::scan::{
+    convert_transport_response, transform_response, MemberField, RecordBatch, SchemaRef,
+};
 use cubesql::compile::engine::df::wrapper::SqlQuery;
 use cubesql::transport::{
     SpanId, SqlGenerator, SqlResponse, TransportLoadRequestQuery, TransportLoadResponse,
@@ -334,9 +336,9 @@ impl TransportService for NodeBridgeTransport {
         sql_query: Option<SqlQuery>,
         ctx: AuthContextRef,
         meta: LoadRequestMeta,
-        _schema: SchemaRef,
-        // ) -> Result<Vec<RecordBatch>, CubeError> {
-    ) -> Result<TransportLoadResponse, CubeError> {
+        schema: SchemaRef,
+        member_fields: Vec<MemberField>,
+    ) -> Result<Vec<RecordBatch>, CubeError> {
         trace!("[transport] Request ->");
 
         let native_auth = ctx
@@ -461,20 +463,23 @@ impl TransportService for NodeBridgeTransport {
                    }
                };
 
-                break serde_json::from_value::<TransportLoadResponse>(response)
+                let response = match serde_json::from_value::<TransportLoadResponse>(response) {
+                    Ok(v) => v,
+                    Err(err) => {
+                        return Err(CubeError::user(err.to_string()));
+                    }
+                };
+
+                break convert_transport_response(response, schema.clone(), member_fields)
                    .map_err(|err| CubeError::user(err.to_string()));
            }
            ValueFromJs::ResultWrapper(result_wrappers) => {
-                let response = TransportLoadResponse {
-                    pivot_query: None,
-                    slow_query: None,
-                    query_type: None,
-                    results: result_wrappers
-                        .into_iter()
-                        .map(|v| v.transform_result().unwrap().into())
-                        .collect(),
-                };
-                break Ok(response);
+                break result_wrappers
+                    .into_iter()
+                    .map(|mut wrapper| {
+                        transform_response(&mut wrapper, schema.clone(), &member_fields)
+                    })
+                    .collect::<Result<Vec<_>, _>>();
            }
        }
    }
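The ResultWrapper branch now maps each wrapper through the fallible transform_response and collects into Result<Vec<_>, _>, so the first failing transform aborts the whole load instead of panicking on unwrap() as the old code did. A small self-contained sketch of that collect pattern, with Batch and Error as stand-ins for RecordBatch and CubeError:

#[derive(Debug)]
struct Batch(usize); // stand-in for an Arrow RecordBatch (row count only)

#[derive(Debug)]
struct Error(String); // stand-in for CubeError

// Stand-in for transform_response: fails for an "empty" wrapper.
fn transform(rows: usize) -> Result<Batch, Error> {
    if rows == 0 {
        Err(Error("empty result".into()))
    } else {
        Ok(Batch(rows))
    }
}

fn main() {
    // All transforms succeed: Ok with every batch, in order.
    let ok: Result<Vec<Batch>, Error> = vec![3usize, 5, 7].into_iter().map(transform).collect();
    println!("{:?}", ok);

    // One transform fails: collect short-circuits and returns the first error.
    let err: Result<Vec<Batch>, Error> = vec![3usize, 0, 7].into_iter().map(transform).collect();
    println!("{:?}", err);
}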

rust/cubesql/cubesql/src/compile/engine/df/scan.rs
Lines changed: 71 additions & 63 deletions

@@ -1,5 +1,5 @@
 use async_trait::async_trait;
-use cubeclient::models::{V1LoadRequestQuery, V1LoadResult, V1LoadResultAnnotation};
+use cubeclient::models::{V1LoadRequestQuery, V1LoadResponse};
 pub use datafusion::{
     arrow::{
         array::{
@@ -52,7 +52,7 @@ use datafusion::{
     logical_plan::JoinType,
     scalar::ScalarValue,
 };
-use serde_json::{json, Value};
+use serde_json::Value;
 
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub enum MemberField {
@@ -655,28 +655,22 @@ impl ExecutionPlan for CubeScanExecutionPlan {
             )));
         }
 
-        let mut response = JsonValueObject::new(
-            load_data(
-                self.span_id.clone(),
-                request,
-                self.auth_context.clone(),
-                self.transport.clone(),
-                meta.clone(),
-                self.schema.clone(),
-                self.options.clone(),
-                self.wrapped_sql.clone(),
-            )
-            .await?
-            .data,
-        );
-        one_shot_stream.data = Some(
-            transform_response(
-                &mut response,
-                one_shot_stream.schema.clone(),
-                &one_shot_stream.member_fields,
-            )
-            .map_err(|e| DataFusionError::Execution(e.message.to_string()))?,
-        );
+        let response = load_data(
+            self.span_id.clone(),
+            request,
+            self.auth_context.clone(),
+            self.transport.clone(),
+            meta.clone(),
+            self.schema.clone(),
+            self.member_fields.clone(),
+            self.options.clone(),
+            self.wrapped_sql.clone(),
+        )
+        .await?;
+
+        // For now execute method executes only one query at a time, so we
+        // take the first result
+        one_shot_stream.data = Some(response.first().unwrap().clone());
 
         Ok(Box::pin(CubeScanStreamRouter::new(
             None,
@@ -842,9 +836,10 @@ async fn load_data(
     transport: Arc<dyn TransportService>,
     meta: LoadRequestMeta,
     schema: SchemaRef,
+    member_fields: Vec<MemberField>,
     options: CubeScanOptions,
     sql_query: Option<SqlQuery>,
-) -> ArrowResult<V1LoadResult> {
+) -> ArrowResult<Vec<RecordBatch>> {
     let no_members_query = request.measures.as_ref().map(|v| v.len()).unwrap_or(0) == 0
         && request.dimensions.as_ref().map(|v| v.len()).unwrap_or(0) == 0
         && request
@@ -862,22 +857,27 @@
            data.push(serde_json::Value::Null)
        }
 
-        V1LoadResult::new(
-            V1LoadResultAnnotation {
-                measures: json!(Vec::<serde_json::Value>::new()),
-                dimensions: json!(Vec::<serde_json::Value>::new()),
-                segments: json!(Vec::<serde_json::Value>::new()),
-                time_dimensions: json!(Vec::<serde_json::Value>::new()),
-            },
-            data,
-        )
+        let mut response = JsonValueObject::new(data);
+        let rec = transform_response(&mut response, schema.clone(), &member_fields)
+            .map_err(|e| DataFusionError::Execution(e.message.to_string()))?;
+
+        rec
     } else {
         let result = transport
-            .load(span_id, request, sql_query, auth_context, meta, schema)
-            .await;
-        let mut response = result.map_err(|err| ArrowError::ComputeError(err.to_string()))?;
-        if let Some(data) = response.results.pop() {
-            match (options.max_records, data.data.len()) {
+            .load(
+                span_id,
+                request,
+                sql_query,
+                auth_context,
+                meta,
+                schema,
+                member_fields,
+            )
+            .await
+            .map_err(|err| ArrowError::ComputeError(err.to_string()))?;
+        let response = result.first();
+        if let Some(data) = response.cloned() {
+            match (options.max_records, data.num_rows()) {
                (Some(max_records), len) if len >= max_records => {
                    return Err(ArrowError::ComputeError(format!("One of the Cube queries exceeded the maximum row limit ({}). JOIN/UNION is not possible as it will produce incorrect results. Try filtering the results more precisely or moving post-processing functions to an outer query.", max_records)));
                }
@@ -892,7 +892,7 @@ async fn load_data(
        }
    };
 
-    Ok(result)
+    Ok(vec![result])
 }
 
 fn load_to_stream_sync(one_shot_stream: &mut CubeScanOneShotStream) -> Result<()> {
@@ -902,6 +902,7 @@ fn load_to_stream_sync(one_shot_stream: &mut CubeScanOneShotStream) -> Result<()
     let transport = one_shot_stream.transport.clone();
     let meta = one_shot_stream.meta.clone();
     let schema = one_shot_stream.schema.clone();
+    let member_fields = one_shot_stream.member_fields.clone();
     let options = one_shot_stream.options.clone();
     let wrapped_sql = one_shot_stream.wrapped_sql.clone();
 
@@ -914,22 +915,16 @@ fn load_to_stream_sync(one_shot_stream: &mut CubeScanOneShotStream) -> Result<()
            transport,
            meta,
            schema,
+            member_fields,
            options,
            wrapped_sql,
        ))
    })
    .join()
-    .map_err(|_| DataFusionError::Execution(format!("Can't load to stream")))?;
-
-    let mut response = JsonValueObject::new(res.unwrap().data);
-    one_shot_stream.data = Some(
-        transform_response(
-            &mut response,
-            one_shot_stream.schema.clone(),
-            &one_shot_stream.member_fields,
-        )
-        .map_err(|e| DataFusionError::Execution(e.message.to_string()))?,
-    );
+    .map_err(|_| DataFusionError::Execution(format!("Can't load to stream")))??;
+
+    let response = res.first();
+    one_shot_stream.data = Some(response.cloned().unwrap());
 
     Ok(())
 }
@@ -1335,6 +1330,21 @@ pub fn transform_response<V: ValueObject>(
     Ok(RecordBatch::try_new(schema.clone(), columns)?)
 }
 
+pub fn convert_transport_response(
+    response: V1LoadResponse,
+    schema: SchemaRef,
+    member_fields: Vec<MemberField>,
+) -> std::result::Result<Vec<RecordBatch>, CubeError> {
+    response
+        .results
+        .into_iter()
+        .map(|r| {
+            let mut response = JsonValueObject::new(r.data.clone());
+            transform_response(&mut response, schema.clone(), &member_fields)
+        })
+        .collect::<std::result::Result<Vec<RecordBatch>, CubeError>>()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1398,10 +1408,12 @@ mod tests {
            _sql_query: Option<SqlQuery>,
            _ctx: AuthContextRef,
            _meta_fields: LoadRequestMeta,
-            _schema: SchemaRef,
-        ) -> Result<V1LoadResponse, CubeError> {
+            schema: SchemaRef,
+            member_fields: Vec<MemberField>,
+        ) -> Result<Vec<RecordBatch>, CubeError> {
            let response = r#"
-                {
+            {
+                "results": [{
                    "annotation": {
                        "measures": [],
                        "dimensions": [],
@@ -1415,17 +1427,13 @@ mod tests {
                        {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderDate": "9999-12-31 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 4"},
                        {"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": null}
                    ]
-                }
+                }]
+            }
            "#;
 
-            let result: V1LoadResult = serde_json::from_str(response).unwrap();
-
-            Ok(V1LoadResponse {
-                pivot_query: None,
-                slow_query: None,
-                query_type: None,
-                results: vec![result],
-            })
+            let result: V1LoadResponse = serde_json::from_str(response).unwrap();
+            convert_transport_response(result, schema.clone(), member_fields)
+                .map_err(|err| CubeError::user(err.to_string()))
        }
 
        async fn load_stream(
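load_to_stream_sync above now uses a double ? after .join(): the outer Result reports a panic in the spawned loader thread, the inner one is the load_data error, and only the first RecordBatch is kept for the one-shot stream. A simplified synchronous sketch of that control flow, with plain types instead of Arrow batches or the async runtime:

use std::thread;

// Stand-in for load_data: returns a Vec of "batches" or an error message.
fn load_data() -> Result<Vec<u32>, String> {
    Ok(vec![1, 2, 3])
}

fn load_to_stream_sync() -> Result<(), String> {
    let res = thread::spawn(load_data)
        .join()
        // Outer layer: the thread panicked. Inner layer: load_data failed.
        .map_err(|_| "Can't load to stream".to_string())??;

    // Mirrors `one_shot_stream.data = Some(response.cloned().unwrap())`:
    // only the first batch is consumed for now.
    let first = res.first().cloned();
    println!("first batch: {:?}", first);
    Ok(())
}

fn main() {
    load_to_stream_sync().unwrap();
}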
