Skip to content

Commit 5657478

Browse files
committed
pass ResultWrapper to native and offload transformData from the eventloop
1 parent fb54ff7 commit 5657478

File tree

6 files changed

+170
-99
lines changed

6 files changed

+170
-99
lines changed

packages/cubejs-backend-native/js/ResultWrapper.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,8 @@ class BaseWrapperArray extends BaseWrapper {
186186
return [transformDataJson, rawData, resultDataJson];
187187
}
188188

189+
// Is invoked from the native side to get
190+
// an array of all raw wrapped results
189191
public getResults(): ResultWrapper[] {
190192
return this.results;
191193
}

packages/cubejs-backend-native/js/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ function wrapNativeFunctionWithStream(
267267
} else if (response.error) {
268268
writerOrChannel.reject(errorString(response));
269269
} else if (response.isWrapper) { // Native wrapped result
270-
writerOrChannel.resolve(await response.getFinalResult());
270+
writerOrChannel.resolve(response);
271271
} else {
272272
writerOrChannel.resolve(JSON.stringify(response));
273273
}

packages/cubejs-backend-native/src/channel.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ use std::collections::HashMap;
44
use std::sync::atomic::{AtomicU64, Ordering};
55
use std::sync::Arc;
66

7+
use crate::orchestrator::ResultWrapper;
78
use crate::transport::MapCubeErrExt;
89
use crate::utils::bind_method;
910
use async_trait::async_trait;
10-
use cubeorchestrator::query_result_transform::RequestResultArray;
1111
use cubesql::transport::{SqlGenerator, SqlTemplates};
1212
use cubesql::CubeError;
1313
#[cfg(debug_assertions)]
@@ -197,7 +197,7 @@ where
197197
#[derive(Debug)]
198198
pub enum ValueFromJs {
199199
String(String),
200-
RequestResultArray(RequestResultArray),
200+
ResultWrapper(Vec<ResultWrapper>),
201201
}
202202

203203
#[allow(clippy::type_complexity)]

packages/cubejs-backend-native/src/orchestrator.rs

Lines changed: 108 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
use crate::node_obj_deserializer::JsValueDeserializer;
2+
use crate::transport::MapCubeErrExt;
23
use cubeorchestrator::query_message_parser::QueryResult;
34
use cubeorchestrator::query_result_transform::{
4-
get_final_cubestore_result_array, RequestResultArray, RequestResultData, RequestResultDataMulti,
5+
get_final_cubestore_result_array, RequestResultArray, RequestResultData,
6+
RequestResultDataMulti, TransformedData,
57
};
68
use cubeorchestrator::transport::{JsRawData, TransformDataRequest};
9+
use cubesql::CubeError;
710
use neon::context::{Context, FunctionContext, ModuleContext};
811
use neon::handle::Handle;
912
use neon::object::Object;
1013
use neon::prelude::{
11-
JsArray, JsArrayBuffer, JsBox, JsBuffer, JsObject, JsPromise, JsResult, JsValue, NeonResult,
14+
JsArray, JsArrayBuffer, JsBox, JsBuffer, JsFunction, JsObject, JsPromise, JsResult, JsValue,
15+
NeonResult,
1216
};
1317
use neon::types::buffer::TypedArray;
1418
use serde::Deserialize;
@@ -27,6 +31,108 @@ pub fn register_module(cx: &mut ModuleContext) -> NeonResult<()> {
2731
Ok(())
2832
}
2933

