Skip to content

Commit 0c89ee8

Browse files
committed
implement custom deserializer from JS Value to rust
and updated the cubestore result transformations with it
1 parent a92fc21 commit 0c89ee8

File tree

6 files changed

+503
-109
lines changed

6 files changed

+503
-109
lines changed

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1629,7 +1629,7 @@ class ApiGateway {
16291629
const dataCb: TransformDataResponseCb = response.data.isNative ?
16301630
async () => {
16311631
const jsonData = await transformDataNative(
1632-
JSON.stringify(transformDataParams), response.data.getNativeRef()
1632+
transformDataParams, response.data.getNativeRef()
16331633
);
16341634
return JSON.parse(jsonData.result) as TransformDataResponse;
16351635
}
@@ -1828,9 +1828,9 @@ class ApiGateway {
18281828
// If all query results are from Cubestore (are native)
18291829
// we prepare the final json result on native side
18301830
if (allNative) {
1831-
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[string[], any[], any[]]>(
1831+
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[Object[], any[], Object[]]>(
18321832
([transformList, rawList, resultList], r) => {
1833-
transformList.push(JSON.stringify(r.transformDataParams));
1833+
transformList.push(r.transformDataParams);
18341834
rawList.push(r.rawData.getNativeRef());
18351835
resultList.push(cleanupResult(r));
18361836
return [transformList, rawList, resultList];
@@ -1843,9 +1843,8 @@ class ApiGateway {
18431843
results: cleanResultList,
18441844
slowQuery
18451845
};
1846-
const resultDataJson = JSON.stringify(responseDataObj);
18471846

1848-
res(await getFinalCubestoreResultMulti(transformDataJson, rawDataRef, resultDataJson));
1847+
res(await getFinalCubestoreResultMulti(transformDataJson, rawDataRef, responseDataObj));
18491848
} else {
18501849
// if we have mixed query results (there are js and native)
18511850
// we prepare results separately: on js and native sides
@@ -1866,10 +1865,8 @@ class ApiGateway {
18661865
} else if (allNative) {
18671866
// We prepare the full final json result on native side
18681867
const r = results[0];
1869-
const transformDataJson = JSON.stringify(r.transformDataParams);
18701868
const rawDataRef = r.rawData.getNativeRef();
1871-
const resultDataJson = JSON.stringify(cleanupResult(r));
1872-
res(await getFinalCubestoreResult(transformDataJson, rawDataRef, resultDataJson));
1869+
res(await getFinalCubestoreResult(r.transformDataParams, rawDataRef, cleanupResult(r)));
18731870
} else {
18741871
const data = await results[0].dataCb();
18751872
res({
@@ -2014,11 +2011,11 @@ class ApiGateway {
20142011
const [transformDataJson, rawDataRef, resultDataJson] = (results as {
20152012
transformDataParams: any;
20162013
rawData: { getNativeRef: () => any };
2017-
}[]).reduce<[string[], any[], string[]]>(
2014+
}[]).reduce<[Object[], any[], Object[]]>(
20182015
([transformList, rawList, resultList], r) => {
2019-
transformList.push(JSON.stringify(r.transformDataParams));
2016+
transformList.push(r.transformDataParams);
20202017
rawList.push(r.rawData.getNativeRef());
2021-
resultList.push(JSON.stringify(cleanupResult(r)));
2018+
resultList.push(cleanupResult(r));
20222019
return [transformList, rawList, resultList];
20232020
},
20242021
[[], [], []]

packages/cubejs-backend-native/js/index.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -383,28 +383,28 @@ export const getCubestoreResult = (ref: CubeStoreResultWrapper): ResultRow[] =>
383383
return native.getCubestoreResult(ref);
384384
};
385385

386-
export const transformData = (transformDataJson: string, rows: any): Promise<TransformDataResponseNative> => {
386+
export const transformData = (transformDataObj: Object, rows: any): Promise<TransformDataResponseNative> => {
387387
const native = loadNative();
388388

389-
return native.transformQueryData(transformDataJson, rows);
389+
return native.transformQueryData(transformDataObj, rows);
390390
};
391391

392-
export const getFinalCubestoreResult = (transformDataJson: string, rows: any, resultData: string): Promise<ArrayBuffer> => {
392+
export const getFinalCubestoreResult = (transformDataObj: Object, rows: any, resultData: Object): Promise<ArrayBuffer> => {
393393
const native = loadNative();
394394

395-
return native.getFinalCubestoreResult(transformDataJson, rows, resultData);
395+
return native.getFinalCubestoreResult(transformDataObj, rows, resultData);
396396
};
397397

398-
export const getFinalCubestoreResultArray = (transformDataJson: string[], rows: any[], resultDataJson: string[]): Promise<ArrayBuffer> => {
398+
export const getFinalCubestoreResultArray = (transformDataArr: Object[], rows: any[], resultDataArr: Object[]): Promise<ArrayBuffer> => {
399399
const native = loadNative();
400400

401-
return native.getFinalCubestoreResultArray(transformDataJson, rows, resultDataJson);
401+
return native.getFinalCubestoreResultArray(transformDataArr, rows, resultDataArr);
402402
};
403403

404-
export const getFinalCubestoreResultMulti = (transformDataJson: string[], rows: any[], responseData: string): Promise<ArrayBuffer> => {
404+
export const getFinalCubestoreResultMulti = (transformDataArr: Object[], rows: any[], responseData: Object): Promise<ArrayBuffer> => {
405405
const native = loadNative();
406406

407-
return native.getFinalCubestoreResultMulti(transformDataJson, rows, responseData);
407+
return native.getFinalCubestoreResultMulti(transformDataArr, rows, responseData);
408408
};
409409

410410
export interface PyConfiguration {

packages/cubejs-backend-native/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod cross;
1111
pub mod gateway;
1212
pub mod logger;
1313
pub mod node_export;
14+
pub mod node_obj_deserializer;
1415
pub mod node_obj_serializer;
1516
#[cfg(feature = "python")]
1617
pub mod python;

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 49 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@ use cubeorchestrator::cubestore_message_parser::CubeStoreResult;
3535

3636
use cubesql::{telemetry::ReportingLogger, CubeError};
3737

38+
use crate::node_obj_deserializer::JsValueDeserializer;
3839
use cubeorchestrator::cubestore_result_transform::{
3940
get_final_cubestore_result_array, RequestResultArray, RequestResultData,
4041
RequestResultDataMulti, TransformedData,
4142
};
4243
use cubeorchestrator::transport::TransformDataRequest;
4344
use neon::prelude::*;
4445
use neon::types::buffer::TypedArray;
46+
use serde::Deserialize;
4547

4648
struct SQLInterface {
4749
services: Arc<NodeCubeServices>,
@@ -553,18 +555,19 @@ fn get_cubestore_result(mut cx: FunctionContext) -> JsResult<JsValue> {
553555
}
554556

555557
fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsPromise> {
556-
let json_str = cx.argument::<JsString>(0)?.value(&mut cx);
558+
let transform_data_js_object = cx.argument::<JsValue>(0)?;
559+
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_js_object);
560+
561+
let request_data: TransformDataRequest = match Deserialize::deserialize(deserializer) {
562+
Ok(data) => data,
563+
Err(err) => return cx.throw_error(err.to_string()),
564+
};
557565

558566
let cube_store_result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(1)?;
559567
let cube_store_result = Arc::clone(&cube_store_result);
560568

561569
let promise = cx
562570
.task(move || {
563-
let request_data = match serde_json::from_str::<TransformDataRequest>(&json_str) {
564-
Ok(data) => data,
565-
Err(err) => return Err(anyhow::Error::from(err)),
566-
};
567-
568571
let transformed = TransformedData::transform(&request_data, &cube_store_result)?;
569572

570573
match serde_json::to_string(&transformed) {
@@ -588,25 +591,25 @@ fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsPromise> {
588591
}
589592

590593
fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
591-
let transform_data_str = cx.argument::<JsString>(0)?.value(&mut cx);
594+
let transform_data_js_object = cx.argument::<JsValue>(0)?;
595+
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_js_object);
596+
let transform_request_data: TransformDataRequest = match Deserialize::deserialize(deserializer)
597+
{
598+
Ok(data) => data,
599+
Err(err) => return cx.throw_error(err.to_string()),
600+
};
601+
592602
let cube_store_result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(1)?;
593603
let cube_store_result = Arc::clone(&cube_store_result);
594-
let result_data_str = cx.argument::<JsString>(2)?.value(&mut cx);
604+
let result_data_js_object = cx.argument::<JsValue>(2)?;
605+
let deserializer = JsValueDeserializer::new(&mut cx, result_data_js_object);
606+
let mut result_data: RequestResultData = match Deserialize::deserialize(deserializer) {
607+
Ok(data) => data,
608+
Err(err) => return cx.throw_error(err.to_string()),
609+
};
595610

596611
let promise = cx
597612
.task(move || {
598-
let transform_request_data =
599-
match serde_json::from_str::<TransformDataRequest>(&transform_data_str) {
600-
Ok(data) => data,
601-
Err(err) => return Err(anyhow::Error::from(err)),
602-
};
603-
604-
let mut result_data = match serde_json::from_str::<RequestResultData>(&result_data_str)
605-
{
606-
Ok(data) => data,
607-
Err(err) => return Err(anyhow::Error::from(err)),
608-
};
609-
610613
result_data.prepare_results(&transform_request_data, &cube_store_result)?;
611614

612615
match serde_json::to_string(&result_data) {
@@ -633,16 +636,13 @@ fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
633636
}
634637

635638
fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
636-
let transform_data_array = cx.argument::<JsArray>(0)?;
637-
let transform_request_strings: Vec<String> = transform_data_array
638-
.to_vec(&mut cx)?
639-
.into_iter()
640-
.map(|js_value| {
641-
js_value
642-
.downcast_or_throw::<JsString, _>(&mut cx)
643-
.map(|js_string| js_string.value(&mut cx))
644-
})
645-
.collect::<Result<_, _>>()?;
639+
let transform_data_array = cx.argument::<JsValue>(0)?;
640+
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
641+
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
642+
{
643+
Ok(data) => data,
644+
Err(err) => return cx.throw_error(err.to_string()),
645+
};
646646

647647
let cube_store_array = cx.argument::<JsArray>(1)?;
648648
let cube_store_results_boxed: Vec<Handle<JsBox<Arc<CubeStoreResult>>>> = cube_store_array
@@ -655,39 +655,15 @@ fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromise>
655655
.map(|handle| (**handle).clone())
656656
.collect();
657657

658-
let results_data_array = cx.argument::<JsArray>(2)?;
659-
let request_result_strings: Vec<String> = results_data_array
660-
.to_vec(&mut cx)?
661-
.into_iter()
662-
.map(|js_value| {
663-
js_value
664-
.downcast_or_throw::<JsString, _>(&mut cx)
665-
.map(|js_string| js_string.value(&mut cx))
666-
})
667-
.collect::<Result<_, _>>()?;
658+
let results_data_array = cx.argument::<JsValue>(2)?;
659+
let deserializer = JsValueDeserializer::new(&mut cx, results_data_array);
660+
let mut request_results: Vec<RequestResultData> = match Deserialize::deserialize(deserializer) {
661+
Ok(data) => data,
662+
Err(err) => return cx.throw_error(err.to_string()),
663+
};
668664

669665
let promise = cx
670666
.task(move || {
671-
let transform_requests: Vec<TransformDataRequest> = transform_request_strings
672-
.into_iter()
673-
.map(
674-
|req_str| match serde_json::from_str::<TransformDataRequest>(&req_str) {
675-
Ok(request) => Ok(request),
676-
Err(err) => Err(anyhow::Error::from(err)),
677-
},
678-
)
679-
.collect::<Result<_, _>>()?;
680-
681-
let mut request_results: Vec<RequestResultData> = request_result_strings
682-
.into_iter()
683-
.map(
684-
|req_str| match serde_json::from_str::<RequestResultData>(&req_str) {
685-
Ok(request) => Ok(request),
686-
Err(err) => Err(anyhow::Error::from(err)),
687-
},
688-
)
689-
.collect::<Result<_, _>>()?;
690-
691667
get_final_cubestore_result_array(
692668
&transform_requests,
693669
&cube_store_results,
@@ -722,16 +698,13 @@ fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromise>
722698
}
723699

724700
fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise> {
725-
let transform_data_array = cx.argument::<JsArray>(0)?;
726-
let transform_request_strings: Vec<String> = transform_data_array
727-
.to_vec(&mut cx)?
728-
.into_iter()
729-
.map(|js_value| {
730-
js_value
731-
.downcast_or_throw::<JsString, _>(&mut cx)
732-
.map(|js_string| js_string.value(&mut cx))
733-
})
734-
.collect::<Result<_, _>>()?;
701+
let transform_data_array = cx.argument::<JsValue>(0)?;
702+
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
703+
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
704+
{
705+
Ok(data) => data,
706+
Err(err) => return cx.throw_error(err.to_string()),
707+
};
735708

736709
let cube_store_array = cx.argument::<JsArray>(1)?;
737710
let cube_store_results_boxed: Vec<Handle<JsBox<Arc<CubeStoreResult>>>> = cube_store_array
@@ -744,26 +717,15 @@ fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise>
744717
.map(|handle| (**handle).clone())
745718
.collect();
746719

747-
let result_data_str = cx.argument::<JsString>(2)?.value(&mut cx);
720+
let result_data_js_object = cx.argument::<JsValue>(2)?;
721+
let deserializer = JsValueDeserializer::new(&mut cx, result_data_js_object);
722+
let mut result_data: RequestResultDataMulti = match Deserialize::deserialize(deserializer) {
723+
Ok(data) => data,
724+
Err(err) => return cx.throw_error(err.to_string()),
725+
};
748726

749727
let promise = cx
750728
.task(move || {
751-
let transform_requests: Vec<TransformDataRequest> = transform_request_strings
752-
.into_iter()
753-
.map(
754-
|req_str| match serde_json::from_str::<TransformDataRequest>(&req_str) {
755-
Ok(request) => Ok(request),
756-
Err(err) => Err(anyhow::Error::from(err)),
757-
},
758-
)
759-
.collect::<Result<_, _>>()?;
760-
761-
let mut result_data =
762-
match serde_json::from_str::<RequestResultDataMulti>(&result_data_str) {
763-
Ok(data) => data,
764-
Err(err) => return Err(anyhow::Error::from(err)),
765-
};
766-
767729
result_data.prepare_results(&transform_requests, &cube_store_results)?;
768730

769731
match serde_json::to_string(&result_data) {

0 commit comments

Comments
 (0)