Skip to content

Commit 46a0fb8

Browse files
authored
Merge pull request #10532 from zhang2014/fix/cluster_hang
fix(cluster): using global io spawn flight client
2 parents f09de52 + 2f4b232 commit 46a0fb8

File tree

8 files changed

+520
-138
lines changed

8 files changed

+520
-138
lines changed

src/query/service/src/api/rpc/exchange/exchange_sink.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ impl ExchangeSink {
7979
Ok(ProcessorPtr::create(ExchangeWriterSink::create(
8080
input,
8181
flight_exchange.clone(),
82+
params.query_id.to_string(),
83+
params.fragment_id,
8284
)))
8385
})
8486
}
@@ -87,7 +89,11 @@ impl ExchangeSink {
8789

8890
// exchange writer sink
8991
let len = pipeline.output_len();
90-
let items = create_writer_items(flight_exchanges);
92+
let items = create_writer_items(
93+
flight_exchanges,
94+
params.query_id.clone(),
95+
params.fragment_id,
96+
);
9197
pipeline.add_pipe(Pipe::create(len, 0, items));
9298
Ok(())
9399
}

src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,27 @@ use common_pipeline_core::processors::processor::ProcessorPtr;
2424
use common_pipeline_core::processors::Processor;
2525
use common_pipeline_sinks::AsyncSink;
2626
use common_pipeline_sinks::AsyncSinker;
27+
use tracing::info;
2728

2829
use crate::api::rpc::exchange::serde::exchange_serializer::ExchangeSerializeMeta;
2930
use crate::api::rpc::flight_client::FlightExchangeRef;
3031

3132
pub struct ExchangeWriterSink {
33+
query_id: String,
34+
fragment: usize,
3235
exchange: FlightExchangeRef,
3336
}
3437

