Skip to content

Commit 404c3fb

Browse files
committed
code rearrangement
1 parent 0b59aaf commit 404c3fb

File tree

3 files changed

+251
-240
lines changed

3 files changed

+251
-240
lines changed

packages/cubejs-backend-native/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod logger;
1313
pub mod node_export;
1414
pub mod node_obj_deserializer;
1515
pub mod node_obj_serializer;
16+
pub mod orchestrator;
1617
#[cfg(feature = "python")]
1718
pub mod python;
1819
pub mod stream;

packages/cubejs-backend-native/src/node_export.rs

Lines changed: 1 addition & 240 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,9 @@ use std::rc::Rc;
3131
use std::str::FromStr;
3232
use std::sync::Arc;
3333

34-
use cubeorchestrator::cubestore_message_parser::CubeStoreResult;
35-
3634
use cubesql::{telemetry::ReportingLogger, CubeError};
3735

38-
use crate::node_obj_deserializer::JsValueDeserializer;
39-
use cubeorchestrator::cubestore_result_transform::{
40-
get_final_cubestore_result_array, RequestResultArray, RequestResultData,
41-
RequestResultDataMulti, TransformedData,
42-
};
43-
use cubeorchestrator::transport::TransformDataRequest;
4436
use neon::prelude::*;
45-
use neon::types::buffer::TypedArray;
46-
use serde::Deserialize;
4737

