From f1760cc887ed9e209dac296af72dccd8e6c5404a Mon Sep 17 00:00:00 2001 From: Alex Vasilev Date: Mon, 3 Mar 2025 20:25:46 -0800 Subject: [PATCH 1/3] fix(cubejs-native): sql over http drop sessions, correct error --- .../cubejs-backend-native/src/node_export.rs | 134 ++++++++++-------- rust/cubesql/cubesql/src/compile/plan.rs | 6 +- 2 files changed, 76 insertions(+), 64 deletions(-) diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index 802076279c47d..1652ae65fd8a5 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -221,45 +221,72 @@ async fn handle_sql_query( session .state .set_auth_context(Some(native_auth_ctx.clone())); + + let connection_id = session.state.connection_id; - // todo: can we use compiler_cache? - let meta_context = transport_service - .meta(native_auth_ctx) - .await - .map_err(|err| CubeError::internal(format!("Failed to get meta context: {}", err)))?; - let query_plan = convert_sql_to_cube_query(sql_query, meta_context, session).await?; + let execute = || async move { + // todo: can we use compiler_cache? + let meta_context = transport_service + .meta(native_auth_ctx) + .await + .map_err(|err| CubeError::internal(format!("Failed to get meta context: {}", err)))?; + let query_plan = convert_sql_to_cube_query(sql_query, meta_context, session).await?; - let mut stream = get_df_batches(&query_plan).await?; + let mut stream = get_df_batches(&query_plan).await?; - let semaphore = Arc::new(Semaphore::new(0)); + let semaphore = Arc::new(Semaphore::new(0)); - let drain_handler = OnDrainHandler::new( - channel.clone(), - stream_methods.stream.clone(), - semaphore.clone(), - ); - - drain_handler.handle(stream_methods.on.clone()).await?; + let drain_handler = OnDrainHandler::new( + channel.clone(), + stream_methods.stream.clone(), + semaphore.clone(), + ); - let mut is_first_batch = true; - while let Some(batch) = stream.next().await { - let (columns, data) = batch_to_rows(batch?)?; + drain_handler.handle(stream_methods.on.clone()).await?; + + let mut is_first_batch = true; + while let Some(batch) = stream.next().await { + let (columns, data) = batch_to_rows(batch?)?; + + if is_first_batch { + let mut schema = Map::new(); + schema.insert("schema".into(), columns); + let columns = format!( + "{}{}", + serde_json::to_string(&serde_json::Value::Object(schema))?, + CHUNK_DELIM + ); + is_first_batch = false; + + call_js_fn( + channel.clone(), + stream_methods.write.clone(), + Box::new(|cx| { + let arg = cx.string(columns).upcast::(); + + Ok(vec![arg.upcast::()]) + }), + Box::new(|cx, v| match v.downcast_or_throw::(cx) { + Ok(v) => Ok(v.value(cx)), + Err(_) => Err(CubeError::internal( + "Failed to downcast write response".to_string(), + )), + }), + stream_methods.stream.clone(), + ) + .await?; + } - if is_first_batch { - let mut schema = Map::new(); - schema.insert("schema".into(), columns); - let columns = format!( - "{}{}", - serde_json::to_string(&serde_json::Value::Object(schema))?, - CHUNK_DELIM - ); - is_first_batch = false; + let mut rows = Map::new(); + rows.insert("data".into(), serde_json::Value::Array(data)); + let data = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM); + let js_stream_write_fn = stream_methods.write.clone(); - call_js_fn( + let should_pause = !call_js_fn( channel.clone(), - stream_methods.write.clone(), + js_stream_write_fn, Box::new(|cx| { - let arg = cx.string(columns).upcast::(); + let arg = cx.string(data).upcast::(); Ok(vec![arg.upcast::()]) }), @@ -271,39 +298,22 @@ async fn handle_sql_query( }), stream_methods.stream.clone(), ) - .await?; - } - - let mut rows = Map::new(); - rows.insert("data".into(), serde_json::Value::Array(data)); - let data = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM); - let js_stream_write_fn = stream_methods.write.clone(); - - let should_pause = !call_js_fn( - channel.clone(), - js_stream_write_fn, - Box::new(|cx| { - let arg = cx.string(data).upcast::(); - - Ok(vec![arg.upcast::()]) - }), - Box::new(|cx, v| match v.downcast_or_throw::(cx) { - Ok(v) => Ok(v.value(cx)), - Err(_) => Err(CubeError::internal( - "Failed to downcast write response".to_string(), - )), - }), - stream_methods.stream.clone(), - ) - .await?; + .await?; - if should_pause { - let permit = semaphore.acquire().await.unwrap(); - permit.forget(); + if should_pause { + let permit = semaphore.acquire().await?; + permit.forget(); + } } - } - Ok(()) + Ok::<(), CubeError>(()) + }; + + let result = execute().await; + + session_manager.drop_session(connection_id).await; + + result } struct WritableStreamMethods { @@ -375,7 +385,7 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult { stream_methods, &sql_query, ) - .await; + .await; let _ = channel.try_send(move |mut cx| { let method = match Arc::try_unwrap(js_stream_end_fn) { @@ -454,7 +464,7 @@ pub fn setup_logger(mut cx: FunctionContext) -> JsResult { Box::new(NodeBridgeLogger::new(cx.channel(), cube_logger)), log_level.to_level_filter(), ) - .unwrap(); + .unwrap(); Ok(cx.undefined()) } diff --git a/rust/cubesql/cubesql/src/compile/plan.rs b/rust/cubesql/cubesql/src/compile/plan.rs index bf941cb269315..fbcbc21a547c6 100644 --- a/rust/cubesql/cubesql/src/compile/plan.rs +++ b/rust/cubesql/cubesql/src/compile/plan.rs @@ -138,10 +138,12 @@ pub async fn get_df_batches( Ok(stream) => { return Ok(stream); } - Err(err) => return Err(CubeError::panic(Box::new(err))), + Err(err) => return Err(err.into()) }; } - Err(err) => return Err(CubeError::panic(err)), + Err(err) => { + return Err(CubeError::panic(err)) + }, } } _ => Err(CubeError::user( From 47d9d787bf2a150c2625028d3952dc67371d6a3d Mon Sep 17 00:00:00 2001 From: Alex Vasilev Date: Mon, 3 Mar 2025 20:36:06 -0800 Subject: [PATCH 2/3] fmt --- rust/cubesql/cubesql/src/compile/plan.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/plan.rs b/rust/cubesql/cubesql/src/compile/plan.rs index fbcbc21a547c6..6b5926167d2bf 100644 --- a/rust/cubesql/cubesql/src/compile/plan.rs +++ b/rust/cubesql/cubesql/src/compile/plan.rs @@ -138,12 +138,10 @@ pub async fn get_df_batches( Ok(stream) => { return Ok(stream); } - Err(err) => return Err(err.into()) + Err(err) => return Err(err.into()), }; } - Err(err) => { - return Err(CubeError::panic(err)) - }, + Err(err) => return Err(CubeError::panic(err)), } } _ => Err(CubeError::user( From a6b96cdd42173540fc6847236379ddacf4158f6f Mon Sep 17 00:00:00 2001 From: Alex Vasilev Date: Mon, 3 Mar 2025 20:54:26 -0800 Subject: [PATCH 3/3] fmt * 2 --- packages/cubejs-backend-native/src/node_export.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index 1652ae65fd8a5..a826b645fac6d 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -221,7 +221,7 @@ async fn handle_sql_query( session .state .set_auth_context(Some(native_auth_ctx.clone())); - + let connection_id = session.state.connection_id; let execute = || async move { @@ -274,7 +274,7 @@ async fn handle_sql_query( }), stream_methods.stream.clone(), ) - .await?; + .await?; } let mut rows = Map::new(); @@ -298,7 +298,7 @@ async fn handle_sql_query( }), stream_methods.stream.clone(), ) - .await?; + .await?; if should_pause { let permit = semaphore.acquire().await?; @@ -310,9 +310,9 @@ async fn handle_sql_query( }; let result = execute().await; - + session_manager.drop_session(connection_id).await; - + result } @@ -385,7 +385,7 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult { stream_methods, &sql_query, ) - .await; + .await; let _ = channel.try_send(move |mut cx| { let method = match Arc::try_unwrap(js_stream_end_fn) { @@ -464,7 +464,7 @@ pub fn setup_logger(mut cx: FunctionContext) -> JsResult { Box::new(NodeBridgeLogger::new(cx.channel(), cube_logger)), log_level.to_level_filter(), ) - .unwrap(); + .unwrap(); Ok(cx.undefined()) }