Skip to content

Commit 4ebbb9d

Browse files
committed
implement get_final_cubestore_result_array() native
1 parent 4498f31 commit 4ebbb9d

File tree

4 files changed

+136
-7
lines changed

4 files changed

+136
-7
lines changed

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ import {
1111
QueryAlias,
1212
} from '@cubejs-backend/shared';
1313
import {
14-
getFinalCubestoreResult, getFinalCubestoreResultMulti,
14+
getFinalCubestoreResult,
15+
getFinalCubestoreResultArray,
16+
getFinalCubestoreResultMulti,
1517
transformData as transformDataNative,
1618
TransformDataResponse
1719
} from '@cubejs-backend/native';
@@ -1825,7 +1827,7 @@ class ApiGateway {
18251827
// If all query results are from Cubestore (are native)
18261828
// we prepare the final json result on native side
18271829
if (allNative) {
1828-
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[string[], any[], string[]]>(
1830+
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[string[], any[], any[]]>(
18291831
([transformList, rawList, resultList], r) => {
18301832
transformList.push(JSON.stringify(r.transformDataParams));
18311833
rawList.push(r.rawData.getNativeRef());
@@ -1968,6 +1970,8 @@ class ApiGateway {
19681970
annotation
19691971
}];
19701972
}
1973+
1974+
res(request.streaming ? results[0] : { results });
19711975
} else {
19721976
results = await Promise.all(
19731977
normalizedQueries.map(async (normalizedQuery, index) => {
@@ -1999,11 +2003,45 @@ class ApiGateway {
19992003
);
20002004
})
20012005
);
2002-
}
20032006

2004-
res(request.streaming ? results[0] : {
2005-
results,
2006-
});
2007+
const allNative = results.every(r => r.isNative);
2008+
2009+
if (!request.streaming) {
2010+
// If all query results are from Cubestore (are native)
2011+
// we prepare the final json result on native side
2012+
if (allNative) {
2013+
const [transformDataJson, rawDataRef, resultDataJson] = (results as {
2014+
transformDataParams: any;
2015+
rawData: { getNativeRef: () => any };
2016+
}[]).reduce<[string[], any[], string[]]>(
2017+
([transformList, rawList, resultList], r) => {
2018+
transformList.push(JSON.stringify(r.transformDataParams));
2019+
rawList.push(r.rawData.getNativeRef());
2020+
resultList.push(JSON.stringify(cleanupResult(r)));
2021+
return [transformList, rawList, resultList];
2022+
},
2023+
[[], [], []]
2024+
);
2025+
2026+
res(getFinalCubestoreResultArray(transformDataJson, rawDataRef, resultDataJson));
2027+
} else {
2028+
// if we have mixed query results (there are js and native)
2029+
// we prepare results separately: on js and native sides
2030+
// and serve final response from JS side
2031+
res({
2032+
results: results.map(r => {
2033+
const data = r.dataCb();
2034+
return {
2035+
...cleanupResult(r),
2036+
data,
2037+
};
2038+
}),
2039+
});
2040+
}
2041+
} else {
2042+
res(results[0]);
2043+
}
2044+
}
20072045
} catch (e: any) {
20082046
this.handleError({
20092047
e, context, query, res, requestStarted

packages/cubejs-backend-native/js/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,12 @@ export const getFinalCubestoreResult = (transformDataJson: string, rows: any, re
395395
return native.getFinalCubestoreResult(transformDataJson, rows, resultData);
396396
};
397397

398+
export const getFinalCubestoreResultArray = (transformDataJson: string[], rows: any[], resultDataJson: string[]): ArrayBuffer => {
399+
const native = loadNative();
400+
401+
return native.getFinalCubestoreResultArray(transformDataJson, rows, resultDataJson);
402+
};
403+
398404
export const getFinalCubestoreResultMulti = (transformDataJson: string[], rows: any[], responseData: string): ArrayBuffer => {
399405
const native = loadNative();
400406

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ use cubeorchestrator::cubestore_message_parser::CubeStoreResult;
3737
use cubesql::{telemetry::ReportingLogger, CubeError};
3838

3939
use cubeorchestrator::cubestore_result_transform::{
40-
get_final_cubestore_result, get_final_cubestore_result_multi, transform_data,
40+
get_final_cubestore_result, get_final_cubestore_result_array, get_final_cubestore_result_multi,
41+
transform_data,
4142
};
4243
use cubeorchestrator::types::{RequestResultData, RequestResultDataMulti, TransformDataRequest};
4344
use neon::prelude::*;
@@ -624,6 +625,73 @@ fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsArrayBuffer> {
624625
Ok(js_buffer)
625626
}
626627

628+
fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsArrayBuffer> {
629+
let transform_data_array = cx.argument::<JsArray>(0)?;
630+
let transform_requests: Vec<TransformDataRequest> = transform_data_array
631+
.to_vec(&mut cx)?
632+
.into_iter()
633+
.map(|js_value| {
634+
let js_string = js_value
635+
.downcast_or_throw::<JsString, _>(&mut cx)?
636+
.value(&mut cx);
637+
638+
match serde_json::from_str::<TransformDataRequest>(&js_string) {
639+
Ok(request) => Ok(request),
640+
Err(err) => cx.throw_error(err.to_string()),
641+
}
642+
})
643+
.collect::<Result<_, _>>()?;
644+
645+
let cube_store_array = cx.argument::<JsArray>(1)?;
646+
let cube_store_results_boxed: Vec<Handle<JsBox<CubeStoreResult>>> = cube_store_array
647+
.to_vec(&mut cx)?
648+
.into_iter()
649+
.map(|js_value| js_value.downcast_or_throw::<JsBox<CubeStoreResult>, _>(&mut cx))
650+
.collect::<Result<_, _>>()?;
651+
let cube_store_results: Vec<&CubeStoreResult> = cube_store_results_boxed
652+
.iter()
653+
.map(|handle| &***handle)
654+
.collect();
655+
656+
let results_data_array = cx.argument::<JsArray>(2)?;
657+
let mut request_results: Vec<RequestResultData> = results_data_array
658+
.to_vec(&mut cx)?
659+
.into_iter()
660+
.map(|js_value| {
661+
let js_string = js_value
662+
.downcast_or_throw::<JsString, _>(&mut cx)?
663+
.value(&mut cx);
664+
665+
match serde_json::from_str::<RequestResultData>(&js_string) {
666+
Ok(request) => Ok(request),
667+
Err(err) => cx.throw_error(err.to_string()),
668+
}
669+
})
670+
.collect::<Result<_, _>>()?;
671+
672+
if let Err(err) = get_final_cubestore_result_array(
673+
&transform_requests,
674+
&cube_store_results,
675+
&mut request_results,
676+
) {
677+
return cx.throw_error(err.to_string());
678+
}
679+
680+
let json_data = match serde_json::to_string(&request_results) {
681+
Ok(data) => data,
682+
Err(e) => return cx.throw_error(format!("Serialization error: {}", e)),
683+
};
684+
let json_bytes = json_data.as_bytes();
685+
686+
let mut js_buffer = cx.array_buffer(json_bytes.len())?;
687+
{
688+
let buffer = js_buffer.as_mut_slice(&mut cx);
689+
buffer.copy_from_slice(json_bytes);
690+
}
691+
692+
Ok(js_buffer)
693+
}
694+
627695
fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsArrayBuffer> {
628696
let transform_data_array = cx.argument::<JsArray>(0)?;
629697
let transform_requests: Vec<TransformDataRequest> = transform_data_array
@@ -701,6 +769,7 @@ pub fn register_module_exports<C: NodeConfiguration + 'static>(
701769
cx.export_function("transformQueryData", transform_query_data)?;
702770
cx.export_function("getFinalCubestoreResult", final_cubestore_result)?;
703771
cx.export_function("getFinalCubestoreResultMulti", final_cubestore_result_multi)?;
772+
cx.export_function("getFinalCubestoreResultArray", final_cubestore_result_array)?;
704773

705774
crate::template::template_register_module(&mut cx)?;
706775

rust/cubeorchestrator/src/cubestore_result_transform.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,22 @@ pub fn get_final_cubestore_result(
436436
Ok(())
437437
}
438438

439+
pub fn get_final_cubestore_result_array(
440+
transform_requests: &[TransformDataRequest],
441+
cube_store_results: &[&CubeStoreResult],
442+
result_data: &mut [RequestResultData],
443+
) -> Result<()> {
444+
for (transform_data, cube_store_result, result) in multizip((
445+
transform_requests.iter(),
446+
cube_store_results.iter(),
447+
result_data.iter_mut(),
448+
)) {
449+
get_final_cubestore_result(transform_data, cube_store_result, result)?;
450+
}
451+
452+
Ok(())
453+
}
454+
439455
pub fn get_final_cubestore_result_multi(
440456
request_data: &[TransformDataRequest],
441457
cube_store_result: &[&CubeStoreResult],

0 commit comments

Comments
 (0)