34+
// #[derive(Debug, Clone)]
35+
// pub enum ResultWrapperData {
36+
// JsObj(RecordBatch),
37+
// Native()
38+
// }
39+
40+
#[derive(Debug, Clone)]
41+
pub struct ResultWrapper {
42+
transform_data: TransformDataRequest,
43+
data: Arc<QueryResult>,
44+
}
45+
46+
impl ResultWrapper {
47+
pub fn from_js_result_wrapper(
48+
cx: &mut FunctionContext<'_>,
49+
js_result_wrapper_val: Handle<JsValue>,
50+
) -> Result<Self, CubeError> {
51+
let js_result_wrapper = js_result_wrapper_val
52+
.downcast::<JsObject, _>(cx)
53+
.map_cube_err("Can't downcast JS ResultWrapper to object")?;
54+
55+
let get_transform_data_js_method: Handle<JsFunction> = js_result_wrapper
56+
.get(cx, "getTransformData")
57+
.map_cube_err("Can't get getTransformData() method from JS ResultWrapper object")?;
58+
59+
let transform_data_js_arr = get_transform_data_js_method
60+
.call(cx, js_result_wrapper.upcast::<JsValue>(), [])
61+
.map_cube_err("Error calling getTransformData() method of ResultWrapper object")?
62+
.downcast::<JsArray, _>(cx)
63+
.map_cube_err("Can't downcast JS transformData to array")?
64+
.to_vec(cx)
65+
.map_cube_err("Can't convert JS transformData to array")?;
66+
67+
let transform_data_js = transform_data_js_arr.first().unwrap();
68+
69+
let deserializer = JsValueDeserializer::new(cx, *transform_data_js);
70+
let transform_request: TransformDataRequest = match Deserialize::deserialize(deserializer) {
71+
Ok(data) => data,
72+
Err(_) => {
73+
return Err(CubeError::internal(
74+
"Can't deserialize transformData from JS ResultWrapper object".to_string(),
75+
))
76+
}
77+
};
78+
79+
let get_raw_data_js_method: Handle<JsFunction> = js_result_wrapper
80+
.get(cx, "getRawData")
81+
.map_cube_err("Can't get getRawData() method from JS ResultWrapper object")?;
82+
83+
let raw_data_js_arr = get_raw_data_js_method
84+
.call(cx, js_result_wrapper.upcast::<JsValue>(), [])
85+
.map_cube_err("Error calling getRawData() method of ResultWrapper object")?
86+
.downcast::<JsArray, _>(cx)
87+
.map_cube_err("Can't downcast JS rawData to array")?
88+
.to_vec(cx)
89+
.map_cube_err("Can't convert JS rawData to array")?;
90+
91+
let raw_data_js = raw_data_js_arr.first().unwrap();
92+
93+
let query_result =
94+
if let Ok(js_box) = raw_data_js.downcast::<JsBox<Arc<QueryResult>>, _>(cx) {
95+
Arc::clone(&js_box)
96+
} else if let Ok(js_array) = raw_data_js.downcast::<JsArray, _>(cx) {
97+
let deserializer = JsValueDeserializer::new(cx, js_array.upcast());
98+
let js_raw_data: JsRawData = match Deserialize::deserialize(deserializer) {
99+
Ok(data) => data,
100+
Err(_) => {
101+
return Err(CubeError::internal(
102+
"Can't deserialize results raw data from JS ResultWrapper object".to_string(),
103+
));
104+
}
105+
};
106+
107+
QueryResult::from_js_raw_data(js_raw_data)
108+
.map(Arc::new)
109+
.map_cube_err("Can't build results data from JS rawData")?
110+
} else {
111+
return Err(CubeError::internal(
112+
"Can't deserialize results raw data from JS ResultWrapper object".to_string(),
113+
));
114+
};
115+
116+
Ok(Self {
117+
transform_data: transform_request,
118+
data: query_result,
119+
})
120+
}
121+
122+
pub fn transform_result(&self) -> Result<TransformedData, CubeError> {
123+
let transformed = TransformedData::transform(&self.transform_data, &self.data)
124+
.map_cube_err("Can't prepare transformed data")?;
125+
126+
Ok(transformed)
127+
}
128+
}
129+
130+
pub type JsResultDataVectors = (
131+
Vec<TransformDataRequest>,
132+
Vec<Arc<QueryResult>>,
133+
Vec<RequestResultData>,
134+
);
135+
30136
fn json_to_array_buffer<'a, C>(
31137
mut cx: C,
32138
json_data: Result<String, anyhow::Error>,
@@ -144,12 +250,6 @@ pub fn final_query_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
144250
Ok(promise)
145251
}
146252

