Skip to content

Commit ef94071

Browse files
committed
rewrite all load api calls to return RecordBatch
1 parent d7d8f2d commit ef94071

File tree

5 files changed

+188
-95
lines changed

5 files changed

+188
-95
lines changed

packages/cubejs-backend-native/src/orchestrator.rs

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ use crate::node_obj_deserializer::JsValueDeserializer;
22
use crate::transport::MapCubeErrExt;
33
use cubeorchestrator::query_message_parser::QueryResult;
44
use cubeorchestrator::query_result_transform::{
5-
RequestResultData, RequestResultDataMulti, TransformedData,
5+
DBResponsePrimitive, RequestResultData, RequestResultDataMulti, TransformedData,
66
};
77
use cubeorchestrator::transport::{JsRawData, TransformDataRequest};
8+
use cubesql::compile::engine::df::scan::{FieldValue, ValueObject};
89
use cubesql::CubeError;
910
use neon::context::{Context, FunctionContext, ModuleContext};
1011
use neon::handle::Handle;
@@ -15,6 +16,7 @@ use neon::prelude::{
1516
};
1617
use neon::types::buffer::TypedArray;
1718
use serde::Deserialize;
19+
use std::borrow::Cow;
1820
use std::sync::Arc;
1921

2022
pub fn register_module(cx: &mut ModuleContext) -> NeonResult<()> {
@@ -39,6 +41,7 @@ pub fn register_module(cx: &mut ModuleContext) -> NeonResult<()> {
3941
pub struct ResultWrapper {
4042
transform_data: TransformDataRequest,
4143
data: Arc<QueryResult>,
44+
transformed_data: Option<TransformedData>,
4245
}
4346

4447
impl ResultWrapper {
@@ -115,14 +118,80 @@ impl ResultWrapper {
115118
Ok(Self {
116119
transform_data: transform_request,
117120
data: query_result,
121+
transformed_data: None,
118122
})
119123
}
120124

121-
pub fn transform_result(&self) -> Result<TransformedData, CubeError> {
122-
let transformed = TransformedData::transform(&self.transform_data, &self.data)
123-
.map_cube_err("Can't prepare transformed data")?;
125+
pub fn transform_result(&mut self) -> Result<(), CubeError> {
126+
self.transformed_data = Some(
127+
TransformedData::transform(&self.transform_data, &self.data)
128+
.map_cube_err("Can't prepare transformed data")?,
129+
);
124130

125-
Ok(transformed)
131+
Ok(())
132+
}
133+
}
134+
135+
impl ValueObject for ResultWrapper {
136+
fn len(&mut self) -> Result<usize, CubeError> {
137+
if self.transformed_data.is_none() {
138+
self.transform_result()?;
139+
}
140+
141+
let data = self.transformed_data.as_ref().unwrap();
142+
143+
match data {
144+
TransformedData::Compact {
145+
members: _members,
146+
dataset,
147+
} => Ok(dataset.len()),
148+
TransformedData::Vanilla(dataset) => Ok(dataset.len()),
149+
}
150+
}
151+
152+
fn get(&mut self, index: usize, field_name: &str) -> Result<FieldValue, CubeError> {
153+
if self.transformed_data.is_none() {
154+
self.transform_result()?;
155+
}
156+
157+
let data = self.transformed_data.as_ref().unwrap();
158+
159+
let value = match data {
160+
TransformedData::Compact { members, dataset } => {
161+
let Some(row) = dataset.get(index) else {
162+
return Err(CubeError::user(format!(
163+
"Unexpected response from Cube, can't get {} row",
164+
index
165+
)));
166+
};
167+
168+
let Some(member_index) = members.iter().position(|m| m == field_name) else {
169+
return Err(CubeError::user(format!(
170+
"Field name '{}' not found in members",
171+
field_name
172+
)));
173+
};
174+
175+
row.get(member_index).unwrap_or(&DBResponsePrimitive::Null)
176+
}
177+
TransformedData::Vanilla(dataset) => {
178+
let Some(row) = dataset.get(index) else {
179+
return Err(CubeError::user(format!(
180+
"Unexpected response from Cube, can't get {} row",
181+
index
182+
)));
183+
};
184+
185+
row.get(field_name).unwrap_or(&DBResponsePrimitive::Null)
186+
}
187+
};
188+
189+
Ok(match value {
190+
DBResponsePrimitive::String(s) => FieldValue::String(Cow::Borrowed(s)),
191+
DBResponsePrimitive::Number(n) => FieldValue::Number(*n),
192+
DBResponsePrimitive::Boolean(b) => FieldValue::Bool(*b),
193+
DBResponsePrimitive::Null => FieldValue::Null,
194+
})
126195
}
127196
}
128197

packages/cubejs-backend-native/src/transport.rs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use crate::{
1212
stream::call_js_with_stream_as_callback,
1313
};
1414
use async_trait::async_trait;
15-
use cubesql::compile::engine::df::scan::{MemberField, SchemaRef};
15+
use cubesql::compile::engine::df::scan::{
16+
convert_transport_response, transform_response, MemberField, RecordBatch, SchemaRef,
17+
};
1618
use cubesql::compile::engine::df::wrapper::SqlQuery;
1719
use cubesql::transport::{
1820
SpanId, SqlGenerator, SqlResponse, TransportLoadRequestQuery, TransportLoadResponse,
@@ -334,9 +336,9 @@ impl TransportService for NodeBridgeTransport {
334336
sql_query: Option<SqlQuery>,
335337
ctx: AuthContextRef,
336338
meta: LoadRequestMeta,
337-
_schema: SchemaRef,
338-
// ) -> Result<Vec<RecordBatch>, CubeError> {
339-
) -> Result<TransportLoadResponse, CubeError> {
339+
schema: SchemaRef,
340+
member_fields: Vec<MemberField>,
341+
) -> Result<Vec<RecordBatch>, CubeError> {
340342
trace!("[transport] Request ->");
341343

342344
let native_auth = ctx
@@ -461,20 +463,23 @@ impl TransportService for NodeBridgeTransport {
461463
}
462464
};
463465

464-
break serde_json::from_value::<TransportLoadResponse>(response)
466+
let response = match serde_json::from_value::<TransportLoadResponse>(response) {
467+
Ok(v) => v,
468+
Err(err) => {
469+
return Err(CubeError::user(err.to_string()));
470+
}
471+
};
472+
473+
break convert_transport_response(response, schema.clone(), member_fields)
465474
.map_err(|err| CubeError::user(err.to_string()));
466475
}
467476
ValueFromJs::ResultWrapper(result_wrappers) => {
468-
let response = TransportLoadResponse {
469-
pivot_query: None,
470-
slow_query: None,
471-
query_type: None,
472-
results: result_wrappers
473-
.into_iter()
474-
.map(|v| v.transform_result().unwrap().into())
475-
.collect(),
476-
};
477-
break Ok(response);
477+
break result_wrappers
478+
.into_iter()
479+
.map(|mut wrapper| {
480+
transform_response(&mut wrapper, schema.clone(), &member_fields)
481+
})
482+
.collect::<Result<Vec<_>, _>>();
478483
}
479484
}
480485
}

rust/cubesql/cubesql/src/compile/engine/df/scan.rs

Lines changed: 71 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use async_trait::async_trait;
2-
use cubeclient::models::{V1LoadRequestQuery, V1LoadResult, V1LoadResultAnnotation};
2+
use cubeclient::models::{V1LoadRequestQuery, V1LoadResponse};
33
pub use datafusion::{
44
arrow::{
55
array::{
@@ -52,7 +52,7 @@ use datafusion::{
5252
logical_plan::JoinType,
5353
scalar::ScalarValue,
5454
};
55-
use serde_json::{json, Value};
55+
use serde_json::Value;
5656

5757
#[derive(Debug, Clone, Eq, PartialEq)]
5858
pub enum MemberField {
@@ -658,28 +658,22 @@ impl ExecutionPlan for CubeScanExecutionPlan {
658658
)));
659659
}
660660

661-
let mut response = JsonValueObject::new(
662-
load_data(
663-
self.span_id.clone(),
664-
request,
665-
self.auth_context.clone(),
666-
self.transport.clone(),
667-
meta.clone(),
668-
self.schema.clone(),
669-
self.options.clone(),
670-
self.wrapped_sql.clone(),
671-
)
672-
.await?
673-
.data,
674-
);
675-
one_shot_stream.data = Some(
676-
transform_response(
677-
&mut response,
678-
one_shot_stream.schema.clone(),
679-
&one_shot_stream.member_fields,
680-
)
681-
.map_err(|e| DataFusionError::Execution(e.message.to_string()))?,
682-
);
661+
let response = load_data(
662+
self.span_id.clone(),
663+
request,
664+
self.auth_context.clone(),
665+
self.transport.clone(),
666+
meta.clone(),
667+
self.schema.clone(),
668+
self.member_fields.clone(),
669+
self.options.clone(),
670+
self.wrapped_sql.clone(),
671+
)
672+
.await?;
673+
674+
// For now execute method executes only one query at a time, so we
675+
// take the first result
676+
one_shot_stream.data = Some(response.first().unwrap().clone());
683677

684678
Ok(Box::pin(CubeScanStreamRouter::new(
685679
None,
@@ -849,9 +843,10 @@ async fn load_data(
849843
transport: Arc<dyn TransportService>,
850844
meta: LoadRequestMeta,
851845
schema: SchemaRef,
846+
member_fields: Vec<MemberField>,
852847
options: CubeScanOptions,
853848
sql_query: Option<SqlQuery>,
854-
) -> ArrowResult<V1LoadResult> {
849+
) -> ArrowResult<Vec<RecordBatch>> {
855850
let no_members_query = request.measures.as_ref().map(|v| v.len()).unwrap_or(0) == 0
856851
&& request.dimensions.as_ref().map(|v| v.len()).unwrap_or(0) == 0
857852
&& request
@@ -869,22 +864,27 @@ async fn load_data(
869864
data.push(serde_json::Value::Null)
870865
}
871866

872-
V1LoadResult::new(
873-
V1LoadResultAnnotation {
874-
measures: json!(Vec::<serde_json::Value>::new()),
875-
dimensions: json!(Vec::<serde_json::Value>::new()),
876-
segments: json!(Vec::<serde_json::Value>::new()),
877-
time_dimensions: json!(Vec::<serde_json::Value>::new()),
878-
},
879-
data,
880-
)
867+
let mut response = JsonValueObject::new(data);
868+
let rec = transform_response(&mut response, schema.clone(), &member_fields)
869+
.map_err(|e| DataFusionError::Execution(e.message.to_string()))?;
870+
871+
rec
881872
} else {
882873
let result = transport
883-
.load(span_id, request, sql_query, auth_context, meta, schema)
884-
.await;
885-
let mut response = result.map_err(|err| ArrowError::ComputeError(err.to_string()))?;
886-
if let Some(data) = response.results.pop() {
887-
match (options.max_records, data.data.len()) {
874+
.load(
875+
span_id,
876+
request,
877+
sql_query,
878+
auth_context,
879+
meta,
880+
schema,
881+
member_fields,
882+
)
883+
.await
884+
.map_err(|err| ArrowError::ComputeError(err.to_string()))?;
885+
let response = result.first();
886+
if let Some(data) = response.cloned() {
887+
match (options.max_records, data.num_rows()) {
888888
(Some(max_records), len) if len >= max_records => {
889889
return Err(ArrowError::ComputeError(format!("One of the Cube queries exceeded the maximum row limit ({}). JOIN/UNION is not possible as it will produce incorrect results. Try filtering the results more precisely or moving post-processing functions to an outer query.", max_records)));
890890
}
@@ -899,7 +899,7 @@ async fn load_data(
899899
}
900900
};
901901

902-
Ok(result)
902+
Ok(vec![result])
903903
}
904904

905905
fn load_to_stream_sync(one_shot_stream: &mut CubeScanOneShotStream) -> Result<()> {
@@ -909,6 +909,7 @@ fn load_to_stream_sync(one_shot_stream: &mut CubeScanOneShotStream) -> Result<()
909909
let transport = one_shot_stream.transport.clone();
910910
let meta = one_shot_stream.meta.clone();
911911
let schema = one_shot_stream.schema.clone();
912+
let member_fields = one_shot_stream.member_fields.clone();
912913
let options = one_shot_stream.options.clone();
913914
let wrapped_sql = one_shot_stream.wrapped_sql.clone();
914915

@@ -921,22 +922,16 @@ fn load_to_stream_sync(one_shot_stream: &mut CubeScanOneShotStream) -> Result<()
921922
transport,
922923
meta,
923924
schema,
925+
member_fields,
924926
options,
925927
wrapped_sql,
926928
))
927929
})
928930
.join()
929-
.map_err(|_| DataFusionError::Execution(format!("Can't load to stream")))?;
930-
931-
let mut response = JsonValueObject::new(res.unwrap().data);
932-
one_shot_stream.data = Some(
933-
transform_response(
934-
&mut response,
935-
one_shot_stream.schema.clone(),
936-
&one_shot_stream.member_fields,
937-
)
938-
.map_err(|e| DataFusionError::Execution(e.message.to_string()))?,
939-
);
931+
.map_err(|_| DataFusionError::Execution(format!("Can't load to stream")))??;
932+
933+
let response = res.first();
934+
one_shot_stream.data = Some(response.cloned().unwrap());
940935

941936
Ok(())
942937
}
@@ -1342,6 +1337,21 @@ pub fn transform_response<V: ValueObject>(
13421337
Ok(RecordBatch::try_new(schema.clone(), columns)?)
13431338
}
13441339

1340+
pub fn convert_transport_response(
1341+
response: V1LoadResponse,
1342+
schema: SchemaRef,
1343+
member_fields: Vec<MemberField>,
1344+
) -> std::result::Result<Vec<RecordBatch>, CubeError> {
1345+
response
1346+
.results
1347+
.into_iter()
1348+
.map(|r| {
1349+
let mut response = JsonValueObject::new(r.data.clone());
1350+
transform_response(&mut response, schema.clone(), &member_fields)
1351+
})
1352+
.collect::<std::result::Result<Vec<RecordBatch>, CubeError>>()
1353+
}
1354+
13451355
#[cfg(test)]
13461356
mod tests {
13471357
use super::*;
@@ -1405,10 +1415,12 @@ mod tests {
14051415
_sql_query: Option<SqlQuery>,
14061416
_ctx: AuthContextRef,
14071417
_meta_fields: LoadRequestMeta,
1408-
_schema: SchemaRef,
1409-
) -> Result<V1LoadResponse, CubeError> {
1418+
schema: SchemaRef,
1419+
member_fields: Vec<MemberField>,
1420+
) -> Result<Vec<RecordBatch>, CubeError> {
14101421
let response = r#"
1411-
{
1422+
{
1423+
"results": [{
14121424
"annotation": {
14131425
"measures": [],
14141426
"dimensions": [],
@@ -1422,17 +1434,13 @@ mod tests {
14221434
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "true", "KibanaSampleDataEcommerce.orderDate": "9999-12-31 00:00:00.000", "KibanaSampleDataEcommerce.city": "City 4"},
14231435
{"KibanaSampleDataEcommerce.count": null, "KibanaSampleDataEcommerce.maxPrice": null, "KibanaSampleDataEcommerce.isBool": "false", "KibanaSampleDataEcommerce.orderDate": null, "KibanaSampleDataEcommerce.city": null}
14241436
]
1425-
}
1437+
}]
1438+
}
14261439
"#;
14271440

1428-
let result: V1LoadResult = serde_json::from_str(response).unwrap();
1429-
1430-
Ok(V1LoadResponse {
1431-
pivot_query: None,
1432-
slow_query: None,
1433-
query_type: None,
1434-
results: vec![result],
1435-
})
1441+
let result: V1LoadResponse = serde_json::from_str(response).unwrap();
1442+
convert_transport_response(result, schema.clone(), member_fields)
1443+
.map_err(|err| CubeError::user(err.to_string()))
14361444
}
14371445

14381446
async fn load_stream(

0 commit comments

Comments
 (0)