11use cubesql:: compile:: parser:: parse_sql_to_statement;
22use cubesql:: compile:: { convert_statement_to_cube_query, get_df_batches} ;
33use cubesql:: config:: processing_loop:: ShutdownMode ;
4+ use cubesql:: sql:: dataframe:: { arrow_to_column_type, Column } ;
5+ use cubesql:: sql:: ColumnFlags ;
46use cubesql:: transport:: { SpanId , TransportService } ;
57use futures:: StreamExt ;
68
@@ -192,6 +194,32 @@ fn shutdown_interface(mut cx: FunctionContext) -> JsResult<JsPromise> {
192194
193195const CHUNK_DELIM : & str = "\n " ;
194196
197+ async fn write_jsonl_message (
198+ channel : Arc < Channel > ,
199+ write_fn : Arc < Root < JsFunction > > ,
200+ stream : Arc < Root < JsObject > > ,
201+ value : serde_json:: Value ,
202+ ) -> Result < bool , CubeError > {
203+ let message = format ! ( "{}{}" , serde_json:: to_string( & value) ?, CHUNK_DELIM ) ;
204+
205+ call_js_fn (
206+ channel,
207+ write_fn,
208+ Box :: new ( move |cx| {
209+ let arg = cx. string ( message) . upcast :: < JsValue > ( ) ;
210+ Ok ( vec ! [ arg. upcast:: <JsValue >( ) ] )
211+ } ) ,
212+ Box :: new ( |cx, v| match v. downcast_or_throw :: < JsBoolean , _ > ( cx) {
213+ Ok ( v) => Ok ( v. value ( cx) ) ,
214+ Err ( _) => Err ( CubeError :: internal (
215+ "Failed to downcast write response" . to_string ( ) ,
216+ ) ) ,
217+ } ) ,
218+ stream,
219+ )
220+ . await
221+ }
222+
195223async fn handle_sql_query (
196224 services : Arc < NodeCubeServices > ,
197225 native_auth_ctx : Arc < NativeSQLAuthContext > ,
@@ -262,59 +290,44 @@ async fn handle_sql_query(
262290
263291 drain_handler. handle ( stream_methods. on . clone ( ) ) . await ?;
264292
265- let mut is_first_batch = true ;
266- while let Some ( batch) = stream. next ( ) . await {
267- let ( columns, data) = batch_to_rows ( batch?) ?;
293+ // Get schema from stream and convert to DataFrame columns format
294+ let stream_schema = stream. schema ( ) ;
295+ let mut columns = Vec :: with_capacity ( stream_schema. fields ( ) . len ( ) ) ;
296+ for field in stream_schema. fields ( ) . iter ( ) {
297+ columns. push ( Column :: new (
298+ field. name ( ) . clone ( ) ,
299+ arrow_to_column_type ( field. data_type ( ) . clone ( ) ) ?,
300+ ColumnFlags :: empty ( ) ,
301+ ) ) ;
302+ }
268303
269- if is_first_batch {
270- let mut schema = Map :: new ( ) ;
271- schema. insert ( "schema" . into ( ) , columns) ;
272- let columns = format ! (
273- "{}{}" ,
274- serde_json:: to_string( & serde_json:: Value :: Object ( schema) ) ?,
275- CHUNK_DELIM
276- ) ;
277- is_first_batch = false ;
304+ // Send schema first
305+ let columns_json = serde_json:: to_value ( & columns) ?;
306+ let mut schema_response = Map :: new ( ) ;
307+ schema_response. insert ( "schema" . into ( ) , columns_json) ;
278308
279- call_js_fn (
280- channel. clone ( ) ,
281- stream_methods. write . clone ( ) ,
282- Box :: new ( |cx| {
283- let arg = cx. string ( columns) . upcast :: < JsValue > ( ) ;
309+ write_jsonl_message (
310+ channel. clone ( ) ,
311+ stream_methods. write . clone ( ) ,
312+ stream_methods. stream . clone ( ) ,
313+ serde_json:: Value :: Object ( schema_response) ,
314+ )
315+ . await ?;
284316
285- Ok ( vec ! [ arg. upcast:: <JsValue >( ) ] )
286- } ) ,
287- Box :: new ( |cx, v| match v. downcast_or_throw :: < JsBoolean , _ > ( cx) {
288- Ok ( v) => Ok ( v. value ( cx) ) ,
289- Err ( _) => Err ( CubeError :: internal (
290- "Failed to downcast write response" . to_string ( ) ,
291- ) ) ,
292- } ) ,
293- stream_methods. stream . clone ( ) ,
294- )
295- . await ?;
296- }
317+ // Process all batches
318+ let mut has_data = false ;
319+ while let Some ( batch) = stream. next ( ) . await {
320+ let ( _, data) = batch_to_rows ( batch?) ?;
321+ has_data = true ;
297322
298323 let mut rows = Map :: new ( ) ;
299324 rows. insert ( "data" . into ( ) , serde_json:: Value :: Array ( data) ) ;
300- let data = format ! ( "{}{}" , serde_json:: to_string( & rows) ?, CHUNK_DELIM ) ;
301- let js_stream_write_fn = stream_methods. write . clone ( ) ;
302325
303- let should_pause = !call_js_fn (
326+ let should_pause = !write_jsonl_message (
304327 channel. clone ( ) ,
305- js_stream_write_fn,
306- Box :: new ( |cx| {
307- let arg = cx. string ( data) . upcast :: < JsValue > ( ) ;
308-
309- Ok ( vec ! [ arg. upcast:: <JsValue >( ) ] )
310- } ) ,
311- Box :: new ( |cx, v| match v. downcast_or_throw :: < JsBoolean , _ > ( cx) {
312- Ok ( v) => Ok ( v. value ( cx) ) ,
313- Err ( _) => Err ( CubeError :: internal (
314- "Failed to downcast write response" . to_string ( ) ,
315- ) ) ,
316- } ) ,
328+ stream_methods. write . clone ( ) ,
317329 stream_methods. stream . clone ( ) ,
330+ serde_json:: Value :: Object ( rows) ,
318331 )
319332 . await ?;
320333
@@ -324,6 +337,20 @@ async fn handle_sql_query(
324337 }
325338 }
326339
340+ // If no data was processed, send empty data
341+ if !has_data {
342+ let mut rows = Map :: new ( ) ;
343+ rows. insert ( "data" . into ( ) , serde_json:: Value :: Array ( vec ! [ ] ) ) ;
344+
345+ write_jsonl_message (
346+ channel. clone ( ) ,
347+ stream_methods. write . clone ( ) ,
348+ stream_methods. stream . clone ( ) ,
349+ serde_json:: Value :: Object ( rows) ,
350+ )
351+ . await ?;
352+ }
353+
327354 Ok :: < ( ) , CubeError > ( ( ) )
328355 } ;
329356
@@ -465,13 +492,13 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult<JsValue> {
465492 Err ( err) => {
466493 let mut error_response = Map :: new ( ) ;
467494 error_response. insert ( "error" . into ( ) , err. to_string ( ) . into ( ) ) ;
468- let error_response = format ! (
495+ let error_message = format ! (
469496 "{}{}" ,
470497 serde_json:: to_string( & serde_json:: Value :: Object ( error_response) )
471498 . expect( "Failed to serialize error response to JSON" ) ,
472499 CHUNK_DELIM
473500 ) ;
474- let arg = cx. string ( error_response ) . upcast :: < JsValue > ( ) ;
501+ let arg = cx. string ( error_message ) . upcast :: < JsValue > ( ) ;
475502
476503 vec ! [ arg]
477504 }
0 commit comments