147-
pub type JsResultDataVectors = (
148-
Vec<TransformDataRequest>,
149-
Vec<Arc<QueryResult>>,
150-
Vec<RequestResultData>,
151-
);
152-
153253
pub fn convert_final_query_result_array_from_js(
154254
cx: &mut FunctionContext<'_>,
155255
transform_data_array: Handle<JsValue>,

packages/cubejs-backend-native/src/transport.rs

Lines changed: 38 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,12 @@ use std::fmt::Display;
66
use crate::auth::NativeAuthContext;
77
use crate::channel::{call_raw_js_with_channel_as_callback, NodeSqlGenerator, ValueFromJs};
88
use crate::node_obj_serializer::NodeObjSerializer;
9-
use crate::orchestrator::convert_final_query_result_array_from_js;
9+
use crate::orchestrator::ResultWrapper;
1010
use crate::{
1111
auth::TransportRequest, channel::call_js_with_channel_as_callback,
1212
stream::call_js_with_stream_as_callback,
1313
};
1414
use async_trait::async_trait;
15-
use cubeorchestrator::query_result_transform::{
16-
get_final_cubestore_result_array, RequestResultArray,
17-
};
1815
use cubesql::compile::engine::df::scan::{MemberField, SchemaRef};
1916
use cubesql::compile::engine::df::wrapper::SqlQuery;
2017
use cubesql::transport::{
@@ -337,6 +334,7 @@ impl TransportService for NodeBridgeTransport {
337334
sql_query: Option<SqlQuery>,
338335
ctx: AuthContextRef,
339336
meta: LoadRequestMeta,
337+
// ) -> Result<Vec<RecordBatch>, CubeError> {
340338
) -> Result<TransportLoadResponse, CubeError> {
341339
trace!("[transport] Request ->");
342340

@@ -378,52 +376,43 @@ impl TransportService for NodeBridgeTransport {
378376
extra,
379377
Box::new(|cx, v| Ok(cx.string(v).as_value(cx))),
380378
Box::new(move |cx, v| {
381-
// It's too heavy/slow to get instance of ResultArrayWrapper from JS
382-
// and then call/await the .getFinalResult() method which needs be
383-
// executed again on JS side to get the actual needed date,
384-
// instead we pass it directly from JS side.
385-
// In case of wrapped result it's actually a tuple of
386-
// (transformDataJson[], rawData[], resultDataJson[])
387-
if let Ok(result_wrapped) = v.downcast::<JsArray, _>(cx) {
388-
let res_wrapped_vec = result_wrapped.to_vec(cx).map_cube_err("Can't convert JS result to array")?;
389-
390-
if res_wrapped_vec.len() != 3 {
391-
return Err(CubeError::internal("Expected a tuple with 3 elements: transformDataJson[], rawData[], resultDataJson[]".to_string()));
392-
}
379+
if let Ok(js_result_wrapped) = v.downcast::<JsObject, _>(cx) {
380+
let get_results_js_method: Handle<JsFunction> = js_result_wrapped
381+
.get(cx, "getResults")
382+
.map_cube_err(
383+
"Can't get getResults() method from JS ResultWrapper object",
384+
)?;
393385

394-
let transform_data_array = res_wrapped_vec.first().unwrap();
395-
let data_array = res_wrapped_vec.get(1).unwrap()
396-
.downcast_or_throw::<JsArray, _>(cx).map_cube_err("Can't downcast js data to array")?;
397-
let results_data_array = res_wrapped_vec.get(2).unwrap();
398-
399-
match convert_final_query_result_array_from_js(
400-
cx,
401-
*transform_data_array,
402-
data_array,
403-
*results_data_array,
404-
) {
405-
Ok((transform_requests, cube_store_results, mut request_results)) => {
406-
get_final_cubestore_result_array(
407-
&transform_requests,
408-
&cube_store_results,
409-
&mut request_results,
410-
).map_cube_err("Can't build result array")?;
411-
412-
Ok(ValueFromJs::RequestResultArray(RequestResultArray {
413-
results: request_results,
414-
}))
415-
}
416-
Err(err) => {
417-
Err(CubeError::internal(format!("Error converting result data: {:?}", err.to_string())))
418-
}
419-
}
386+
let results = get_results_js_method
387+
.call(cx, js_result_wrapped.upcast::<JsValue>(), [])
388+
.map_cube_err(
389+
"Error calling getResults() method of ResultWrapper object",
390+
)?;
420391

392+
let js_res_wrapped_vec = results
393+
.downcast::<JsArray, _>(cx)
394+
.map_cube_err("Can't downcast JS result to array")?
395+
.to_vec(cx)
396+
.map_cube_err("Can't convert JS result to array")?;
397+
398+
let native_wrapped_results = js_res_wrapped_vec
399+
.iter()
400+
.map(|r| ResultWrapper::from_js_result_wrapper(cx, *r))
401+
.collect::<Result<Vec<_>, _>>()
402+
.map_cube_err(
403+
"Can't construct result wrapper from JS ResultWrapper object",
404+
)?;
405+
406+
Ok(ValueFromJs::ResultWrapper(native_wrapped_results))
421407
} else if let Ok(str) = v.downcast::<JsString, _>(cx) {
422408
Ok(ValueFromJs::String(str.value(cx)))
423409
} else {
424-
Err(CubeError::internal("Can't downcast callback argument to string or resultWrapper object".to_string()))
410+
Err(CubeError::internal(
411+
"Can't downcast callback argument to string or resultWrapper object"
412+
.to_string(),
413+
))
425414
}
426-
})
415+
}),
427416
)
428417
.await;
429418

@@ -475,12 +464,15 @@ impl TransportService for NodeBridgeTransport {
475464
break serde_json::from_value::<TransportLoadResponse>(response)
476465
.map_err(|err| CubeError::user(err.to_string()));
477466
}
478-
ValueFromJs::RequestResultArray(result) => {
467+
ValueFromJs::ResultWrapper(result_wrappers) => {
479468
let response = TransportLoadResponse {
480469
pivot_query: None,
481470
slow_query: None,
482471
query_type: None,
483-
results: result.results.into_iter().map(|v| v.into()).collect(),
472+
results: result_wrappers
473+
.into_iter()
474+
.map(|v| v.transform_result().unwrap().into())
475+
.collect(),
484476
};
485477
break Ok(response);
486478
}

rust/cubesql/cubeclient/src/models/v1_load_result.rs

Lines changed: 19 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
*
88
* Generated by: https://openapi-generator.tech
99
*/
10-
use cubeorchestrator::query_result_transform::{RequestResultData, TransformedData};
10+
use cubeorchestrator::query_result_transform::TransformedData;
1111

1212
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
1313
pub struct V1LoadResult {
@@ -35,55 +35,32 @@ impl V1LoadResult {
3535
}
3636
}
3737

38-
impl From<RequestResultData> for V1LoadResult {
39-
fn from(rrd: RequestResultData) -> Self {
40-
let data = match rrd.data {
41-
None => {
38+
impl From<TransformedData> for V1LoadResult {
39+
fn from(d: TransformedData) -> Self {
40+
let data = match d {
41+
TransformedData::Compact {
42+
members: _members,
43+
dataset: _dataset,
44+
} => {
45+
// TODO V1LoadResult expects only Vanilla results?
4246
vec![]
4347
}
44-
Some(d) => {
45-
match d {
46-
TransformedData::Compact {
47-
members: _members,
48-
dataset: _dataset,
49-
} => {
50-
// TODO V1LoadResult expects only Vanilla results?
51-
vec![]
52-
}
53-
TransformedData::Vanilla(v) => v
54-
.iter()
55-
.map(|item| serde_json::to_value(item).unwrap_or(serde_json::Value::Null))
56-
.collect(),
57-
}
58-
}
48+
TransformedData::Vanilla(v) => v
49+
.iter()
50+
.map(|item| serde_json::to_value(item).unwrap_or(serde_json::Value::Null))
51+
.collect(),
5952
};
6053
let annotation = crate::models::V1LoadResultAnnotation {
61-
measures: rrd
62-
.annotation
63-
.get("measures")
64-
.map(|v| serde_json::to_value(v).unwrap_or(serde_json::Value::Null))
65-
.unwrap_or(serde_json::Value::Null),
66-
dimensions: rrd
67-
.annotation
68-
.get("dimensions")
69-
.map(|v| serde_json::to_value(v).unwrap_or(serde_json::Value::Null))
70-
.unwrap_or(serde_json::Value::Null),
71-
segments: rrd
72-
.annotation
73-
.get("segments")
74-
.map(|v| serde_json::to_value(v).unwrap_or(serde_json::Value::Null))
75-
.unwrap_or(serde_json::Value::Null),
76-
time_dimensions: rrd
77-
.annotation
78-
.get("timeDimensions")
79-
.map(|v| serde_json::to_value(v).unwrap_or(serde_json::Value::Null))
80-
.unwrap_or(serde_json::Value::Null),
54+
measures: serde_json::Value::Null,
55+
dimensions: serde_json::Value::Null,
56+
segments: serde_json::Value::Null,
57+
time_dimensions: serde_json::Value::Null,
8158
};
8259
V1LoadResult {
83-
data_source: rrd.data_source,
60+
data_source: None,
8461
annotation: Box::new(annotation),
8562
data,
86-
refresh_key_values: rrd.refresh_key_values,
63+
refresh_key_values: None,
8764
}
8865
}
8966
}

0 commit comments

Comments
 (0)