Skip to content

Commit 669f5d0

Browse files
committed
omplement get_final_cubestore_result & get_final_cubestore_result_multi in native
1 parent fc22e6e commit 669f5d0

File tree

10 files changed

+368
-58
lines changed

10 files changed

+368
-58
lines changed

packages/cubejs-api-gateway/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"@ungap/structured-clone": "^0.3.4",
3333
"body-parser": "^1.19.0",
3434
"chrono-node": "^2.6.2",
35+
"express": "^4.17.1",
3536
"express-graphql": "^0.12.0",
3637
"graphql": "^15.8.0",
3738
"graphql-scalars": "^1.10.0",
@@ -59,7 +60,6 @@
5960
"@types/ramda": "^0.27.32",
6061
"@types/supertest": "^2.0.10",
6162
"@types/uuid": "^8.3.1",
62-
"express": "^4.17.1",
6363
"jest": "^27",
6464
"mysql": "^2.18.1",
6565
"should": "^13.2.3",

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,11 @@ import {
1010
getRealType,
1111
QueryAlias,
1212
} from '@cubejs-backend/shared';
13-
import { transformData as transformDataNative, TransformDataResponse } from '@cubejs-backend/native';
13+
import {
14+
getFinalCubestoreResult, getFinalCubestoreResultMulti,
15+
transformData as transformDataNative,
16+
TransformDataResponse
17+
} from '@cubejs-backend/native';
1418
import type {
1519
Application as ExpressApplication,
1620
ErrorRequestHandler,
@@ -114,6 +118,16 @@ function systemAsyncHandler(handler: (req: Request & { context: ExtendedRequestC
114118
};
115119
}
116120

121+
function cleanupResult(result) {
122+
return {
123+
...result,
124+
dataCb: undefined,
125+
rawData: undefined,
126+
transformDataParams: undefined,
127+
isNative: undefined,
128+
};
129+
}
130+
117131
// Prepared CheckAuthFn, default or from config: always async, returns nothing
118132
type PreparedCheckAuthFn = (ctx: any, authorization?: string) => Promise<void>;
119133

@@ -1623,6 +1637,7 @@ class ApiGateway {
16231637
return {
16241638
query: normalizedQuery,
16251639
dataCb,
1640+
rawData: response.data,
16261641
transformDataParams,
16271642
lastRefreshTime: response.lastRefreshTime?.toISOString(),
16281643
...(
@@ -1805,27 +1820,56 @@ class ApiGateway {
18051820
const allNative = results.every(r => r.isNative);
18061821

18071822
if (props.queryType === 'multi') {
1808-
res({
1809-
queryType,
1810-
results: results.map(r => {
1811-
const data = r.dataCb();
1812-
return {
1813-
...r,
1814-
data,
1815-
dataCb: undefined,
1816-
transformDataParams: undefined
1817-
};
1818-
}),
1819-
pivotQuery: getPivotQuery(queryType, normalizedQueries),
1820-
slowQuery
1821-
});
1823+
// If all query results are from Cubestore (are native)
1824+
// we prepare the final json result on native side
1825+
if (allNative) {
1826+
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[string[], any[], string[]]>(
1827+
([transformList, rawList, resultList], r) => {
1828+
transformList.push(JSON.stringify(r.transformDataParams));
1829+
rawList.push(r.rawData.getNativeRef());
1830+
resultList.push(cleanupResult(r));
1831+
return [transformList, rawList, resultList];
1832+
},
1833+
[[], [], []]
1834+
);
1835+
1836+
const responseDataObj = {
1837+
queryType,
1838+
results: cleanResultList,
1839+
slowQuery
1840+
};
1841+
const resultDataJson = JSON.stringify(responseDataObj);
1842+
1843+
res(getFinalCubestoreResultMulti(transformDataJson, rawDataRef, resultDataJson));
1844+
} else {
1845+
// if we have mixed query results (there are js and native)
1846+
// we prepare results separately: on js and native sides
1847+
// and serve final response from JS side
1848+
res({
1849+
queryType,
1850+
results: results.map(r => {
1851+
const data = r.dataCb();
1852+
return {
1853+
...cleanupResult(r),
1854+
data,
1855+
};
1856+
}),
1857+
pivotQuery: getPivotQuery(queryType, normalizedQueries),
1858+
slowQuery
1859+
});
1860+
}
1861+
} else if (allNative) {
1862+
// We prepare the full final json result on native side
1863+
const r = results[0];
1864+
const transformDataJson = JSON.stringify(r.transformDataParams);
1865+
const rawDataRef = r.rawData.getNativeRef();
1866+
const resultDataJson = JSON.stringify(cleanupResult(r));
1867+
res(getFinalCubestoreResult(transformDataJson, rawDataRef, resultDataJson));
18221868
} else {
18231869
const data = results[0].dataCb();
18241870
res({
1825-
...results[0],
1871+
...cleanupResult(results[0]),
18261872
data,
1827-
dataCb: undefined,
1828-
transformDataParams: undefined
18291873
});
18301874
}
18311875
} catch (e: any) {
@@ -2034,10 +2078,10 @@ class ApiGateway {
20342078

20352079
if (message instanceof ArrayBuffer) {
20362080
res.set('Content-Type', 'application/json');
2037-
res.send(message);
2081+
res.send(Buffer.from(message));
2082+
} else {
2083+
res.json(message);
20382084
}
2039-
2040-
res.json(message);
20412085
};
20422086
}
20432087

packages/cubejs-backend-native/Cargo.lock

Lines changed: 14 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/cubejs-backend-native/js/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,18 @@ export const transformData = (transformDataJson: string, rows: any): TransformDa
389389
return native.transformQueryData(transformDataJson, rows);
390390
};
391391

392+
export const getFinalCubestoreResult = (transformDataJson: string, rows: any, resultData: string): ArrayBuffer => {
393+
const native = loadNative();
394+
395+
return native.getFinalCubestoreResult(transformDataJson, rows, resultData);
396+
};
397+
398+
export const getFinalCubestoreResultMulti = (transformDataJson: string[], rows: any[], responseData: string): ArrayBuffer => {
399+
const native = loadNative();
400+
401+
return native.getFinalCubestoreResultMulti(transformDataJson, rows, responseData);
402+
};
403+
392404
export interface PyConfiguration {
393405
repositoryFactory?: (ctx: unknown) => Promise<unknown>,
394406
logger?: (msg: string, params: Record<string, any>) => void,

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 100 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,12 @@ use cubeorchestrator::cubestore_message_parser::CubeStoreResult;
3636

3737
use cubesql::{telemetry::ReportingLogger, CubeError};
3838

39+
use cubeorchestrator::cubestore_result_transform::{
40+
get_final_cubestore_result, get_final_cubestore_result_multi, transform_data,
41+
};
42+
use cubeorchestrator::types::{RequestResultData, RequestResultDataMulti, TransformDataRequest};
3943
use neon::prelude::*;
4044
use neon::types::buffer::TypedArray;
41-
use cubeorchestrator::cubestore_result_transform::transform_data;
42-
use cubeorchestrator::types::{TransformDataRequest};
4345

4446
struct SQLInterface {
4547
services: Arc<NodeCubeServices>,
@@ -546,9 +548,7 @@ fn get_cubestore_result(mut cx: FunctionContext) -> JsResult<JsValue> {
546548
}
547549

548550
fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsObject> {
549-
let json_str = cx
550-
.argument::<JsString>(0)?
551-
.value(&mut cx);
551+
let json_str = cx.argument::<JsString>(0)?.value(&mut cx);
552552
let request_data = match serde_json::from_str::<TransformDataRequest>(&json_str) {
553553
Ok(data) => data,
554554
Err(err) => return cx.throw_error(err.to_string()),
@@ -571,7 +571,7 @@ fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsObject> {
571571
res_type.clone(),
572572
) {
573573
Ok(data) => data,
574-
Err(err) => return cx.throw_error(err.to_string())
574+
Err(err) => return cx.throw_error(err.to_string()),
575575
};
576576

577577
let json_data = match serde_json::to_string(&transformed) {
@@ -587,6 +587,98 @@ fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsObject> {
587587
Ok(js_result)
588588
}
589589

590+
fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsArrayBuffer> {
591+
let transform_data_str = cx.argument::<JsString>(0)?.value(&mut cx);
592+
let transform_request_data =
593+
match serde_json::from_str::<TransformDataRequest>(&transform_data_str) {
594+
Ok(data) => data,
595+
Err(err) => return cx.throw_error(err.to_string()),
596+
};
597+
let cube_store_result = cx.argument::<JsBox<CubeStoreResult>>(1)?;
598+
let result_data_str = cx.argument::<JsString>(2)?.value(&mut cx);
599+
let mut result_data = match serde_json::from_str::<RequestResultData>(&result_data_str) {
600+
Ok(data) => data,
601+
Err(err) => return cx.throw_error(err.to_string()),
602+
};
603+
604+
if let Err(err) = get_final_cubestore_result(
605+
&transform_request_data,
606+
&**cube_store_result,
607+
&mut result_data,
608+
) {
609+
return cx.throw_error(err.to_string());
610+
}
611+
612+
let json_data = match serde_json::to_string(&result_data) {
613+
Ok(data) => data,
614+
Err(e) => return cx.throw_error(format!("Serialization error: {}", e)),
615+
};
616+
let json_bytes = json_data.as_bytes();
617+
618+
let mut js_buffer = cx.array_buffer(json_bytes.len())?;
619+
{
620+
let buffer = js_buffer.as_mut_slice(&mut cx);
621+
buffer.copy_from_slice(json_bytes);
622+
}
623+
624+
Ok(js_buffer)
625+
}
626+
627+
fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsArrayBuffer> {
628+
let transform_data_array = cx.argument::<JsArray>(0)?;
629+
let transform_requests: Vec<TransformDataRequest> = transform_data_array
630+
.to_vec(&mut cx)?
631+
.into_iter()
632+
.map(|js_value| {
633+
let js_string = js_value
634+
.downcast_or_throw::<JsString, _>(&mut cx)?
635+
.value(&mut cx);
636+
637+
match serde_json::from_str::<TransformDataRequest>(&js_string) {
638+
Ok(request) => Ok(request),
639+
Err(err) => return cx.throw_error(err.to_string()),
640+
}
641+
})
642+
.collect::<Result<_, _>>()?;
643+
644+
let cube_store_array = cx.argument::<JsArray>(1)?;
645+
let cube_store_results_boxed: Vec<Handle<JsBox<CubeStoreResult>>> = cube_store_array
646+
.to_vec(&mut cx)?
647+
.into_iter()
648+
.map(|js_value| js_value.downcast_or_throw::<JsBox<CubeStoreResult>, _>(&mut cx))
649+
.collect::<Result<_, _>>()?;
650+
let cube_store_results: Vec<&CubeStoreResult> = cube_store_results_boxed
651+
.iter()
652+
.map(|handle| &***handle)
653+
.collect();
654+
655+
let result_data_str = cx.argument::<JsString>(2)?.value(&mut cx);
656+
let mut result_data = match serde_json::from_str::<RequestResultDataMulti>(&result_data_str) {
657+
Ok(data) => data,
658+
Err(err) => return cx.throw_error(err.to_string()),
659+
};
660+
661+
if let Err(err) =
662+
get_final_cubestore_result_multi(&transform_requests, &cube_store_results, &mut result_data)
663+
{
664+
return cx.throw_error(err.to_string());
665+
}
666+
667+
let json_data = match serde_json::to_string(&result_data) {
668+
Ok(data) => data,
669+
Err(e) => return cx.throw_error(format!("Serialization error: {}", e)),
670+
};
671+
let json_bytes = json_data.as_bytes();
672+
673+
let mut js_buffer = cx.array_buffer(json_bytes.len())?;
674+
{
675+
let buffer = js_buffer.as_mut_slice(&mut cx);
676+
buffer.copy_from_slice(json_bytes);
677+
}
678+
679+
Ok(js_buffer)
680+
}
681+
590682
pub fn register_module_exports<C: NodeConfiguration + 'static>(
591683
mut cx: ModuleContext,
592684
) -> NeonResult<()> {
@@ -607,6 +699,8 @@ pub fn register_module_exports<C: NodeConfiguration + 'static>(
607699
)?;
608700
cx.export_function("getCubestoreResult", get_cubestore_result)?;
609701
cx.export_function("transformQueryData", transform_query_data)?;
702+
cx.export_function("getFinalCubestoreResult", final_cubestore_result)?;
703+
cx.export_function("getFinalCubestoreResultMulti", final_cubestore_result_multi)?;
610704

611705
crate::template::template_register_module(&mut cx)?;
612706

0 commit comments

Comments
 (0)