@@ -2,8 +2,11 @@ use arrow_flight::flight_service_client::FlightServiceClient;
22use async_trait:: async_trait;
33use aws_config:: BehaviorVersion ;
44use aws_sdk_ec2:: Client as Ec2Client ;
5+ use axum:: body:: Body ;
6+ use axum:: http:: Response ;
57use axum:: { Json , Router , extract:: Query , http:: StatusCode , routing:: get} ;
68use dashmap:: { DashMap , Entry } ;
9+ use datafusion:: arrow:: ipc:: writer:: { IpcWriteOptions , StreamWriter } ;
710use datafusion:: common:: DataFusionError ;
811use datafusion:: common:: instant:: Instant ;
912use datafusion:: common:: runtime:: SpawnedTask ;
@@ -15,7 +18,7 @@ use datafusion_distributed::{
1518 DistributedPhysicalOptimizerRule , DistributedSessionBuilder , DistributedSessionBuilderContext ,
1619 create_flight_client, display_plan_ascii,
1720} ;
18- use futures:: { StreamExt , TryFutureExt } ;
21+ use futures:: { StreamExt , TryFutureExt , TryStreamExt } ;
1922use log:: { error, info, warn} ;
2023use object_store:: ObjectStore ;
2124use object_store:: aws:: AmazonS3Builder ;
@@ -27,15 +30,10 @@ use std::sync::atomic::AtomicBool;
2730use std:: sync:: { Arc , RwLock } ;
2831use std:: time:: Duration ;
2932use structopt:: StructOpt ;
33+ use tonic:: Status ;
3034use tonic:: transport:: { Channel , Server } ;
3135use url:: Url ;
3236
33- #[ derive( Serialize ) ]
34- struct QueryResult {
35- plan : String ,
36- count : usize ,
37- }
38-
3937#[ derive( Debug , StructOpt , Clone ) ]
4038#[ structopt( about = "worker spawn command" ) ]
4139struct Cmd {
@@ -115,7 +113,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
115113 let ctx = ctx. clone ( ) ;
116114
117115 async move {
118- let sql = params. get ( "sql" ) . ok_or ( err ( "Missing 'sql' parameter" ) ) ?;
116+ let sql = params
117+ . get ( "sql" )
118+ . ok_or ( err ( "Missing 'sql' parameter" ) ) ?
119+ . clone ( ) ;
119120
120121 let mut df_opt = None ;
121122 for sql in sql. split ( ";" ) {
@@ -129,12 +130,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
129130 return Err ( err ( "Empty 'sql' parameter" ) ) ;
130131 } ;
131132
132- let start = Instant :: now ( ) ;
133-
134133 info ! ( "Executing query..." ) ;
135134 let abort_notifier = AbortNotifier :: new ( "Query aborted" ) ;
136135 let abort_notifier_clone = abort_notifier. clone ( ) ;
137- let task = SpawnedTask :: spawn ( async move {
136+ let still_running_log_task = SpawnedTask :: spawn ( async move {
138137 let _ = abort_notifier_clone;
139138 loop {
140139 tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
@@ -144,25 +143,42 @@ async fn main() -> Result<(), Box<dyn Error>> {
144143 let physical = df. create_physical_plan ( ) . await . map_err ( err) ?;
145144 let mut stream =
146145 execute_stream ( physical. clone ( ) , ctx. task_ctx ( ) ) . map_err ( err) ?;
147- let mut count = 0 ;
148- while let Some ( batch) = stream. next ( ) . await {
149- count += batch. map_err ( err) ?. num_rows ( ) ;
150- info ! ( "Gathered {count} rows, query still in progress.." )
151- }
152- let plan = display_plan_ascii ( physical. as_ref ( ) , true ) ;
153- drop ( task) ;
154146
155- let elapsed = start. elapsed ( ) ;
156- let ms = elapsed. as_secs_f64 ( ) * 1000.0 ;
157- info ! ( "Finished executing query:\n {sql}\n \n {plan}" ) ;
158- info ! ( "Returned {count} rows in {ms} ms" ) ;
159- abort_notifier. finished ( ) ;
147+ let start = Instant :: now ( ) ;
148+ let mut count: usize = 0 ;
149+
150+ let stream = async_stream:: stream! {
151+ // Stream the data
152+ while let Some ( batch) = stream. next( ) . await {
153+ let batch = batch?;
154+ count += batch. num_rows( ) ;
155+ info!( "Gathered {count} rows, query still in progress.." ) ;
156+
157+ let mut writer = StreamWriter :: try_new( vec![ ] , batch. schema( ) . as_ref( ) ) ?;
158+ writer. write( & batch) ?;
159+ yield writer. into_inner( )
160+ }
161+
162+ // After stream completes gracefully - all cleanup code runs here
163+ let elapsed = start. elapsed( ) ;
164+ let ms = elapsed. as_secs_f64( ) * 1000.0 ;
165+ info!( "Finished executing query:\n {sql}" ) ;
166+ info!( "Returned {count} rows in {ms} ms" ) ;
167+ abort_notifier. finished( ) ;
168+ // keep the task alive
169+ drop( still_running_log_task) ;
170+ } ;
160171
161- Ok :: < _ , ( StatusCode , String ) > ( Json ( QueryResult { count, plan } ) )
172+ Ok ( Response :: builder ( )
173+ . header ( "content-type" , "application/octet-stream" )
174+ . body ( Body :: from_stream (
175+ stream. map_err ( |err| Status :: internal ( err. to_string ( ) ) ) ,
176+ ) )
177+ . expect ( "building a Response from a body should never fail" ) )
162178 }
163- . inspect_err ( |( _, msg) | {
164- error ! ( "Error executing query: {msg}" ) ;
165- } )
179+ . inspect_err ( |( _, msg) | {
180+ error ! ( "Error executing query: {msg}" ) ;
181+ } )
166182 } ) ,
167183 ) ,
168184 ) ;
0 commit comments