3538
impl ExchangeWriterSink {
36-
pub fn create(input: Arc<InputPort>, flight_exchange: FlightExchangeRef) -> Box<dyn Processor> {
39+
pub fn create(
40+
input: Arc<InputPort>,
41+
flight_exchange: FlightExchangeRef,
42+
query_id: String,
43+
fragment: usize,
44+
) -> Box<dyn Processor> {
3745
AsyncSinker::create(input, ExchangeWriterSink {
46+
query_id,
47+
fragment,
3848
exchange: flight_exchange,
3949
})
4050
}
@@ -45,12 +55,20 @@ impl AsyncSink for ExchangeWriterSink {
4555
const NAME: &'static str = "ExchangeWriterSink";
4656

4757
async fn on_start(&mut self) -> Result<()> {
48-
self.exchange.close_input().await;
58+
let res = self.exchange.close_input().await;
59+
info!(
60+
"Started query:{:?}, fragment:{:?} exchange write. {}",
61+
self.query_id, self.fragment, res
62+
);
4963
Ok(())
5064
}
5165

5266
async fn on_finish(&mut self) -> Result<()> {
53-
self.exchange.close_output().await;
67+
let res = self.exchange.close_output().await;
68+
info!(
69+
"Finished query:{:?}, fragment:{:?} exchange write. {}",
70+
self.query_id, self.fragment, res
71+
);
5472
Ok(())
5573
}
5674

@@ -79,15 +97,34 @@ impl AsyncSink for ExchangeWriterSink {
7997
}
8098
}
8199

82-
pub fn create_writer_item(exchange: FlightExchangeRef) -> PipeItem {
100+
pub fn create_writer_item(
101+
exchange: FlightExchangeRef,
102+
query_id: String,
103+
fragment: usize,
104+
) -> PipeItem {
83105
let input = InputPort::create();
84106
PipeItem::create(
85-
ProcessorPtr::create(ExchangeWriterSink::create(input.clone(), exchange)),
107+
ProcessorPtr::create(ExchangeWriterSink::create(
108+
input.clone(),
109+
exchange,
110+
query_id,
111+
fragment,
112+
)),
86113
vec![input],
87114
vec![],
88115
)
89116
}
90117

91-
pub fn create_writer_items(exchanges: Vec<FlightExchangeRef>) -> Vec<PipeItem> {
92-
exchanges.into_iter().map(create_writer_item).collect()
118+
pub fn create_writer_items(
119+
exchanges: Vec<FlightExchangeRef>,
120+
query_id: String,
121+
fragment: usize,
122+
) -> Vec<PipeItem> {
123+
let mut items = Vec::with_capacity(exchanges.len());
124+
125+
for exchange in exchanges {
126+
items.push(create_writer_item(exchange, query_id.clone(), fragment));
127+
}
128+
129+
items
93130
}

src/query/service/src/api/rpc/exchange/exchange_source.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,13 @@ pub fn via_exchange_source(
5454

5555
let last_output_len = pipeline.output_len();
5656
let flight_exchanges_len = flight_exchanges.len();
57-
exchange_source_reader::via_reader(last_output_len, flight_exchanges, pipeline);
57+
exchange_source_reader::via_reader(
58+
last_output_len,
59+
flight_exchanges,
60+
pipeline,
61+
params.query_id.clone(),
62+
params.fragment_id,
63+
);
5864

5965
injector.apply_merge_deserializer(flight_exchanges_len, params, pipeline)
6066
}

src/query/service/src/api/rpc/exchange/exchange_source_reader.rs

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use common_pipeline_core::processors::processor::ProcessorPtr;
2525
use common_pipeline_core::processors::Processor;
2626
use common_pipeline_core::Pipeline;
2727
use common_pipeline_transforms::processors::transforms::TransformDummy;
28+
use tracing::info;
2829

2930
use crate::api::rpc::exchange::serde::exchange_deserializer::ExchangeDeserializeMeta;
3031
use crate::api::rpc::flight_client::FlightExchangeRef;
@@ -33,19 +34,28 @@ use crate::api::DataPacket;
3334
pub struct ExchangeSourceReader {
3435
finished: bool,
3536
initialized: bool,
37+
query_id: String,
38+
fragment: usize,
3639
output: Arc<OutputPort>,
3740
output_data: Option<DataPacket>,
3841
flight_exchange: FlightExchangeRef,
3942
}
4043

4144
impl ExchangeSourceReader {
42-
pub fn create(output: Arc<OutputPort>, flight_exchange: FlightExchangeRef) -> ProcessorPtr {
45+
pub fn create(
46+
output: Arc<OutputPort>,
47+
flight_exchange: FlightExchangeRef,
48+
query_id: String,
49+
fragment: usize,
50+
) -> ProcessorPtr {
4351
ProcessorPtr::create(Box::new(ExchangeSourceReader {
4452
output,
4553
flight_exchange,
4654
finished: false,
4755
output_data: None,
4856
initialized: false,
57+
fragment,
58+
query_id,
4959
}))
5060
}
5161
}
@@ -90,7 +100,16 @@ impl Processor for ExchangeSourceReader {
90100
async fn async_process(&mut self) -> common_exception::Result<()> {
91101
if !self.initialized {
92102
self.initialized = true;
93-
self.flight_exchange.close_output().await;
103+
info!(
104+
"Start query:{:?}, fragment:{:?} exchange read.",
105+
self.query_id, self.fragment
106+
);
107+
let res = self.flight_exchange.close_output().await;
108+
109+
info!(
110+
"Started query:{:?}, fragment:{:?} exchange read. {}",
111+
self.query_id, self.fragment, res
112+
);
94113
}
95114

96115
if self.output_data.is_none() {
@@ -102,14 +121,29 @@ impl Processor for ExchangeSourceReader {
102121

103122
if !self.finished {
104123
self.finished = true;
105-
self.flight_exchange.close_input().await;
124+
info!(
125+
"Finish query:{:?}, fragment:{:?} exchange read.",
126+
self.query_id, self.fragment
127+
);
128+
let res = self.flight_exchange.close_input().await;
129+
130+
info!(
131+
"Finished query:{:?}, fragment:{:?} exchange read. {}",
132+
self.query_id, self.fragment, res
133+
);
106134
}
107135

108136
Ok(())
109137
}
110138
}
111139

112-
pub fn via_reader(prefix_size: usize, exchanges: Vec<FlightExchangeRef>, pipeline: &mut Pipeline) {
140+
pub fn via_reader(
141+
prefix_size: usize,
142+
exchanges: Vec<FlightExchangeRef>,
143+
pipeline: &mut Pipeline,
144+
query_id: String,
145+
fragment: usize,
146+
) {
113147
let mut items = Vec::with_capacity(prefix_size + exchanges.len());
114148

115149
for _index in 0..prefix_size {
@@ -126,7 +160,12 @@ pub fn via_reader(prefix_size: usize, exchanges: Vec<FlightExchangeRef>, pipelin
126160
for flight_exchange in exchanges {
127161
let output = OutputPort::create();
128162
items.push(PipeItem::create(
129-
ExchangeSourceReader::create(output.clone(), flight_exchange),
163+
ExchangeSourceReader::create(
164+
output.clone(),
165+
flight_exchange,
166+
query_id.clone(),
167+
fragment,
168+
),
130169
vec![],
131170
vec![output],
132171
));
@@ -135,10 +174,14 @@ pub fn via_reader(prefix_size: usize, exchanges: Vec<FlightExchangeRef>, pipelin
135174
pipeline.add_pipe(Pipe::create(prefix_size, items.len(), items));
136175
}
137176

138-
pub fn create_reader_item(exchange: FlightExchangeRef) -> PipeItem {
177+
pub fn create_reader_item(
178+
exchange: FlightExchangeRef,
179+
query_id: String,
180+
fragment: usize,
181+
) -> PipeItem {
139182
let output = OutputPort::create();
140183
PipeItem::create(
141-
ExchangeSourceReader::create(output.clone(), exchange),
184+
ExchangeSourceReader::create(output.clone(), exchange, query_id, fragment),
142185
vec![],
143186
vec![output],
144187
)

src/query/service/src/api/rpc/exchange/exchange_transform.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ impl ExchangeTransform {
5959
items.push(match destination_id == &params.executor_id {
6060
true if max_threads == 1 => create_dummy_item(),
6161
true => create_resize_item(1, max_threads),
62-
false => create_writer_item(exchange),
62+
false => create_writer_item(
63+
exchange,
64+
params.query_id.clone(),
65+
params.fragment_id,
66+
),
6367
});
6468
}
6569

@@ -68,7 +72,11 @@ impl ExchangeTransform {
6872
for (destination_id, exchange) in params.destination_ids.iter().zip(exchanges) {
6973
if destination_id != &params.executor_id {
7074
nodes_source += 1;
71-
items.push(create_reader_item(exchange));
75+
items.push(create_reader_item(
76+
exchange,
77+
params.query_id.clone(),
78+
params.fragment_id,
79+
));
7280
}
7381
}
7482

src/query/service/src/api/rpc/exchange/statistics_sender.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use std::sync::Arc;
1818

1919
use async_channel::Receiver;
2020
use async_channel::Sender;
21-
use common_base::runtime::GlobalQueryRuntime;
21+
use common_base::base::tokio::task::JoinHandle;
2222
use common_base::runtime::TrySpawn;
2323
use common_catalog::table_context::TableContext;
2424
use common_exception::ErrorCode;
@@ -38,6 +38,7 @@ pub struct StatisticsSender {
3838
shutdown_flag: Arc<AtomicBool>,
3939
shutdown_flag_sender: Sender<Option<ErrorCode>>,
4040
shutdown_flag_receiver: Receiver<Option<ErrorCode>>,
41+
join_handle: Option<JoinHandle<()>>,
4142
}
4243

4344
impl StatisticsSender {
@@ -54,6 +55,7 @@ impl StatisticsSender {
5455
exchange: Some(exchange),
5556
query_id: query_id.to_string(),
5657
shutdown_flag: Arc::new(AtomicBool::new(false)),
58+
join_handle: None,
5759
}
5860
}
5961

@@ -64,8 +66,8 @@ impl StatisticsSender {
6466
let shutdown_flag = self.shutdown_flag.clone();
6567
let shutdown_flag_receiver = self.shutdown_flag_receiver.clone();
6668

67-
let spawner = GlobalQueryRuntime::instance();
68-
spawner.runtime().spawn(async move {
69+
let spawner = ctx.clone();
70+
self.join_handle = Some(spawner.spawn(async move {
6971
let mut recv = Box::pin(flight_exchange.recv());
7072
let mut notified = Box::pin(shutdown_flag_receiver.recv());
7173

@@ -113,13 +115,14 @@ impl StatisticsSender {
113115

114116
flight_exchange.close_input().await;
115117
flight_exchange.close_output().await;
116-
});
118+
}));
117119
}
118120

119121
pub fn shutdown(&mut self, error: Option<ErrorCode>) {
120122
self.shutdown_flag.store(true, Ordering::Release);
121123
let shutdown_flag_sender = self.shutdown_flag_sender.clone();
122124

125+
let join_handle = self.join_handle.take();
123126
futures::executor::block_on(async move {
124127
if let Err(error_code) = shutdown_flag_sender.send(error).await {
125128
tracing::warn!(
@@ -129,6 +132,10 @@ impl StatisticsSender {
129132
}
130133

131134
shutdown_flag_sender.close();
135+
136+
if let Some(join_handle) = join_handle {
137+
let _ = join_handle.await;
138+
}
132139
});
133140
}
134141

0 commit comments

Comments
 (0)