Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions packages/cubejs-backend-native/src/node_export.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use cubesql::compile::{convert_sql_to_cube_query, get_df_batches};
use cubesql::config::processing_loop::ShutdownMode;
use cubesql::transport::TransportService;
use cubesql::transport::{SpanId, TransportService};
use futures::StreamExt;

use serde_json::Map;
use tokio::sync::Semaphore;
use uuid::Uuid;

use crate::auth::{NativeAuthContext, NodeBridgeAuthService};
use crate::channel::call_js_fn;
Expand All @@ -27,7 +28,6 @@ use cubesqlplanner::cube_bridge::base_query_options::NativeBaseQueryOptions;
use cubesqlplanner::planner::base_query::BaseQuery;
use std::rc::Rc;
use std::sync::Arc;
use std::time::SystemTime;

use cubesql::{telemetry::ReportingLogger, CubeError};

Expand Down Expand Up @@ -183,7 +183,10 @@ async fn handle_sql_query(
stream_methods: WritableStreamMethods,
sql_query: &str,
) -> Result<(), CubeError> {
let start_time = SystemTime::now();
let span_id = Some(Arc::new(SpanId::new(
Uuid::new_v4().to_string(),
serde_json::json!({ "sql": sql_query }),
)));

let transport_service = services
.injector()
Expand All @@ -197,20 +200,19 @@ async fn handle_sql_query(
.server
.transport
.log_load_state(
None,
span_id.clone(),
auth_context,
session.state.get_load_request_meta("sql"),
"Load Request".to_string(),
serde_json::json!({
"query": {
"sql": sql_query,
}
"query": span_id.as_ref().unwrap().query_key,
}),
)
.await?;
}

let session_clone = Arc::clone(&session);
let span_id_clone = span_id.clone();

let execute = || async move {
// todo: can we use compiler_cache?
Expand All @@ -220,7 +222,8 @@ async fn handle_sql_query(
.map_err(|err| {
CubeError::internal(format!("Failed to get meta context: {}", err))
})?;
let query_plan = convert_sql_to_cube_query(sql_query, meta_context, session).await?;
let query_plan =
convert_sql_to_cube_query(sql_query, meta_context, session, span_id_clone).await?;

let mut stream = get_df_batches(&query_plan).await?;

Expand Down Expand Up @@ -300,7 +303,6 @@ async fn handle_sql_query(
};

let result = execute().await;
let duration = start_time.elapsed().unwrap().as_millis() as u64;

match &result {
Ok(_) => {
Expand All @@ -309,7 +311,7 @@ async fn handle_sql_query(
.server
.transport
.log_load_state(
None,
span_id.clone(),
session_clone.state.auth_context().unwrap(),
session_clone.state.get_load_request_meta("sql"),
"Load Request Success".to_string(),
Expand All @@ -318,7 +320,7 @@ async fn handle_sql_query(
"sql": sql_query,
},
"apiType": "sql",
"duration": duration,
"duration": span_id.as_ref().unwrap().duration(),
"isDataQuery": true
}),
)
Expand All @@ -330,7 +332,7 @@ async fn handle_sql_query(
.server
.transport
.log_load_state(
None,
span_id.clone(),
session_clone.state.auth_context().unwrap(),
session_clone.state.get_load_request_meta("sql"),
"Cube SQL Error".to_string(),
Expand All @@ -339,7 +341,7 @@ async fn handle_sql_query(
"sql": sql_query
},
"apiType": "sql",
"duration": duration,
"duration": span_id.as_ref().unwrap().duration(),
"error": err.message,
}),
)
Expand Down
3 changes: 2 additions & 1 deletion packages/cubejs-backend-native/src/sql4sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ async fn handle_sql4sql_query(
.await
.map_err(|err| CubeError::internal(format!("Failed to get meta context: {err}")))?;
let query_plan =
convert_sql_to_cube_query(sql_query, meta_context.clone(), session.clone()).await?;
convert_sql_to_cube_query(sql_query, meta_context.clone(), session.clone(), None)
.await?;
let logical_plan = query_plan.try_as_logical_plan()?;
get_sql(&session, meta_context, Arc::new(logical_plan.clone())).await
})
Expand Down
2 changes: 2 additions & 0 deletions packages/cubejs-backend-native/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ impl TransportService for NodeBridgeTransport {
) -> Result<Vec<RecordBatch>, CubeError> {
trace!("[transport] Request ->");

println!("!!!! LOAD {:?}", span_id);

let native_auth = ctx
.as_any()
.downcast_ref::<NativeAuthContext>()
Expand Down
3 changes: 2 additions & 1 deletion rust/cubesql/cubesql/src/compile/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,8 @@ pub async fn convert_sql_to_cube_query(
query: &str,
meta: Arc<MetaContext>,
session: Arc<Session>,
span_id: Option<Arc<SpanId>>,
) -> CompilationResult<QueryPlan> {
let stmt = parse_sql_to_statement(&query, session.state.protocol.clone(), &mut None)?;
convert_statement_to_cube_query(stmt, meta, session, &mut None, None).await
convert_statement_to_cube_query(stmt, meta, session, &mut None, span_id).await
}
2 changes: 1 addition & 1 deletion rust/cubesql/cubesql/src/compile/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ impl SqlService for SqlAuthDefaultImpl {
meta: Arc<MetaContext>,
session: Arc<Session>,
) -> CompilationResult<QueryPlan> {
convert_sql_to_cube_query(&query, meta, session).await
convert_sql_to_cube_query(&query, meta, session, None).await
}
}
9 changes: 8 additions & 1 deletion rust/cubesql/cubesql/src/compile/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,13 @@ impl TestContext {

pub async fn convert_sql_to_cube_query(&self, query: &str) -> CompilationResult<QueryPlan> {
// TODO push to_string() deeper
convert_sql_to_cube_query(&query.to_string(), self.meta.clone(), self.session.clone()).await
convert_sql_to_cube_query(
&query.to_string(),
self.meta.clone(),
self.session.clone(),
None,
)
.await
}

pub async fn execute_query_with_flags(
Expand Down Expand Up @@ -1131,6 +1137,7 @@ pub async fn convert_select_to_query_plan_with_meta(
&query,
meta_context.clone(),
get_test_session(DatabaseProtocol::PostgreSQL, meta_context).await,
None,
)
.await;

Expand Down
3 changes: 3 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/test_cube_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ async fn test_join_cubes_on_wrong_field_error() {
.to_string(),
meta.clone(),
get_test_session(DatabaseProtocol::PostgreSQL, meta).await,
None,
)
.await;

Expand All @@ -527,6 +528,7 @@ async fn test_join_cubes_filter_from_wrong_side_error() {
.to_string(),
meta.clone(),
get_test_session(DatabaseProtocol::PostgreSQL, meta).await,
None
)
.await;

Expand Down Expand Up @@ -554,6 +556,7 @@ async fn test_join_cubes_with_aggr_error() {
.to_string(),
meta.clone(),
get_test_session(DatabaseProtocol::PostgreSQL, meta).await,
None
)
.await;

Expand Down
3 changes: 2 additions & 1 deletion rust/cubesql/cubesql/src/compile/test/test_user_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ async fn test_change_user_via_filter_or() {
convert_sql_to_cube_query(
&"SELECT COUNT(*) as cnt FROM KibanaSampleDataEcommerce WHERE __user = 'gopher' OR customer_gender = 'male'".to_string(),
meta.clone(),
get_test_session(DatabaseProtocol::PostgreSQL, meta).await
get_test_session(DatabaseProtocol::PostgreSQL, meta).await,
None
).await;

// TODO: We need to propagate error to result, to assert message
Expand Down
Loading