Skip to content

Commit 52ce278

Browse files
committed
always use transform data native for all results
1 parent 1fdc9af commit 52ce278

File tree

8 files changed

+171
-160
lines changed

8 files changed

+171
-160
lines changed

packages/cubejs-api-gateway/src/gateway.ts

Lines changed: 36 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import {
1111
QueryAlias,
1212
} from '@cubejs-backend/shared';
1313
import {
14-
getFinalCubestoreResult,
15-
getFinalCubestoreResultArray,
16-
getFinalCubestoreResultMulti,
14+
getFinalQueryResult,
15+
getFinalQueryResultArray,
16+
getFinalQueryResultMulti,
1717
transformData as transformDataNative,
1818
TransformDataResponse
1919
} from '@cubejs-backend/native';
@@ -123,10 +123,8 @@ function systemAsyncHandler(handler: (req: Request & { context: ExtendedRequestC
123123
function cleanupResult(result) {
124124
return {
125125
...result,
126-
dataCb: undefined,
127126
rawData: undefined,
128127
transformDataParams: undefined,
129-
isNative: undefined,
130128
};
131129
}
132130

@@ -1625,23 +1623,8 @@ class ApiGateway {
16251623
};
16261624

16271625
// We postpone data transformation until the last minute
1628-
// in case when all responses are native - we process them in native part
1629-
const dataCb: TransformDataResponseCb = response.data.isNative ?
1630-
async () => {
1631-
const jsonData = await transformDataNative(
1632-
transformDataParams, response.data.getNativeRef()
1633-
);
1634-
return JSON.parse(jsonData.result) as TransformDataResponse;
1635-
}
1636-
:
1637-
async () => transformData({
1638-
...transformDataParams,
1639-
data: response.data,
1640-
});
1641-
16421626
return {
16431627
query: normalizedQuery,
1644-
dataCb,
16451628
rawData: response.data,
16461629
transformDataParams,
16471630
lastRefreshTime: response.lastRefreshTime?.toISOString(),
@@ -1663,7 +1646,6 @@ class ApiGateway {
16631646
external: response.external,
16641647
slowQuery: Boolean(response.slowQuery),
16651648
total: normalizedQuery.total ? response.total : null,
1666-
isNative: response.data.isNative
16671649
};
16681650
}
16691651

@@ -1822,57 +1804,30 @@ class ApiGateway {
18221804
context,
18231805
);
18241806

1825-
const allNative = results.every(r => r.isNative);
1826-
18271807
if (props.queryType === 'multi') {
1828-
// If all query results are from Cubestore (are native)
1829-
// we prepare the final json result on native side
1830-
if (allNative) {
1831-
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[Object[], any[], Object[]]>(
1832-
([transformList, rawList, resultList], r) => {
1833-
transformList.push(r.transformDataParams);
1834-
rawList.push(r.rawData.getNativeRef());
1835-
resultList.push(cleanupResult(r));
1836-
return [transformList, rawList, resultList];
1837-
},
1838-
[[], [], []]
1839-
);
1808+
// We prepare the final json result on native side
1809+
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[Object[], any[], Object[]]>(
1810+
([transformList, rawList, resultList], r) => {
1811+
transformList.push(r.transformDataParams);
1812+
rawList.push(r.rawData.isNative ? r.rawData.getNativeRef() : r.rawData);
1813+
resultList.push(cleanupResult(r));
1814+
return [transformList, rawList, resultList];
1815+
},
1816+
[[], [], []]
1817+
);
18401818

1841-
const responseDataObj = {
1842-
queryType,
1843-
results: cleanResultList,
1844-
slowQuery
1845-
};
1819+
const responseDataObj = {
1820+
queryType,
1821+
results: cleanResultList,
1822+
slowQuery
1823+
};
18461824

1847-
res(await getFinalCubestoreResultMulti(transformDataJson, rawDataRef, responseDataObj));
1848-
} else {
1849-
// if we have mixed query results (there are js and native)
1850-
// we prepare results separately: on js and native sides
1851-
// and serve final response from JS side
1852-
res({
1853-
queryType,
1854-
results: await Promise.all(results.map(async (r) => {
1855-
const data = await r.dataCb();
1856-
return {
1857-
...cleanupResult(r),
1858-
data,
1859-
};
1860-
})),
1861-
pivotQuery: getPivotQuery(queryType, normalizedQueries),
1862-
slowQuery
1863-
});
1864-
}
1865-
} else if (allNative) {
1825+
res(await getFinalQueryResultMulti(transformDataJson, rawDataRef, responseDataObj));
1826+
} else {
18661827
// We prepare the full final json result on native side
18671828
const r = results[0];
1868-
const rawDataRef = r.rawData.getNativeRef();
1869-
res(await getFinalCubestoreResult(r.transformDataParams, rawDataRef, cleanupResult(r)));
1870-
} else {
1871-
const data = await results[0].dataCb();
1872-
res({
1873-
...cleanupResult(results[0]),
1874-
data,
1875-
});
1829+
const rawData = r.rawData.isNative ? r.rawData.getNativeRef() : r.rawData;
1830+
res(await getFinalQueryResult(r.transformDataParams, rawData, cleanupResult(r)));
18761831
}
18771832
} catch (e: any) {
18781833
this.handleError({
@@ -2002,40 +1957,22 @@ class ApiGateway {
20021957
})
20031958
);
20041959

2005-
const allNative = results.every(r => r.isNative);
2006-
20071960
if (!request.streaming) {
2008-
// If all query results are from Cubestore (are native)
2009-
// we prepare the final json result on native side
2010-
if (allNative) {
2011-
const [transformDataJson, rawDataRef, resultDataJson] = (results as {
2012-
transformDataParams: any;
2013-
rawData: { getNativeRef: () => any };
2014-
}[]).reduce<[Object[], any[], Object[]]>(
2015-
([transformList, rawList, resultList], r) => {
2016-
transformList.push(r.transformDataParams);
2017-
rawList.push(r.rawData.getNativeRef());
2018-
resultList.push(cleanupResult(r));
2019-
return [transformList, rawList, resultList];
2020-
},
2021-
[[], [], []]
2022-
);
1961+
// We prepare the final json result on native side
1962+
const [transformDataJson, rawData, resultDataJson] = (results as {
1963+
transformDataParams: any;
1964+
rawData: { isNative: boolean, getNativeRef: () => any };
1965+
}[]).reduce<[Object[], any[], Object[]]>(
1966+
([transformList, rawList, resultList], r) => {
1967+
transformList.push(r.transformDataParams);
1968+
rawList.push(r.rawData.isNative ? r.rawData.getNativeRef() : r.rawData);
1969+
resultList.push(cleanupResult(r));
1970+
return [transformList, rawList, resultList];
1971+
},
1972+
[[], [], []]
1973+
);
20231974

2024-
res(await getFinalCubestoreResultArray(transformDataJson, rawDataRef, resultDataJson));
2025-
} else {
2026-
// if we have mixed query results (there are js and native)
2027-
// we prepare results separately: on js and native sides
2028-
// and serve final response from JS side
2029-
res({
2030-
results: await Promise.all(results.map(async (r) => {
2031-
const data = await r.dataCb();
2032-
return {
2033-
...cleanupResult(r),
2034-
data,
2035-
};
2036-
})),
2037-
});
2038-
}
1975+
res(await getFinalQueryResultArray(transformDataJson, rawData, resultDataJson));
20391976
} else {
20401977
res(results[0]);
20411978
}

packages/cubejs-backend-native/js/index.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -389,22 +389,22 @@ export const transformData = (transformDataObj: Object, rows: any): Promise<Tran
389389
return native.transformQueryData(transformDataObj, rows);
390390
};
391391

392-
export const getFinalCubestoreResult = (transformDataObj: Object, rows: any, resultData: Object): Promise<ArrayBuffer> => {
392+
export const getFinalQueryResult = (transformDataObj: Object, rows: any, resultData: Object): Promise<ArrayBuffer> => {
393393
const native = loadNative();
394394

395-
return native.getFinalCubestoreResult(transformDataObj, rows, resultData);
395+
return native.getFinalQueryResult(transformDataObj, rows, resultData);
396396
};
397397

398-
export const getFinalCubestoreResultArray = (transformDataArr: Object[], rows: any[], resultDataArr: Object[]): Promise<ArrayBuffer> => {
398+
export const getFinalQueryResultArray = (transformDataArr: Object[], rows: any[], resultDataArr: Object[]): Promise<ArrayBuffer> => {
399399
const native = loadNative();
400400

401-
return native.getFinalCubestoreResultArray(transformDataArr, rows, resultDataArr);
401+
return native.getFinalQueryResultArray(transformDataArr, rows, resultDataArr);
402402
};
403403

404-
export const getFinalCubestoreResultMulti = (transformDataArr: Object[], rows: any[], responseData: Object): Promise<ArrayBuffer> => {
404+
export const getFinalQueryResultMulti = (transformDataArr: Object[], rows: any[], responseData: Object): Promise<ArrayBuffer> => {
405405
const native = loadNative();
406406

407-
return native.getFinalCubestoreResultMulti(transformDataArr, rows, responseData);
407+
return native.getFinalQueryResultMulti(transformDataArr, rows, responseData);
408408
};
409409

410410
export interface PyConfiguration {

packages/cubejs-backend-native/src/node_obj_deserializer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl<'de, 'a, 'b> Deserializer<'de> for JsValueDeserializer<'a, 'b> {
103103
}
104104

105105
Err(JsDeserializationError(
106-
"Unsupported type for deserialization".to_string(),
106+
"Unsupported number type for deserialization".to_string(),
107107
))
108108
} else if self.value.is_a::<JsBoolean, _>(self.cx) {
109109
let value = self

packages/cubejs-backend-native/src/orchestrator.rs

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::node_obj_deserializer::JsValueDeserializer;
2-
use cubeorchestrator::cubestore_message_parser::CubeStoreResult;
3-
use cubeorchestrator::cubestore_result_transform::{
2+
use cubeorchestrator::query_message_parser::QueryResult;
3+
use cubeorchestrator::query_result_transform::{
44
get_final_cubestore_result_array, RequestResultArray, RequestResultData,
55
RequestResultDataMulti, TransformedData,
66
};
7-
use cubeorchestrator::transport::TransformDataRequest;
7+
use cubeorchestrator::transport::{JsRawData, TransformDataRequest};
88
use neon::context::{Context, FunctionContext, ModuleContext};
99
use neon::handle::Handle;
1010
use neon::object::Object;
@@ -22,9 +22,9 @@ pub fn register_module(cx: &mut ModuleContext) -> NeonResult<()> {
2222
)?;
2323
cx.export_function("getCubestoreResult", get_cubestore_result)?;
2424
cx.export_function("transformQueryData", transform_query_data)?;
25-
cx.export_function("getFinalCubestoreResult", final_cubestore_result)?;
26-
cx.export_function("getFinalCubestoreResultMulti", final_cubestore_result_multi)?;
27-
cx.export_function("getFinalCubestoreResultArray", final_cubestore_result_array)?;
25+
cx.export_function("getFinalQueryResult", final_query_result)?;
26+
cx.export_function("getFinalQueryResultMulti", final_query_result_multi)?;
27+
cx.export_function("getFinalQueryResultArray", final_query_result_array)?;
2828

2929
Ok(())
3030
}
@@ -50,12 +50,31 @@ where
5050
}
5151
}
5252

53+
fn extract_query_result(
54+
cx: &mut FunctionContext<'_>,
55+
data_arg: Handle<JsValue>,
56+
) -> Result<Arc<QueryResult>, anyhow::Error> {
57+
58+
if let Ok(js_box) = data_arg.downcast::<JsBox<Arc<QueryResult>>, _>(cx) {
59+
Ok(Arc::clone(&js_box))
60+
} else if let Ok(js_array) = data_arg.downcast::<JsArray, _>(cx) {
61+
let deserializer = JsValueDeserializer::new(cx, js_array.upcast());
62+
let js_raw_data: JsRawData = Deserialize::deserialize(deserializer)?;
63+
64+
QueryResult::from_js_raw_data(js_raw_data)
65+
.map(Arc::new)
66+
.map_err(anyhow::Error::from)
67+
} else {
68+
Err(anyhow::anyhow!("Second argument must be an Array of JsBox<Arc<QueryResult>> or JsArray"))
69+
}
70+
}
71+
5372
pub fn parse_cubestore_result_message(mut cx: FunctionContext) -> JsResult<JsPromise> {
5473
let msg = cx.argument::<JsBuffer>(0)?;
5574
let msg_data = msg.as_slice(&cx).to_vec();
5675

5776
let promise = cx
58-
.task(move || CubeStoreResult::from_fb(&msg_data))
77+
.task(move || QueryResult::from_cubestore_fb(&msg_data))
5978
.promise(move |mut cx, res| match res {
6079
Ok(result) => Ok(cx.boxed(Arc::new(result))),
6180
Err(err) => cx.throw_error(err.to_string()),
@@ -65,7 +84,7 @@ pub fn parse_cubestore_result_message(mut cx: FunctionContext) -> JsResult<JsPro
6584
}
6685

6786
pub fn get_cubestore_result(mut cx: FunctionContext) -> JsResult<JsValue> {
68-
let result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(0)?;
87+
let result = cx.argument::<JsBox<Arc<QueryResult>>>(0)?;
6988

7089
let js_array = cx.execute_scoped(|mut cx| {
7190
let js_array = JsArray::new(&mut cx, result.rows.len());
@@ -99,7 +118,7 @@ pub fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsPromise> {
99118
Err(err) => return cx.throw_error(err.to_string()),
100119
};
101120

102-
let cube_store_result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(1)?;
121+
let cube_store_result = cx.argument::<JsBox<Arc<QueryResult>>>(1)?;
103122
let cube_store_result = Arc::clone(&cube_store_result);
104123

105124
let promise = cx
@@ -126,7 +145,7 @@ pub fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsPromise> {
126145
Ok(promise)
127146
}
128147

129-
pub fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
148+
pub fn final_query_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
130149
let transform_data_js_object = cx.argument::<JsValue>(0)?;
131150
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_js_object);
132151
let transform_request_data: TransformDataRequest = match Deserialize::deserialize(deserializer)
@@ -135,8 +154,12 @@ pub fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
135154
Err(err) => return cx.throw_error(err.to_string()),
136155
};
137156

138-
let cube_store_result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(1)?;
139-
let cube_store_result = Arc::clone(&cube_store_result);
157+
let data_arg = cx.argument::<JsValue>(1)?;
158+
let cube_store_result: Arc<QueryResult> = match extract_query_result(&mut cx, data_arg) {
159+
Ok(query_result) => query_result,
160+
Err(err) => return cx.throw_error(err.to_string()),
161+
};
162+
140163
let result_data_js_object = cx.argument::<JsValue>(2)?;
141164
let deserializer = JsValueDeserializer::new(&mut cx, result_data_js_object);
142165
let mut result_data: RequestResultData = match Deserialize::deserialize(deserializer) {
@@ -158,7 +181,7 @@ pub fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
158181
Ok(promise)
159182
}
160183

161-
pub fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
184+
pub fn final_query_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
162185
let transform_data_array = cx.argument::<JsValue>(0)?;
163186
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
164187
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
@@ -167,16 +190,14 @@ pub fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromi
167190
Err(err) => return cx.throw_error(err.to_string()),
168191
};
169192

170-
let cube_store_array = cx.argument::<JsArray>(1)?;
171-
let cube_store_results_boxed: Vec<Handle<JsBox<Arc<CubeStoreResult>>>> = cube_store_array
172-
.to_vec(&mut cx)?
173-
.into_iter()
174-
.map(|js_value| js_value.downcast_or_throw::<JsBox<Arc<CubeStoreResult>>, _>(&mut cx))
175-
.collect::<Result<_, _>>()?;
176-
let cube_store_results: Vec<Arc<CubeStoreResult>> = cube_store_results_boxed
177-
.iter()
178-
.map(|handle| (**handle).clone())
179-
.collect();
193+
let data_array = cx.argument::<JsArray>(1)?;
194+
let mut cube_store_results: Vec<Arc<QueryResult>> = vec![];
195+
for data_arg in data_array.to_vec(&mut cx)? {
196+
match extract_query_result(&mut cx, data_arg) {
197+
Ok(query_result) => cube_store_results.push(query_result),
198+
Err(err) => return cx.throw_error(err.to_string()),
199+
};
200+
}
180201

181202
let results_data_array = cx.argument::<JsValue>(2)?;
182203
let deserializer = JsValueDeserializer::new(&mut cx, results_data_array);
@@ -207,7 +228,7 @@ pub fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromi
207228
Ok(promise)
208229
}
209230

210-
pub fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise> {
231+
pub fn final_query_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise> {
211232
let transform_data_array = cx.argument::<JsValue>(0)?;
212233
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
213234
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
@@ -216,16 +237,14 @@ pub fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsPromi
216237
Err(err) => return cx.throw_error(err.to_string()),
217238
};
218239

219-
let cube_store_array = cx.argument::<JsArray>(1)?;
220-
let cube_store_results_boxed: Vec<Handle<JsBox<Arc<CubeStoreResult>>>> = cube_store_array
221-
.to_vec(&mut cx)?
222-
.into_iter()
223-
.map(|js_value| js_value.downcast_or_throw::<JsBox<Arc<CubeStoreResult>>, _>(&mut cx))
224-
.collect::<Result<_, _>>()?;
225-
let cube_store_results: Vec<Arc<CubeStoreResult>> = cube_store_results_boxed
226-
.iter()
227-
.map(|handle| (**handle).clone())
228-
.collect();
240+
let data_array = cx.argument::<JsArray>(1)?;
241+
let mut cube_store_results: Vec<Arc<QueryResult>> = vec![];
242+
for data_arg in data_array.to_vec(&mut cx)? {
243+
match extract_query_result(&mut cx, data_arg) {
244+
Ok(query_result) => cube_store_results.push(query_result),
245+
Err(err) => return cx.throw_error(err.to_string()),
246+
};
247+
}
229248

230249
let result_data_js_object = cx.argument::<JsValue>(2)?;
231250
let deserializer = JsValueDeserializer::new(&mut cx, result_data_js_object);

0 commit comments

Comments
 (0)