Skip to content

Commit 3ed4d82

Browse files
committed
omplement get_final_cubestore_result & get_final_cubestore_result_multi in native
1 parent 95c6566 commit 3ed4d82

File tree

10 files changed

+368
-58
lines changed

10 files changed

+368
-58
lines changed

packages/cubejs-api-gateway/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"@ungap/structured-clone": "^0.3.4",
3333
"body-parser": "^1.19.0",
3434
"chrono-node": "^2.6.2",
35+
"express": "^4.17.1",
3536
"express-graphql": "^0.12.0",
3637
"graphql": "^15.8.0",
3738
"graphql-scalars": "^1.10.0",
@@ -59,7 +60,6 @@
5960
"@types/ramda": "^0.27.32",
6061
"@types/supertest": "^2.0.10",
6162
"@types/uuid": "^8.3.1",
62-
"express": "^4.17.1",
6363
"jest": "^27",
6464
"mysql": "^2.18.1",
6565
"should": "^13.2.3",

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ import {
1010
getRealType,
1111
QueryAlias,
1212
} from '@cubejs-backend/shared';
13-
import { transformData as transformDataNative, TransformDataResponse } from '@cubejs-backend/native';
13+
import {
14+
getFinalCubestoreResult, getFinalCubestoreResultMulti,
15+
transformData as transformDataNative,
16+
TransformDataResponse
17+
} from '@cubejs-backend/native';
1418
import type {
1519
Application as ExpressApplication,
1620
ErrorRequestHandler,
@@ -114,6 +118,16 @@ function systemAsyncHandler(handler: (req: Request & { context: ExtendedRequestC
114118
};
115119
}
116120

121+
function cleanupResult(result) {
122+
return {
123+
...result,
124+
dataCb: undefined,
125+
rawData: undefined,
126+
transformDataParams: undefined,
127+
isNative: undefined,
128+
};
129+
}
130+
117131
// Prepared CheckAuthFn, default or from config: always async, returns nothing
118132
type PreparedCheckAuthFn = (ctx: any, authorization?: string) => Promise<void>;
119133

@@ -1625,6 +1639,7 @@ class ApiGateway {
16251639
return {
16261640
query: normalizedQuery,
16271641
dataCb,
1642+
rawData: response.data,
16281643
transformDataParams,
16291644
lastRefreshTime: response.lastRefreshTime?.toISOString(),
16301645
...(
@@ -1807,27 +1822,56 @@ class ApiGateway {
18071822
const allNative = results.every(r => r.isNative);
18081823

18091824
if (props.queryType === 'multi') {
1810-
res({
1811-
queryType,
1812-
results: results.map(r => {
1813-
const data = r.dataCb();
1814-
return {
1815-
...r,
1816-
data,
1817-
dataCb: undefined,
1818-
transformDataParams: undefined
1819-
};
1820-
}),
1821-
pivotQuery: getPivotQuery(queryType, normalizedQueries),
1822-
slowQuery
1823-
});
1825+
// If all query results are from Cubestore (are native)
1826+
// we prepare the final json result on native side
1827+
if (allNative) {
1828+
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[string[], any[], string[]]>(
1829+
([transformList, rawList, resultList], r) => {
1830+
transformList.push(JSON.stringify(r.transformDataParams));
1831+
rawList.push(r.rawData.getNativeRef());
1832+
resultList.push(cleanupResult(r));
1833+
return [transformList, rawList, resultList];
1834+
},
1835+
[[], [], []]
1836+
);
1837+
1838+
const responseDataObj = {
1839+
queryType,
1840+
results: cleanResultList,
1841+
slowQuery
1842+
};
1843+
const resultDataJson = JSON.stringify(responseDataObj);
1844+
1845+
res(getFinalCubestoreResultMulti(transformDataJson, rawDataRef, resultDataJson));
1846+
} else {
1847+
// if we have mixed query results (there are js and native)
1848+
// we prepare results separately: on js and native sides
1849+
// and serve final response from JS side
1850+
res({
1851+
queryType,
1852+
results: results.map(r => {
1853+
const data = r.dataCb();
1854+
return {
1855+
...cleanupResult(r),
1856+
data,
1857+
};
1858+
}),
1859+
pivotQuery: getPivotQuery(queryType, normalizedQueries),
1860+
slowQuery
1861+
});
1862+
}
1863+
} else if (allNative) {
1864+
// We prepare the full final json result on native side
1865+
const r = results[0];
1866+
const transformDataJson = JSON.stringify(r.transformDataParams);
1867+
const rawDataRef = r.rawData.getNativeRef();
1868+
const resultDataJson = JSON.stringify(cleanupResult(r));
1869+
res(getFinalCubestoreResult(transformDataJson, rawDataRef, resultDataJson));
18241870
} else {
18251871
const data = results[0].dataCb();
18261872
res({
1827-
...results[0],
1873+
...cleanupResult(results[0]),
18281874
data,
1829-
dataCb: undefined,
1830-
transformDataParams: undefined
18311875
});
18321876
}
18331877
} catch (e: any) {
@@ -2036,10 +2080,10 @@ class ApiGateway {
20362080

20372081
if (message instanceof ArrayBuffer) {
20382082
res.set('Content-Type', 'application/json');
2039-
res.send(message);
2083+
res.send(Buffer.from(message));
2084+
} else {
2085+
res.json(message);
20402086
}
2041-
2042-
res.json(message);
20432087
};
20442088
}
20452089

packages/cubejs-backend-native/Cargo.lock

Lines changed: 14 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/cubejs-backend-native/js/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,18 @@ export const transformData = (transformDataJson: string, rows: any): TransformDa
389389
return native.transformQueryData(transformDataJson, rows);
390390
};
391391

392+
export const getFinalCubestoreResult = (transformDataJson: string, rows: any, resultData: string): ArrayBuffer => {
393+
const native = loadNative();
394+
395+
return native.getFinalCubestoreResult(transformDataJson, rows, resultData);
396+
};
397+
398+
export const getFinalCubestoreResultMulti = (transformDataJson: string[], rows: any[], responseData: string): ArrayBuffer => {
399+
const native = loadNative();
400+
401+
return native.getFinalCubestoreResultMulti(transformDataJson, rows, responseData);
402+
};
403+
392404
export interface PyConfiguration {
393405
repositoryFactory?: (ctx: unknown) => Promise<unknown>,
394406
logger?: (msg: string, params: Record<string, any>) => void,

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 100 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ use cubeorchestrator::cubestore_message_parser::CubeStoreResult;
3636

3737
use cubesql::{telemetry::ReportingLogger, CubeError};
3838

39+
use cubeorchestrator::cubestore_result_transform::{
40+
get_final_cubestore_result, get_final_cubestore_result_multi, transform_data,
41+
};
42+
use cubeorchestrator::types::{RequestResultData, RequestResultDataMulti, TransformDataRequest};
3943
use neon::prelude::*;
4044
use neon::types::buffer::TypedArray;
41-
use cubeorchestrator::cubestore_result_transform::transform_data;
42-
use cubeorchestrator::types::{TransformDataRequest};
4345

4446
struct SQLInterface {
4547
services: Arc<NodeCubeServices>,
@@ -546,9 +548,7 @@ fn get_cubestore_result(mut cx: FunctionContext) -> JsResult<JsValue> {
546548
}
547549

548550
fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsObject> {
549-
let json_str = cx
550-
.argument::<JsString>(0)?
551-
.value(&mut cx);
551+
let json_str = cx.argument::<JsString>(0)?.value(&mut cx);
552552
let request_data = match serde_json::from_str::<TransformDataRequest>(&json_str) {
553553
Ok(data) => data,
554554
Err(err) => return cx.throw_error(err.to_string()),
@@ -571,7 +571,7 @@ fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsObject> {
571571
res_type.clone(),
572572
) {
573573
Ok(data) => data,
574-
Err(err) => return cx.throw_error(err.to_string())
574+
Err(err) => return cx.throw_error(err.to_string()),
575575
};
576576

577577
let json_data = match serde_json::to_string(&transformed) {
@@ -587,6 +587,98 @@ fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsObject> {
587587
Ok(js_result)
588588
}
589589

590+
fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsArrayBuffer> {
591+
let transform_data_str = cx.argument::<JsString>(0)?.value(&mut cx);
592+
let transform_request_data =
593+
match serde_json::from_str::<TransformDataRequest>(&transform_data_str) {
594+
Ok(data) => data,
595+
Err(err) => return cx.throw_error(err.to_string()),
596+
};
597+
let cube_store_result = cx.argument::<JsBox<CubeStoreResult>>(1)?;
598+
let result_data_str = cx.argument::<JsString>(2)?.value(&mut cx);
599+
let mut result_data = match serde_json::from_str::<RequestResultData>(&result_data_str) {
600+
Ok(data) => data,
601+
Err(err) => return cx.throw_error(err.to_string()),
602+
};
603+
604+
if let Err(err) = get_final_cubestore_result(
605+
&transform_request_data,
606+
&**cube_store_result,
607+
&mut result_data,
608+
) {
609+
return cx.throw_error(err.to_string());
610+
}
611+
612+
let json_data = match serde_json::to_string(&result_data) {
613+
Ok(data) => data,
614+
Err(e) => return cx.throw_error(format!("Serialization error: {}", e)),
615+
};
616+
let json_bytes = json_data.as_bytes();
617+
618+
let mut js_buffer = cx.array_buffer(json_bytes.len())?;
619+
{
620+
let buffer = js_buffer.as_mut_slice(&mut cx);
621+
buffer.copy_from_slice(json_bytes);
622+
}
623+
624+
Ok(js_buffer)
625+
}
626+
627+
fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsArrayBuffer> {
628+
let transform_data_array = cx.argument::<JsArray>(0)?;
629+
let transform_requests: Vec<TransformDataRequest> = transform_data_array
630+
.to_vec(&mut cx)?
631+
.into_iter()
632+
.map(|js_value| {
633+
let js_string = js_value
634+
.downcast_or_throw::<JsString, _>(&mut cx)?
635+
.value(&mut cx);
636+
637+
match serde_json::from_str::<TransformDataRequest>(&js_string) {
638+
Ok(request) => Ok(request),
639+
Err(err) => return cx.throw_error(err.to_string()),
640+
}
641+
})
642+
.collect::<Result<_, _>>()?;
643+
644+
let cube_store_array = cx.argument::<JsArray>(1)?;
645+
let cube_store_results_boxed: Vec<Handle<JsBox<CubeStoreResult>>> = cube_store_array
646+
.to_vec(&mut cx)?
647+
.into_iter()
648+
.map(|js_value| js_value.downcast_or_throw::<JsBox<CubeStoreResult>, _>(&mut cx))
649+
.collect::<Result<_, _>>()?;
650+
let cube_store_results: Vec<&CubeStoreResult> = cube_store_results_boxed
651+
.iter()
652+
.map(|handle| &***handle)
653+
.collect();
654+
655+
let result_data_str = cx.argument::<JsString>(2)?.value(&mut cx);
656+
let mut result_data = match serde_json::from_str::<RequestResultDataMulti>(&result_data_str) {
657+
Ok(data) => data,
658+
Err(err) => return cx.throw_error(err.to_string()),
659+
};
660+
661+
if let Err(err) =
662+
get_final_cubestore_result_multi(&transform_requests, &cube_store_results, &mut result_data)
663+
{
664+
return cx.throw_error(err.to_string());
665+
}
666+
667+
let json_data = match serde_json::to_string(&result_data) {
668+
Ok(data) => data,
669+
Err(e) => return cx.throw_error(format!("Serialization error: {}", e)),
670+
};
671+
let json_bytes = json_data.as_bytes();
672+
673+
let mut js_buffer = cx.array_buffer(json_bytes.len())?;
674+
{
675+
let buffer = js_buffer.as_mut_slice(&mut cx);
676+
buffer.copy_from_slice(json_bytes);
677+
}
678+
679+
Ok(js_buffer)
680+
}
681+
590682
pub fn register_module_exports<C: NodeConfiguration + 'static>(
591683
mut cx: ModuleContext,
592684
) -> NeonResult<()> {
@@ -607,6 +699,8 @@ pub fn register_module_exports<C: NodeConfiguration + 'static>(
607699
)?;
608700
cx.export_function("getCubestoreResult", get_cubestore_result)?;
609701
cx.export_function("transformQueryData", transform_query_data)?;
702+
cx.export_function("getFinalCubestoreResult", final_cubestore_result)?;
703+
cx.export_function("getFinalCubestoreResultMulti", final_cubestore_result_multi)?;
610704

611705
crate::template::template_register_module(&mut cx)?;
612706

0 commit comments

Comments
 (0)