4838
struct SQLInterface {
4939
services: Arc<NodeCubeServices>,
@@ -512,227 +502,6 @@ fn debug_js_to_clrepr_to_js(mut cx: FunctionContext) -> JsResult<JsValue> {
512502
arg_clrep.into_js(&mut cx)
513503
}
514504

515-
//============ sql orchestrator ===================
516-
517-
fn json_to_array_buffer<'a, C>(
518-
mut cx: C,
519-
json_data: Result<String, anyhow::Error>,
520-
) -> JsResult<'a, JsArrayBuffer>
521-
where
522-
C: Context<'a>,
523-
{
524-
match json_data {
525-
Ok(json_data) => {
526-
let json_bytes = json_data.as_bytes();
527-
let mut js_buffer = cx.array_buffer(json_bytes.len())?;
528-
{
529-
let buffer = js_buffer.as_mut_slice(&mut cx);
530-
buffer.copy_from_slice(json_bytes);
531-
}
532-
Ok(js_buffer)
533-
}
534-
Err(err) => cx.throw_error(err.to_string()),
535-
}
536-
}
537-
538-
fn parse_cubestore_ws_result_message(mut cx: FunctionContext) -> JsResult<JsPromise> {
539-
let msg = cx.argument::<JsBuffer>(0)?;
540-
let msg_data = msg.as_slice(&cx).to_vec();
541-
542-
let promise = cx
543-
.task(move || CubeStoreResult::from_fb(&msg_data))
544-
.promise(move |mut cx, res| match res {
545-
Ok(result) => Ok(cx.boxed(Arc::new(result))),
546-
Err(err) => cx.throw_error(err.to_string()),
547-
});
548-
549-
Ok(promise)
550-
}
551-
552-
fn get_cubestore_result(mut cx: FunctionContext) -> JsResult<JsValue> {
553-
let result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(0)?;
554-
555-
let js_array = cx.execute_scoped(|mut cx| {
556-
let js_array = JsArray::new(&mut cx, result.rows.len());
557-
558-
for (i, row) in result.rows.iter().enumerate() {
559-
let js_row = cx.execute_scoped(|mut cx| {
560-
let js_row = JsObject::new(&mut cx);
561-
for (key, value) in result.columns.iter().zip(row.iter()) {
562-
let js_key = cx.string(key);
563-
let js_value = cx.string(value);
564-
js_row.set(&mut cx, js_key, js_value)?;
565-
}
566-
Ok(js_row)
567-
})?;
568-
569-
js_array.set(&mut cx, i as u32, js_row)?;
570-
}
571-
572-
Ok(js_array)
573-
})?;
574-
575-
Ok(js_array.upcast())
576-
}
577-
578-
fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsPromise> {
579-
let transform_data_js_object = cx.argument::<JsValue>(0)?;
580-
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_js_object);
581-
582-
let request_data: TransformDataRequest = match Deserialize::deserialize(deserializer) {
583-
Ok(data) => data,
584-
Err(err) => return cx.throw_error(err.to_string()),
585-
};
586-
587-
let cube_store_result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(1)?;
588-
let cube_store_result = Arc::clone(&cube_store_result);
589-
590-
let promise = cx
591-
.task(move || {
592-
let transformed = TransformedData::transform(&request_data, &cube_store_result)?;
593-
594-
match serde_json::to_string(&transformed) {
595-
Ok(json) => Ok(json),
596-
Err(err) => Err(anyhow::Error::from(err)),
597-
}
598-
})
599-
.promise(move |mut cx, json_data| match json_data {
600-
Ok(json_data) => {
601-
let js_string = cx.string(json_data);
602-
603-
let js_result = cx.empty_object();
604-
js_result.set(&mut cx, "result", js_string)?;
605-
606-
Ok(js_result)
607-
}
608-
Err(err) => cx.throw_error(err.to_string()),
609-
});
610-
611-
Ok(promise)
612-
}
613-
614-
fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
615-
let transform_data_js_object = cx.argument::<JsValue>(0)?;
616-
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_js_object);
617-
let transform_request_data: TransformDataRequest = match Deserialize::deserialize(deserializer)
618-
{
619-
Ok(data) => data,
620-
Err(err) => return cx.throw_error(err.to_string()),
621-
};
622-
623-
let cube_store_result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(1)?;
624-
let cube_store_result = Arc::clone(&cube_store_result);
625-
let result_data_js_object = cx.argument::<JsValue>(2)?;
626-
let deserializer = JsValueDeserializer::new(&mut cx, result_data_js_object);
627-
let mut result_data: RequestResultData = match Deserialize::deserialize(deserializer) {
628-
Ok(data) => data,
629-
Err(err) => return cx.throw_error(err.to_string()),
630-
};
631-
632-
let promise = cx
633-
.task(move || {
634-
result_data.prepare_results(&transform_request_data, &cube_store_result)?;
635-
636-
match serde_json::to_string(&result_data) {
637-
Ok(json) => Ok(json),
638-
Err(err) => Err(anyhow::Error::from(err)),
639-
}
640-
})
641-
.promise(move |cx, json_data| json_to_array_buffer(cx, json_data));
642-
643-
Ok(promise)
644-
}
645-
646-
fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
647-
let transform_data_array = cx.argument::<JsValue>(0)?;
648-
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
649-
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
650-
{
651-
Ok(data) => data,
652-
Err(err) => return cx.throw_error(err.to_string()),
653-
};
654-
655-
let cube_store_array = cx.argument::<JsArray>(1)?;
656-
let cube_store_results_boxed: Vec<Handle<JsBox<Arc<CubeStoreResult>>>> = cube_store_array
657-
.to_vec(&mut cx)?
658-
.into_iter()
659-
.map(|js_value| js_value.downcast_or_throw::<JsBox<Arc<CubeStoreResult>>, _>(&mut cx))
660-
.collect::<Result<_, _>>()?;
661-
let cube_store_results: Vec<Arc<CubeStoreResult>> = cube_store_results_boxed
662-
.iter()
663-
.map(|handle| (**handle).clone())
664-
.collect();
665-
666-
let results_data_array = cx.argument::<JsValue>(2)?;
667-
let deserializer = JsValueDeserializer::new(&mut cx, results_data_array);
668-
let mut request_results: Vec<RequestResultData> = match Deserialize::deserialize(deserializer) {
669-
Ok(data) => data,
670-
Err(err) => return cx.throw_error(err.to_string()),
671-
};
672-
673-
let promise = cx
674-
.task(move || {
675-
get_final_cubestore_result_array(
676-
&transform_requests,
677-
&cube_store_results,
678-
&mut request_results,
679-
)?;
680-
681-
let final_obj = RequestResultArray {
682-
results: request_results,
683-
};
684-
685-
match serde_json::to_string(&final_obj) {
686-
Ok(json) => Ok(json),
687-
Err(err) => Err(anyhow::Error::from(err)),
688-
}
689-
})
690-
.promise(move |cx, json_data| json_to_array_buffer(cx, json_data));
691-
692-
Ok(promise)
693-
}
694-
695-
fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise> {
696-
let transform_data_array = cx.argument::<JsValue>(0)?;
697-
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
698-
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
699-
{
700-
Ok(data) => data,
701-
Err(err) => return cx.throw_error(err.to_string()),
702-
};
703-
704-
let cube_store_array = cx.argument::<JsArray>(1)?;
705-
let cube_store_results_boxed: Vec<Handle<JsBox<Arc<CubeStoreResult>>>> = cube_store_array
706-
.to_vec(&mut cx)?
707-
.into_iter()
708-
.map(|js_value| js_value.downcast_or_throw::<JsBox<Arc<CubeStoreResult>>, _>(&mut cx))
709-
.collect::<Result<_, _>>()?;
710-
let cube_store_results: Vec<Arc<CubeStoreResult>> = cube_store_results_boxed
711-
.iter()
712-
.map(|handle| (**handle).clone())
713-
.collect();
714-
715-
let result_data_js_object = cx.argument::<JsValue>(2)?;
716-
let deserializer = JsValueDeserializer::new(&mut cx, result_data_js_object);
717-
let mut result_data: RequestResultDataMulti = match Deserialize::deserialize(deserializer) {
718-
Ok(data) => data,
719-
Err(err) => return cx.throw_error(err.to_string()),
720-
};
721-
722-
let promise = cx
723-
.task(move || {
724-
result_data.prepare_results(&transform_requests, &cube_store_results)?;
725-
726-
match serde_json::to_string(&result_data) {
727-
Ok(json) => Ok(json),
728-
Err(err) => Err(anyhow::Error::from(err)),
729-
}
730-
})
731-
.promise(move |cx, json_data| json_to_array_buffer(cx, json_data));
732-
733-
Ok(promise)
734-
}
735-
736505
pub fn register_module_exports<C: NodeConfiguration + 'static>(
737506
mut cx: ModuleContext,
738507
) -> NeonResult<()> {
@@ -747,15 +516,7 @@ pub fn register_module_exports<C: NodeConfiguration + 'static>(
747516
cx.export_function("buildSqlAndParams", build_sql_and_params)?;
748517

749518
//========= sql orchestrator exports =================
750-
cx.export_function(
751-
"parseCubestoreResultMessage",
752-
parse_cubestore_ws_result_message,
753-
)?;
754-
cx.export_function("getCubestoreResult", get_cubestore_result)?;
755-
cx.export_function("transformQueryData", transform_query_data)?;
756-
cx.export_function("getFinalCubestoreResult", final_cubestore_result)?;
757-
cx.export_function("getFinalCubestoreResultMulti", final_cubestore_result_multi)?;
758-
cx.export_function("getFinalCubestoreResultArray", final_cubestore_result_array)?;
519+
crate::orchestrator::register_module(&mut cx)?;
759520

760521
crate::template::template_register_module(&mut cx)?;
761522

0 commit comments

Comments
 (0)