File tree Expand file tree Collapse file tree 4 files changed +37
-21
lines changed
Expand file tree Collapse file tree 4 files changed +37
-21
lines changed Original file line number Diff line number Diff line change @@ -65,11 +65,20 @@ impl<T: AsyncSink + 'static> AsyncSinker<T> {
6565
6666impl < T : AsyncSink + ' static > Drop for AsyncSinker < T > {
6767 fn drop ( & mut self ) {
68- if !self . called_on_finish {
69- self . called_on_finish = true ;
68+ if !self . called_on_start || !self . called_on_finish {
7069 if let Some ( mut inner) = self . inner . take ( ) {
71- GlobalIORuntime :: instance ( ) . spawn ( async move {
72- let _ = inner. on_finish ( ) . await ;
70+ GlobalIORuntime :: instance ( ) . spawn ( {
71+ let called_on_start = self . called_on_start ;
72+ let called_on_finish = self . called_on_finish ;
73+ async move {
74+ if !called_on_start {
75+ let _ = inner. on_start ( ) . await ;
76+ }
77+
78+ if !called_on_finish {
79+ let _ = inner. on_finish ( ) . await ;
80+ }
81+ }
7382 } ) ;
7483 }
7584 }
Original file line number Diff line number Diff line change @@ -55,6 +55,11 @@ impl AsyncSink for ExchangeWriterSink {
5555 const NAME : & ' static str = "ExchangeWriterSink" ;
5656
5757 async fn on_start ( & mut self ) -> Result < ( ) > {
58+ info ! (
59+ "Start query:{:?}, fragment:{:?} exchange write." ,
60+ self . query_id, self . fragment
61+ ) ;
62+
5863 let res = self . exchange . close_input ( ) . await ;
5964 info ! (
6065 "Started query:{:?}, fragment:{:?} exchange write. {}" ,
@@ -64,6 +69,11 @@ impl AsyncSink for ExchangeWriterSink {
6469 }
6570
6671 async fn on_finish ( & mut self ) -> Result < ( ) > {
72+ info ! (
73+ "Finish query:{:?}, fragment:{:?} exchange write." ,
74+ self . query_id, self . fragment
75+ ) ;
76+
6777 let res = self . exchange . close_output ( ) . await ;
6878 info ! (
6979 "Finished query:{:?}, fragment:{:?} exchange write. {}" ,
Original file line number Diff line number Diff line change @@ -33,7 +33,6 @@ use crate::api::DataPacket;
3333
3434pub struct ExchangeSourceReader {
3535 finished : bool ,
36- initialized : bool ,
3736 query_id : String ,
3837 fragment : usize ,
3938 output : Arc < OutputPort > ,
@@ -48,12 +47,13 @@ impl ExchangeSourceReader {
4847 query_id : String ,
4948 fragment : usize ,
5049 ) -> ProcessorPtr {
50+ flight_exchange. dec_output_ref ( ) ;
51+
5152 ProcessorPtr :: create ( Box :: new ( ExchangeSourceReader {
5253 output,
5354 flight_exchange,
5455 finished : false ,
5556 output_data : None ,
56- initialized : false ,
5757 fragment,
5858 query_id,
5959 } ) )
@@ -98,20 +98,6 @@ impl Processor for ExchangeSourceReader {
9898 }
9999
100100 async fn async_process ( & mut self ) -> common_exception:: Result < ( ) > {
101- if !self . initialized {
102- self . initialized = true ;
103- info ! (
104- "Start query:{:?}, fragment:{:?} exchange read." ,
105- self . query_id, self . fragment
106- ) ;
107- let res = self . flight_exchange . close_output ( ) . await ;
108-
109- info ! (
110- "Started query:{:?}, fragment:{:?} exchange read. {}" ,
111- self . query_id, self . fragment, res
112- ) ;
113- }
114-
115101 if self . output_data . is_none ( ) {
116102 if let Some ( output_data) = self . flight_exchange . recv ( ) . await ? {
117103 self . output_data = Some ( output_data) ;
Original file line number Diff line number Diff line change @@ -231,7 +231,6 @@ impl FlightExchange {
231231 Either :: Left ( ( _, right) ) => {
232232 debug_assert ! ( state. closed_both( ) ) ;
233233
234- // break 'loop_worker;
235234 tx. close ( ) ;
236235 drop ( network_tx) ;
237236 response_tx. close ( ) ;
@@ -467,6 +466,12 @@ impl FlightExchange {
467466
468467 ' publisher_worker: loop {
469468 if channel_state. closed_both ( ) {
469+ while let Ok ( response) = response_rx. try_recv ( ) {
470+ if network_tx. send ( response) . await . is_err ( ) {
471+ break ' publisher_worker;
472+ }
473+ }
474+
470475 break ' publisher_worker;
471476 }
472477
@@ -632,6 +637,12 @@ impl FlightExchangeRef {
632637 false
633638 }
634639
640+ pub fn dec_output_ref ( & self ) {
641+ if !self . is_closed_response . fetch_or ( true , Ordering :: SeqCst ) {
642+ self . state . response_count . fetch_sub ( 1 , Ordering :: SeqCst ) ;
643+ }
644+ }
645+
635646 pub async fn close_output ( & self ) -> bool {
636647 if self . is_closed_response . fetch_or ( true , Ordering :: SeqCst ) {
637648 return false ;
You can’t perform that action at this time.
0 commit comments