diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index 802076279c47d..a826b645fac6d 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -222,44 +222,71 @@ async fn handle_sql_query( .state .set_auth_context(Some(native_auth_ctx.clone())); - // todo: can we use compiler_cache? - let meta_context = transport_service - .meta(native_auth_ctx) - .await - .map_err(|err| CubeError::internal(format!("Failed to get meta context: {}", err)))?; - let query_plan = convert_sql_to_cube_query(sql_query, meta_context, session).await?; + let connection_id = session.state.connection_id; - let mut stream = get_df_batches(&query_plan).await?; + let execute = || async move { + // todo: can we use compiler_cache? + let meta_context = transport_service + .meta(native_auth_ctx) + .await + .map_err(|err| CubeError::internal(format!("Failed to get meta context: {}", err)))?; + let query_plan = convert_sql_to_cube_query(sql_query, meta_context, session).await?; - let semaphore = Arc::new(Semaphore::new(0)); + let mut stream = get_df_batches(&query_plan).await?; - let drain_handler = OnDrainHandler::new( - channel.clone(), - stream_methods.stream.clone(), - semaphore.clone(), - ); + let semaphore = Arc::new(Semaphore::new(0)); - drain_handler.handle(stream_methods.on.clone()).await?; + let drain_handler = OnDrainHandler::new( + channel.clone(), + stream_methods.stream.clone(), + semaphore.clone(), + ); - let mut is_first_batch = true; - while let Some(batch) = stream.next().await { - let (columns, data) = batch_to_rows(batch?)?; + drain_handler.handle(stream_methods.on.clone()).await?; + + let mut is_first_batch = true; + while let Some(batch) = stream.next().await { + let (columns, data) = batch_to_rows(batch?)?; + + if is_first_batch { + let mut schema = Map::new(); + schema.insert("schema".into(), columns); + let columns = format!( + "{}{}", + serde_json::to_string(&serde_json::Value::Object(schema))?, + CHUNK_DELIM + ); + is_first_batch = false; + + call_js_fn( + channel.clone(), + stream_methods.write.clone(), + Box::new(|cx| { + let arg = cx.string(columns).upcast::(); + + Ok(vec![arg.upcast::()]) + }), + Box::new(|cx, v| match v.downcast_or_throw::(cx) { + Ok(v) => Ok(v.value(cx)), + Err(_) => Err(CubeError::internal( + "Failed to downcast write response".to_string(), + )), + }), + stream_methods.stream.clone(), + ) + .await?; + } - if is_first_batch { - let mut schema = Map::new(); - schema.insert("schema".into(), columns); - let columns = format!( - "{}{}", - serde_json::to_string(&serde_json::Value::Object(schema))?, - CHUNK_DELIM - ); - is_first_batch = false; + let mut rows = Map::new(); + rows.insert("data".into(), serde_json::Value::Array(data)); + let data = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM); + let js_stream_write_fn = stream_methods.write.clone(); - call_js_fn( + let should_pause = !call_js_fn( channel.clone(), - stream_methods.write.clone(), + js_stream_write_fn, Box::new(|cx| { - let arg = cx.string(columns).upcast::(); + let arg = cx.string(data).upcast::(); Ok(vec![arg.upcast::()]) }), @@ -272,38 +299,21 @@ async fn handle_sql_query( stream_methods.stream.clone(), ) .await?; + + if should_pause { + let permit = semaphore.acquire().await?; + permit.forget(); + } } - let mut rows = Map::new(); - rows.insert("data".into(), serde_json::Value::Array(data)); - let data = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM); - let js_stream_write_fn = stream_methods.write.clone(); + Ok::<(), CubeError>(()) + }; - let should_pause = !call_js_fn( - channel.clone(), - js_stream_write_fn, - Box::new(|cx| { - let arg = cx.string(data).upcast::(); - - Ok(vec![arg.upcast::()]) - }), - Box::new(|cx, v| match v.downcast_or_throw::(cx) { - Ok(v) => Ok(v.value(cx)), - Err(_) => Err(CubeError::internal( - "Failed to downcast write response".to_string(), - )), - }), - stream_methods.stream.clone(), - ) - .await?; + let result = execute().await; - if should_pause { - let permit = semaphore.acquire().await.unwrap(); - permit.forget(); - } - } + session_manager.drop_session(connection_id).await; - Ok(()) + result } struct WritableStreamMethods { diff --git a/rust/cubesql/cubesql/src/compile/plan.rs b/rust/cubesql/cubesql/src/compile/plan.rs index bf941cb269315..6b5926167d2bf 100644 --- a/rust/cubesql/cubesql/src/compile/plan.rs +++ b/rust/cubesql/cubesql/src/compile/plan.rs @@ -138,7 +138,7 @@ pub async fn get_df_batches( Ok(stream) => { return Ok(stream); } - Err(err) => return Err(CubeError::panic(Box::new(err))), + Err(err) => return Err(err.into()), }; } Err(err) => return Err(CubeError::panic(err)),