@@ -6,6 +6,7 @@ use axum::{Json, Router, extract::Query, http::StatusCode, routing::get};
66use dashmap:: { DashMap , Entry } ;
77use datafusion:: common:: DataFusionError ;
88use datafusion:: common:: instant:: Instant ;
9+ use datafusion:: common:: runtime:: SpawnedTask ;
910use datafusion:: execution:: { SessionState , SessionStateBuilder } ;
1011use datafusion:: physical_plan:: execute_stream;
1112use datafusion:: prelude:: SessionContext ;
@@ -14,15 +15,17 @@ use datafusion_distributed::{
1415 DistributedPhysicalOptimizerRule , DistributedSessionBuilder , DistributedSessionBuilderContext ,
1516 create_flight_client, display_plan_ascii,
1617} ;
17- use futures:: { TryFutureExt , TryStreamExt } ;
18- use log:: { error, info} ;
18+ use futures:: { StreamExt , TryFutureExt } ;
19+ use log:: { error, info, warn } ;
1920use object_store:: ObjectStore ;
2021use object_store:: aws:: AmazonS3Builder ;
2122use serde:: Serialize ;
2223use std:: collections:: HashMap ;
2324use std:: error:: Error ;
2425use std:: fmt:: Display ;
26+ use std:: sync:: atomic:: AtomicBool ;
2527use std:: sync:: { Arc , RwLock } ;
28+ use std:: time:: Duration ;
2629use structopt:: StructOpt ;
2730use tonic:: transport:: { Channel , Server } ;
2831use url:: Url ;
@@ -119,7 +122,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
119122 if sql. trim ( ) . is_empty ( ) {
120123 continue ;
121124 }
122- info ! ( "Executing query: {sql}" ) ;
123125 let df = ctx. sql ( sql) . await . map_err ( err) ?;
124126 df_opt = Some ( df) ;
125127 }
@@ -129,15 +131,32 @@ async fn main() -> Result<(), Box<dyn Error>> {
129131
130132 let start = Instant :: now ( ) ;
131133
134+ info ! ( "Executing query..." ) ;
135+ let abort_notifier = AbortNotifier :: new ( "Query aborted" ) ;
136+ let abort_notifier_clone = abort_notifier. clone ( ) ;
137+ let task = SpawnedTask :: spawn ( async move {
138+ let _ = abort_notifier_clone;
139+ loop {
140+ tokio:: time:: sleep ( Duration :: from_secs ( 5 ) ) . await ;
141+ info ! ( "Query still running..." ) ;
142+ }
143+ } ) ;
132144 let physical = df. create_physical_plan ( ) . await . map_err ( err) ?;
133- let stream = execute_stream ( physical. clone ( ) , ctx. task_ctx ( ) ) . map_err ( err) ?;
134- let batches = stream. try_collect :: < Vec < _ > > ( ) . await . map_err ( err) ?;
135- let count = batches. iter ( ) . map ( |b| b. num_rows ( ) ) . sum :: < usize > ( ) ;
145+ let mut stream =
146+ execute_stream ( physical. clone ( ) , ctx. task_ctx ( ) ) . map_err ( err) ?;
147+ let mut count = 0 ;
148+ while let Some ( batch) = stream. next ( ) . await {
149+ count += batch. map_err ( err) ?. num_rows ( ) ;
150+ info ! ( "Gathered {count} rows, query still in progress.." )
151+ }
136152 let plan = display_plan_ascii ( physical. as_ref ( ) , true ) ;
153+ drop ( task) ;
137154
138155 let elapsed = start. elapsed ( ) ;
139156 let ms = elapsed. as_secs_f64 ( ) * 1000.0 ;
157+ info ! ( "Finished executing query:\n {sql}\n \n {plan}" ) ;
140158 info ! ( "Returned {count} rows in {ms} ms" ) ;
159+ abort_notifier. finished ( ) ;
141160
142161 Ok :: < _ , ( StatusCode , String ) > ( Json ( QueryResult { count, plan } ) )
143162 }
@@ -162,6 +181,33 @@ async fn main() -> Result<(), Box<dyn Error>> {
162181 Ok ( ( ) )
163182}
164183
/// Shared completion flag that starts out in the "aborted" state.
///
/// The flag stays set until [`AbortNotifier::finished`] is called. The type's
/// `Drop` implementation checks the flag, so a notifier that is dropped
/// without `finished` having been called is treated as an aborted run.
struct AbortNotifier {
    /// `true` until [`AbortNotifier::finished`] clears it.
    aborted: AtomicBool,
    /// Message captured at construction time.
    msg: String,
}

impl AbortNotifier {
    /// Builds a notifier in the "aborted" state and wraps it in an [`Arc`] so
    /// it can be cloned into other tasks.
    ///
    /// `msg` is rendered once, through its [`Display`] implementation, and
    /// stored as an owned `String`.
    fn new(msg: impl Display) -> Arc<Self> {
        Arc::new(AbortNotifier {
            aborted: AtomicBool::new(true),
            msg: msg.to_string(),
        })
    }

    /// Marks the work as completed by clearing the `aborted` flag.
    fn finished(&self) {
        self.aborted
            .store(false, std::sync::atomic::Ordering::Relaxed);
    }
}
202+
203+ impl Drop for AbortNotifier {
204+ fn drop ( & mut self ) {
205+ if self . aborted . load ( std:: sync:: atomic:: Ordering :: Relaxed ) {
206+ warn ! ( "{}" , self . msg) ;
207+ }
208+ }
209+ }
210+
165211fn err ( s : impl Display ) -> ( StatusCode , String ) {
166212 ( StatusCode :: INTERNAL_SERVER_ERROR , s. to_string ( ) )
167213}
@@ -222,7 +268,7 @@ async fn background_ec2_worker_resolver(urls: Arc<RwLock<Vec<Url>>>) {
222268 ) ;
223269 * urls. write ( ) . unwrap ( ) = workers;
224270 }
225- tokio:: time:: sleep ( tokio :: time :: Duration :: from_secs ( 1 ) ) . await ;
271+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
226272 }
227273 } ) ;
228274}
0 commit comments