@@ -221,45 +221,72 @@ async fn handle_sql_query(
     session
         .state
         .set_auth_context(Some(native_auth_ctx.clone()));
+
+    let connection_id = session.state.connection_id;

-    // todo: can we use compiler_cache?
-    let meta_context = transport_service
-        .meta(native_auth_ctx)
-        .await
-        .map_err(|err| CubeError::internal(format!("Failed to get meta context: {}", err)))?;
-    let query_plan = convert_sql_to_cube_query(sql_query, meta_context, session).await?;
+    let execute = || async move {
+        // todo: can we use compiler_cache?
+        let meta_context = transport_service
+            .meta(native_auth_ctx)
+            .await
+            .map_err(|err| CubeError::internal(format!("Failed to get meta context: {}", err)))?;
+        let query_plan = convert_sql_to_cube_query(sql_query, meta_context, session).await?;

-    let mut stream = get_df_batches(&query_plan).await?;
+        let mut stream = get_df_batches(&query_plan).await?;

-    let semaphore = Arc::new(Semaphore::new(0));
+        let semaphore = Arc::new(Semaphore::new(0));

-    let drain_handler = OnDrainHandler::new(
-        channel.clone(),
-        stream_methods.stream.clone(),
-        semaphore.clone(),
-    );
-
-    drain_handler.handle(stream_methods.on.clone()).await?;
+        let drain_handler = OnDrainHandler::new(
+            channel.clone(),
+            stream_methods.stream.clone(),
+            semaphore.clone(),
+        );

-    let mut is_first_batch = true;
-    while let Some(batch) = stream.next().await {
-        let (columns, data) = batch_to_rows(batch?)?;
+        drain_handler.handle(stream_methods.on.clone()).await?;
+
+        let mut is_first_batch = true;
+        while let Some(batch) = stream.next().await {
+            let (columns, data) = batch_to_rows(batch?)?;
+
+            if is_first_batch {
+                let mut schema = Map::new();
+                schema.insert("schema".into(), columns);
+                let columns = format!(
+                    "{}{}",
+                    serde_json::to_string(&serde_json::Value::Object(schema))?,
+                    CHUNK_DELIM
+                );
+                is_first_batch = false;
+
+                call_js_fn(
+                    channel.clone(),
+                    stream_methods.write.clone(),
+                    Box::new(|cx| {
+                        let arg = cx.string(columns).upcast::<JsValue>();
+
+                        Ok(vec![arg.upcast::<JsValue>()])
+                    }),
+                    Box::new(|cx, v| match v.downcast_or_throw::<JsBoolean, _>(cx) {
+                        Ok(v) => Ok(v.value(cx)),
+                        Err(_) => Err(CubeError::internal(
+                            "Failed to downcast write response".to_string(),
+                        )),
+                    }),
+                    stream_methods.stream.clone(),
+                )
+                .await?;
+            }

-        if is_first_batch {
-            let mut schema = Map::new();
-            schema.insert("schema".into(), columns);
-            let columns = format!(
-                "{}{}",
-                serde_json::to_string(&serde_json::Value::Object(schema))?,
-                CHUNK_DELIM
-            );
-            is_first_batch = false;
+            let mut rows = Map::new();
+            rows.insert("data".into(), serde_json::Value::Array(data));
+            let data = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM);
+            let js_stream_write_fn = stream_methods.write.clone();

-            call_js_fn(
+            let should_pause = !call_js_fn(
                 channel.clone(),
-                stream_methods.write.clone(),
+                js_stream_write_fn,
                 Box::new(|cx| {
-                    let arg = cx.string(columns).upcast::<JsValue>();
+                    let arg = cx.string(data).upcast::<JsValue>();

                     Ok(vec![arg.upcast::<JsValue>()])
                 }),
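Note on the wire format in this hunk: the first batch is preceded by a standalone `{"schema": ...}` chunk, and every batch is then written as a `{"data": [...]}` chunk, each terminated by `CHUNK_DELIM`. The following is a minimal sketch of that framing, assuming `CHUNK_DELIM` is a newline (its real value is defined elsewhere in the crate) and using made-up columns and rows:

    use serde_json::{json, Map, Value};

    // Assumed delimiter; the real CHUNK_DELIM is defined elsewhere in the crate.
    const CHUNK_DELIM: &str = "\n";

    fn main() -> serde_json::Result<()> {
        // First chunk: the schema, wrapped in an object under the "schema" key.
        let mut schema = Map::new();
        schema.insert("schema".into(), json!([{"name": "id"}, {"name": "city"}]));
        let head = format!("{}{}", serde_json::to_string(&Value::Object(schema))?, CHUNK_DELIM);

        // Every following chunk: a batch of rows under the "data" key.
        let mut rows = Map::new();
        rows.insert("data".into(), Value::Array(vec![json!([1, "Berlin"]), json!([2, "Tokyo"])]));
        let body = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM);

        // A consumer can split the stream on CHUNK_DELIM and parse each piece on its own.
        print!("{head}{body}");
        Ok(())
    }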
@@ -271,39 +298,22 @@ async fn handle_sql_query(
                 }),
                 stream_methods.stream.clone(),
             )
-            .await?;
-        }
-
-        let mut rows = Map::new();
-        rows.insert("data".into(), serde_json::Value::Array(data));
-        let data = format!("{}{}", serde_json::to_string(&rows)?, CHUNK_DELIM);
-        let js_stream_write_fn = stream_methods.write.clone();
-
-        let should_pause = !call_js_fn(
-            channel.clone(),
-            js_stream_write_fn,
-            Box::new(|cx| {
-                let arg = cx.string(data).upcast::<JsValue>();
-
-                Ok(vec![arg.upcast::<JsValue>()])
-            }),
-            Box::new(|cx, v| match v.downcast_or_throw::<JsBoolean, _>(cx) {
-                Ok(v) => Ok(v.value(cx)),
-                Err(_) => Err(CubeError::internal(
-                    "Failed to downcast write response".to_string(),
-                )),
-            }),
-            stream_methods.stream.clone(),
-        )
-        .await?;
+            .await?;

-        if should_pause {
-            let permit = semaphore.acquire().await.unwrap();
-            permit.forget();
+            if should_pause {
+                let permit = semaphore.acquire().await?;
+                permit.forget();
+            }
         }
-    }

-    Ok(())
+        Ok::<(), CubeError>(())
+    };
+
+    let result = execute().await;
+
+    session_manager.drop_session(connection_id).await;
+
+    result
 }

 struct WritableStreamMethods {
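The two hunks above wire Node.js stream backpressure into the Rust producer: `call_js_fn` surfaces the boolean that the Writable's `write()` returns, and when it is `false` the loop parks on a `Semaphore` that starts at zero permits until `OnDrainHandler` releases one from the JS `drain` event. Below is a self-contained sketch of the same pattern with the JS side replaced by a bounded `mpsc` channel standing in for the Writable's buffer; all names are illustrative, and the consumer releases a permit per chunk as a simplification of the real once-per-`drain` event:

    use std::sync::Arc;
    use tokio::sync::{mpsc, Semaphore};

    #[tokio::main]
    async fn main() {
        // Zero permits up front: the first pause really blocks the producer.
        let semaphore = Arc::new(Semaphore::new(0));
        // A tiny bounded buffer stands in for the Writable's internal buffer.
        let (tx, mut rx) = mpsc::channel::<String>(2);

        // Consumer task: plays the role of the JS side, releasing a permit as it drains.
        let drain_sem = Arc::clone(&semaphore);
        let consumer = tokio::spawn(async move {
            while let Some(chunk) = rx.recv().await {
                println!("consumed: {}", chunk);
                drain_sem.add_permits(1);
            }
        });

        // Producer loop: a failed `try_send` plays the role of `write()` returning false.
        for i in 0..10 {
            let mut msg = format!("chunk {}", i);
            loop {
                match tx.try_send(msg) {
                    Ok(()) => break,
                    Err(mpsc::error::TrySendError::Full(returned)) => {
                        msg = returned;
                        // Paused: wait for a "drain" permit, then forget it so the
                        // semaphore is back at zero for the next pause.
                        let permit = semaphore.acquire().await.unwrap();
                        permit.forget();
                    }
                    Err(mpsc::error::TrySendError::Closed(_)) => return,
                }
            }
        }
        drop(tx); // close the stream so the consumer task exits
        consumer.await.unwrap();
    }

`permit.forget()` rather than dropping the permit is what keeps the semaphore at zero between pauses, so each pause waits for a fresh drain signal.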
@@ -375,7 +385,7 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult<JsValue> {
             stream_methods,
             &sql_query,
         )
-            .await;
+        .await;

         let _ = channel.try_send(move |mut cx| {
             let method = match Arc::try_unwrap(js_stream_end_fn) {
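One more note on the restructuring this call site depends on: in the first hunk, the body of `handle_sql_query` is wrapped in an `execute` closure returning an async block, so every `?` short-circuits to `result` instead of skipping `session_manager.drop_session(...)`; the session is dropped on both the success and the error path before the result propagates, which is the same job `try`/`finally` does in JS. A minimal sketch of the pattern, with illustrative names:

    // Hypothetical stand-ins for the session manager and the fallible query body.
    async fn drop_session(connection_id: u32) {
        println!("session {} dropped", connection_id);
    }

    async fn run_query(connection_id: u32, fail: bool) -> Result<String, String> {
        let execute = || async move {
            if fail {
                // A `?` inside the real body behaves like this early return:
                // it exits the closure, not the enclosing function.
                return Err("query failed".to_string());
            }
            // The turbofish pins the error type, as in `Ok::<(), CubeError>(())`.
            Ok::<String, String>("rows".to_string())
        };

        let result = execute().await;

        // Cleanup runs on both paths before the result propagates.
        drop_session(connection_id).await;

        result
    }

    #[tokio::main]
    async fn main() {
        println!("{:?}", run_query(1, false).await);
        println!("{:?}", run_query(2, true).await);
    }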
@@ -454,7 +464,7 @@ pub fn setup_logger(mut cx: FunctionContext) -> JsResult<JsUndefined> {
         Box::new(NodeBridgeLogger::new(cx.channel(), cube_logger)),
         log_level.to_level_filter(),
     )
-        .unwrap();
+    .unwrap();

     Ok(cx.undefined())
 }