@@ -7,7 +7,7 @@ use async_trait::async_trait;
77use cubeclient:: models:: { V1Error , V1LoadRequestQuery , V1LoadResponse , V1MetaResponse } ;
88use cubesql:: compile:: engine:: df:: scan:: { MemberField , SchemaRef } ;
99use cubesql:: compile:: engine:: df:: wrapper:: SqlQuery ;
10- use cubesql:: transport:: { SqlGenerator , SqlResponse } ;
10+ use cubesql:: transport:: { SpanId , SqlGenerator , SqlResponse } ;
1111use cubesql:: {
1212 di_service,
1313 sql:: AuthContextRef ,
@@ -33,6 +33,7 @@ pub struct NodeBridgeTransport {
3333 on_sql_api_load : Arc < Root < JsFunction > > ,
3434 on_sql : Arc < Root < JsFunction > > ,
3535 on_meta : Arc < Root < JsFunction > > ,
36+ log_load_event : Arc < Root < JsFunction > > ,
3637 sql_generators : Arc < Root < JsFunction > > ,
3738 can_switch_user_for_session : Arc < Root < JsFunction > > ,
3839}
@@ -43,6 +44,7 @@ impl NodeBridgeTransport {
4344 on_sql_api_load : Root < JsFunction > ,
4445 on_sql : Root < JsFunction > ,
4546 on_meta : Root < JsFunction > ,
47+ log_load_event : Root < JsFunction > ,
4648 sql_generators : Root < JsFunction > ,
4749 can_switch_user_for_session : Root < JsFunction > ,
4850 ) -> Self {
@@ -51,6 +53,7 @@ impl NodeBridgeTransport {
5153 on_sql_api_load : Arc :: new ( on_sql_api_load) ,
5254 on_sql : Arc :: new ( on_sql) ,
5355 on_meta : Arc :: new ( on_meta) ,
56+ log_load_event : Arc :: new ( log_load_event) ,
5457 sql_generators : Arc :: new ( sql_generators) ,
5558 can_switch_user_for_session : Arc :: new ( can_switch_user_for_session) ,
5659 }
@@ -83,6 +86,16 @@ struct LoadRequest {
8386 #[ serde( rename = "expressionParams" , skip_serializing_if = "Option::is_none" ) ]
8487 expression_params : Option < Vec < Option < String > > > ,
8588 streaming : bool ,
89+ #[ serde( rename = "queryKey" , skip_serializing_if = "Option::is_none" ) ]
90+ query_key : Option < serde_json:: Value > ,
91+ }
92+
93+ #[ derive( Debug , Serialize ) ]
94+ struct LogEvent {
95+ request : TransportRequest ,
96+ session : SessionContext ,
97+ event : String ,
98+ properties : serde_json:: Value ,
8699}
87100
88101#[ derive( Debug , Serialize ) ]
@@ -187,6 +200,7 @@ impl TransportService for NodeBridgeTransport {
187200
188201 async fn sql (
189202 & self ,
203+ span_id : Option < Arc < SpanId > > ,
190204 query : V1LoadRequestQuery ,
191205 ctx : AuthContextRef ,
192206 meta : LoadRequestMeta ,
@@ -198,14 +212,18 @@ impl TransportService for NodeBridgeTransport {
198212 . downcast_ref :: < NativeAuthContext > ( )
199213 . expect ( "Unable to cast AuthContext to NativeAuthContext" ) ;
200214
201- let request_id = Uuid :: new_v4 ( ) . to_string ( ) ;
215+ let request_id = span_id
216+ . as_ref ( )
217+ . map ( |s| s. span_id . clone ( ) )
218+ . unwrap_or_else ( || Uuid :: new_v4 ( ) . to_string ( ) ) ;
202219
203220 let extra = serde_json:: to_string ( & LoadRequest {
204221 request : TransportRequest {
205222 id : format ! ( "{}-span-{}" , request_id, 1 ) ,
206223 meta : Some ( meta. clone ( ) ) ,
207224 } ,
208225 query : query. clone ( ) ,
226+ query_key : span_id. map ( |s| s. query_key . clone ( ) ) ,
209227 session : SessionContext {
210228 user : native_auth. user . clone ( ) ,
211229 superuser : native_auth. superuser ,
@@ -259,6 +277,7 @@ impl TransportService for NodeBridgeTransport {
259277
260278 async fn load (
261279 & self ,
280+ span_id : Option < Arc < SpanId > > ,
262281 query : V1LoadRequestQuery ,
263282 sql_query : Option < SqlQuery > ,
264283 ctx : AuthContextRef ,
@@ -271,16 +290,19 @@ impl TransportService for NodeBridgeTransport {
271290 . downcast_ref :: < NativeAuthContext > ( )
272291 . expect ( "Unable to cast AuthContext to NativeAuthContext" ) ;
273292
274- let request_id = Uuid :: new_v4 ( ) . to_string ( ) ;
275- let mut span_counter: u32 = 1 ;
293+ let request_id = span_id
294+ . as_ref ( )
295+ . map ( |s| s. span_id . clone ( ) )
296+ . unwrap_or_else ( || Uuid :: new_v4 ( ) . to_string ( ) ) ;
276297
277298 loop {
278299 let extra = serde_json:: to_string ( & LoadRequest {
279300 request : TransportRequest {
280- id : format ! ( "{}-span-{}" , request_id, span_counter ) ,
301+ id : format ! ( "{}-span-{}" , request_id, 1 ) ,
281302 meta : Some ( meta. clone ( ) ) ,
282303 } ,
283304 query : query. clone ( ) ,
305+ query_key : span_id. as_ref ( ) . map ( |s| s. query_key . clone ( ) ) ,
284306 session : SessionContext {
285307 user : native_auth. user . clone ( ) ,
286308 superuser : native_auth. superuser ,
@@ -313,12 +335,10 @@ impl TransportService for NodeBridgeTransport {
313335 if let Ok ( res) = serde_json:: from_value :: < V1Error > ( response) {
314336 if res. error . to_lowercase ( ) == * "continue wait" {
315337 debug ! (
316- "[transport] load - retrying request (continue wait) requestId: {}, span: {} " ,
317- request_id, span_counter
338+ "[transport] load - retrying request (continue wait) requestId: {}" ,
339+ request_id
318340 ) ;
319341
320- span_counter += 1 ;
321-
322342 continue ;
323343 } else {
324344 error ! (
@@ -336,6 +356,7 @@ impl TransportService for NodeBridgeTransport {
336356
337357 async fn load_stream (
338358 & self ,
359+ span_id : Option < Arc < SpanId > > ,
339360 query : V1LoadRequestQuery ,
340361 sql_query : Option < SqlQuery > ,
341362 ctx : AuthContextRef ,
@@ -345,8 +366,10 @@ impl TransportService for NodeBridgeTransport {
345366 ) -> Result < CubeStreamReceiver , CubeError > {
346367 trace ! ( "[transport] Request ->" ) ;
347368
348- let request_id = Uuid :: new_v4 ( ) . to_string ( ) ;
349- let mut span_counter: u32 = 1 ;
369+ let request_id = span_id
370+ . as_ref ( )
371+ . map ( |s| s. span_id . clone ( ) )
372+ . unwrap_or_else ( || Uuid :: new_v4 ( ) . to_string ( ) ) ;
350373 loop {
351374 let native_auth = ctx
352375 . as_any ( )
@@ -355,10 +378,11 @@ impl TransportService for NodeBridgeTransport {
355378
356379 let extra = serde_json:: to_string ( & LoadRequest {
357380 request : TransportRequest {
358- id : format ! ( "{}-span-{}" , request_id, span_counter ) ,
381+ id : format ! ( "{}-span-{}" , request_id, 1 ) ,
359382 meta : Some ( meta. clone ( ) ) ,
360383 } ,
361384 query : query. clone ( ) ,
385+ query_key : span_id. as_ref ( ) . map ( |s| s. query_key . clone ( ) ) ,
362386 sql_query : sql_query. clone ( ) . map ( |q| ( q. sql , q. values ) ) ,
363387 session : SessionContext {
364388 user : native_auth. user . clone ( ) ,
@@ -381,7 +405,6 @@ impl TransportService for NodeBridgeTransport {
381405
382406 if let Err ( e) = & res {
383407 if e. message . to_lowercase ( ) . contains ( "continue wait" ) {
384- span_counter += 1 ;
385408 continue ;
386409 }
387410 }
@@ -425,6 +448,47 @@ impl TransportService for NodeBridgeTransport {
425448 . await ?;
426449 Ok ( res)
427450 }
451+
452+ async fn log_load_state (
453+ & self ,
454+ span_id : Option < Arc < SpanId > > ,
455+ ctx : AuthContextRef ,
456+ meta_fields : LoadRequestMeta ,
457+ event : String ,
458+ properties : serde_json:: Value ,
459+ ) -> Result < ( ) , CubeError > {
460+ let native_auth = ctx
461+ . as_any ( )
462+ . downcast_ref :: < NativeAuthContext > ( )
463+ . expect ( "Unable to cast AuthContext to NativeAuthContext" ) ;
464+
465+ let request_id = span_id
466+ . map ( |s| s. span_id . clone ( ) )
467+ . unwrap_or_else ( || Uuid :: new_v4 ( ) . to_string ( ) ) ;
468+ call_raw_js_with_channel_as_callback (
469+ self . channel . clone ( ) ,
470+ self . log_load_event . clone ( ) ,
471+ LogEvent {
472+ request : TransportRequest {
473+ id : format ! ( "{}-span-1" , request_id) ,
474+ meta : Some ( meta_fields. clone ( ) ) ,
475+ } ,
476+ session : SessionContext {
477+ user : native_auth. user . clone ( ) ,
478+ superuser : native_auth. superuser ,
479+ security_context : native_auth. security_context . clone ( ) ,
480+ } ,
481+ event,
482+ properties,
483+ } ,
484+ Box :: new ( |cx, v| match NodeObjSerializer :: serialize ( & v, cx) {
485+ Ok ( res) => Ok ( res) ,
486+ Err ( e) => cx. throw_error ( format ! ( "Can't serialize to node obj: {}" , e) ) ,
487+ } ) ,
488+ Box :: new ( move |_, _| Ok ( ( ) ) ) ,
489+ )
490+ . await
491+ }
428492}
429493
430494// method to get keys to values using function from js object
0 commit comments