@@ -3,7 +3,18 @@ use neon::prelude::*;
33use std:: collections:: HashMap ;
44use std:: fmt:: Display ;
55
6+ use crate :: auth:: NativeAuthContext ;
7+ use crate :: channel:: { call_raw_js_with_channel_as_callback, NodeSqlGenerator , ValueFromJs } ;
8+ use crate :: node_obj_serializer:: NodeObjSerializer ;
9+ use crate :: orchestrator:: convert_final_query_result_array_from_js;
10+ use crate :: {
11+ auth:: TransportRequest , channel:: call_js_with_channel_as_callback,
12+ stream:: call_js_with_stream_as_callback,
13+ } ;
614use async_trait:: async_trait;
15+ use cubeorchestrator:: query_result_transform:: {
16+ get_final_cubestore_result_array, RequestResultArray ,
17+ } ;
718use cubesql:: compile:: engine:: df:: scan:: { MemberField , SchemaRef } ;
819use cubesql:: compile:: engine:: df:: wrapper:: SqlQuery ;
920use cubesql:: transport:: {
@@ -20,14 +31,6 @@ use serde::Serialize;
2031use std:: sync:: Arc ;
2132use uuid:: Uuid ;
2233
23- use crate :: auth:: NativeAuthContext ;
24- use crate :: channel:: { call_raw_js_with_channel_as_callback, NodeSqlGenerator } ;
25- use crate :: node_obj_serializer:: NodeObjSerializer ;
26- use crate :: {
27- auth:: TransportRequest , channel:: call_js_with_channel_as_callback,
28- stream:: call_js_with_stream_as_callback,
29- } ;
30-
3134#[ derive( Debug ) ]
3235pub struct NodeBridgeTransport {
3336 channel : Arc < Channel > ,
@@ -369,54 +372,116 @@ impl TransportService for NodeBridgeTransport {
369372 streaming : false ,
370373 } ) ?;
371374
372- let result = call_js_with_channel_as_callback (
375+ let result = call_raw_js_with_channel_as_callback (
373376 self . channel . clone ( ) ,
374377 self . on_sql_api_load . clone ( ) ,
375- Some ( extra) ,
378+ extra,
379+ Box :: new ( |cx, v| Ok ( cx. string ( v) . as_value ( cx) ) ) ,
380+ Box :: new ( move |cx, v| {
381+ // It's too heavy/slow to get instance of ResultArrayWrapper from JS
382+ // and then call/await the .getFinalResult() method which needs be
383+ // executed again on JS side to get the actual needed date,
384+ // instead we pass it directly from JS side.
385+ // In case of wrapped result it's actually a tuple of
386+ // (transformDataJson[], rawData[], resultDataJson[])
387+ if let Ok ( result_wrapped) = v. downcast :: < JsArray , _ > ( cx) {
388+ let res_wrapped_vec = result_wrapped. to_vec ( cx) . map_cube_err ( "Can't convert JS result to array" ) ?;
389+
390+ if res_wrapped_vec. len ( ) != 3 {
391+ return Err ( CubeError :: internal ( "Expected a tuple with 3 elements: transformDataJson[], rawData[], resultDataJson[]" . to_string ( ) ) ) ;
392+ }
393+
394+ let transform_data_array = res_wrapped_vec. first ( ) . unwrap ( ) ;
395+ let data_array = res_wrapped_vec. get ( 1 ) . unwrap ( )
396+ . downcast_or_throw :: < JsArray , _ > ( cx) . map_cube_err ( "Can't downcast js data to array" ) ?;
397+ let results_data_array = res_wrapped_vec. get ( 2 ) . unwrap ( ) ;
398+
399+ match convert_final_query_result_array_from_js (
400+ cx,
401+ * transform_data_array,
402+ data_array,
403+ * results_data_array,
404+ ) {
405+ Ok ( ( transform_requests, cube_store_results, mut request_results) ) => {
406+ get_final_cubestore_result_array (
407+ & transform_requests,
408+ & cube_store_results,
409+ & mut request_results,
410+ ) . map_cube_err ( "Can't build result array" ) ?;
411+
412+ Ok ( ValueFromJs :: RequestResultArray ( RequestResultArray {
413+ results : request_results,
414+ } ) )
415+ }
416+ Err ( err) => {
417+ Err ( CubeError :: internal ( format ! ( "Error converting result data: {:?}" , err. to_string( ) ) ) )
418+ }
419+ }
420+
421+ } else if let Ok ( str) = v. downcast :: < JsString , _ > ( cx) {
422+ Ok ( ValueFromJs :: String ( str. value ( cx) ) )
423+ } else {
424+ Err ( CubeError :: internal ( "Can't downcast callback argument to string or resultWrapper object" . to_string ( ) ) )
425+ }
426+ } )
376427 )
377428 . await ;
429+
378430 if let Err ( e) = & result {
379431 if e. message . to_lowercase ( ) . contains ( "continue wait" ) {
380432 continue ;
381433 }
382434 }
383435
384- let response: serde_json:: Value = result?;
436+ match result? {
437+ ValueFromJs :: String ( result) => {
438+ let response: serde_json:: Value = serde_json:: Value :: String ( result) ;
385439
386- #[ cfg( debug_assertions) ]
387- trace ! ( "[transport] Request <- {:?}" , response) ;
388- #[ cfg( not( debug_assertions) ) ]
389- trace ! ( "[transport] Request <- <hidden>" ) ;
440+ #[ cfg( debug_assertions) ]
441+ trace ! ( "[transport] Request <- {:?}" , response) ;
442+ #[ cfg( not( debug_assertions) ) ]
443+ trace ! ( "[transport] Request <- <hidden>" ) ;
390444
391- if let Some ( error_value) = response. get ( "error" ) {
392- match error_value {
393- serde_json:: Value :: String ( error) => {
394- if error. to_lowercase ( ) == * "continue wait" {
395- debug ! (
445+ if let Some ( error_value) = response. get ( "error" ) {
446+ match error_value {
447+ serde_json:: Value :: String ( error) => {
448+ if error. to_lowercase ( ) == * "continue wait" {
449+ debug ! (
396450 "[transport] load - retrying request (continue wait) requestId: {}" ,
397451 request_id
398452 ) ;
399453
400- continue ;
401- } else {
402- return Err ( CubeError :: user ( error. clone ( ) ) ) ;
403- }
404- }
405- other => {
406- error ! (
454+ continue ;
455+ } else {
456+ return Err ( CubeError :: user ( error. clone ( ) ) ) ;
457+ }
458+ }
459+ other => {
460+ error ! (
407461 "[transport] load - strange response, success which contains error: {:?}" ,
408462 other
409463 ) ;
410464
411- return Err ( CubeError :: internal (
412- "Error response with broken data inside" . to_string ( ) ,
413- ) ) ;
414- }
415- }
416- } ;
465+ return Err ( CubeError :: internal (
466+ "Error response with broken data inside" . to_string ( ) ,
467+ ) ) ;
468+ }
469+ }
470+ } ;
417471
418- break serde_json:: from_value :: < TransportLoadResponse > ( response)
419- . map_err ( |err| CubeError :: user ( err. to_string ( ) ) ) ;
472+ break serde_json:: from_value :: < TransportLoadResponse > ( response)
473+ . map_err ( |err| CubeError :: user ( err. to_string ( ) ) ) ;
474+ }
475+ ValueFromJs :: RequestResultArray ( result) => {
476+ let response = TransportLoadResponse {
477+ pivot_query : None ,
478+ slow_query : None ,
479+ query_type : None ,
480+ results : result. results . into_iter ( ) . map ( |v| v. into ( ) ) . collect ( ) ,
481+ } ;
482+ break Ok ( response) ;
483+ }
484+ }
420485 }
421486 }
422487
0 commit comments