Skip to content

Commit 6a927a2

Browse files
authored
Merge branch 'main' into pivot
2 parents 65e7301 + 796ec31 commit 6a927a2

File tree

5 files changed

+63
-65
lines changed

5 files changed

+63
-65
lines changed

src/query/service/src/api/rpc/exchange/exchange_manager.rs

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ use crate::api::rpc::exchange::exchange_sink::ExchangeSink;
4242
use crate::api::rpc::exchange::exchange_transform::ExchangeTransform;
4343
use crate::api::rpc::exchange::statistics_receiver::StatisticsReceiver;
4444
use crate::api::rpc::exchange::statistics_sender::StatisticsSender;
45+
use crate::api::rpc::flight_client::FlightExchange;
4546
use crate::api::rpc::flight_client::FlightReceiver;
4647
use crate::api::rpc::flight_client::FlightSender;
47-
use crate::api::rpc::flight_client::NewFlightExchange;
4848
use crate::api::rpc::Packet;
4949
use crate::api::DataExchange;
5050
use crate::api::DefaultExchangeInjector;
@@ -218,7 +218,7 @@ impl DataExchangeManager {
218218
}
219219
}
220220

221-
pub fn new_handle_exchange_fragment(
221+
pub fn handle_exchange_fragment(
222222
&self,
223223
query: String,
224224
target: String,
@@ -228,10 +228,10 @@ impl DataExchangeManager {
228228
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };
229229

230230
match queries_coordinator.entry(query) {
231-
Entry::Occupied(mut v) => v.get_mut().new_add_fragment_exchange(target, fragment),
231+
Entry::Occupied(mut v) => v.get_mut().add_fragment_exchange(target, fragment),
232232
Entry::Vacant(v) => v
233233
.insert(QueryCoordinator::create())
234-
.new_add_fragment_exchange(target, fragment),
234+
.add_fragment_exchange(target, fragment),
235235
}
236236
}
237237

@@ -308,12 +308,12 @@ impl DataExchangeManager {
308308
match queries_coordinator.get_mut(&query_id) {
309309
None => Err(ErrorCode::Internal("Query not exists.")),
310310
Some(query_coordinator) => {
311-
query_coordinator.new_fragment_exchanges.clear();
311+
query_coordinator.fragment_exchanges.clear();
312312
let injector = DefaultExchangeInjector::create();
313313
let mut build_res =
314314
query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?;
315315

316-
let exchanges = std::mem::take(&mut query_coordinator.new_statistics_exchanges);
316+
let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges);
317317
let statistics_receiver = StatisticsReceiver::spawn_receiver(&ctx, exchanges)?;
318318

319319
let statistics_receiver: Mutex<StatisticsReceiver> =
@@ -396,17 +396,17 @@ struct QueryCoordinator {
396396
info: Option<QueryInfo>,
397397
fragments_coordinator: HashMap<usize, Box<FragmentCoordinator>>,
398398

399-
new_statistics_exchanges: HashMap<String, Vec<NewFlightExchange>>,
400-
new_fragment_exchanges: HashMap<(String, usize, u8), NewFlightExchange>,
399+
statistics_exchanges: HashMap<String, Vec<FlightExchange>>,
400+
fragment_exchanges: HashMap<(String, usize, u8), FlightExchange>,
401401
}
402402

403403
impl QueryCoordinator {
404404
pub fn create() -> QueryCoordinator {
405405
QueryCoordinator {
406406
info: None,
407407
fragments_coordinator: HashMap::new(),
408-
new_fragment_exchanges: HashMap::new(),
409-
new_statistics_exchanges: HashMap::new(),
408+
fragment_exchanges: HashMap::new(),
409+
statistics_exchanges: HashMap::new(),
410410
}
411411
}
412412

@@ -415,12 +415,12 @@ impl QueryCoordinator {
415415
target: String,
416416
) -> Result<Receiver<Result<FlightData, Status>>> {
417417
let (tx, rx) = async_channel::bounded(8);
418-
match self.new_statistics_exchanges.entry(target) {
418+
match self.statistics_exchanges.entry(target) {
419419
Entry::Vacant(v) => {
420-
v.insert(vec![NewFlightExchange::create_sender(tx)]);
420+
v.insert(vec![FlightExchange::create_sender(tx)]);
421421
}
422422
Entry::Occupied(mut v) => {
423-
v.get_mut().push(NewFlightExchange::create_sender(tx));
423+
v.get_mut().push(FlightExchange::create_sender(tx));
424424
}
425425
};
426426

@@ -429,10 +429,10 @@ impl QueryCoordinator {
429429

430430
pub fn add_statistics_exchanges(
431431
&mut self,
432-
exchanges: HashMap<String, NewFlightExchange>,
432+
exchanges: HashMap<String, FlightExchange>,
433433
) -> Result<()> {
434434
for (source, exchange) in exchanges.into_iter() {
435-
match self.new_statistics_exchanges.entry(source) {
435+
match self.statistics_exchanges.entry(source) {
436436
Entry::Vacant(v) => {
437437
v.insert(vec![exchange]);
438438
}
@@ -445,25 +445,25 @@ impl QueryCoordinator {
445445
Ok(())
446446
}
447447

448-
pub fn new_add_fragment_exchange(
448+
pub fn add_fragment_exchange(
449449
&mut self,
450450
target: String,
451451
fragment: usize,
452452
) -> Result<Receiver<Result<FlightData, Status>>> {
453453
let (tx, rx) = async_channel::bounded(8);
454-
self.new_fragment_exchanges.insert(
454+
self.fragment_exchanges.insert(
455455
(target, fragment, FLIGHT_SENDER),
456-
NewFlightExchange::create_sender(tx),
456+
FlightExchange::create_sender(tx),
457457
);
458458
Ok(rx)
459459
}
460460

461461
pub fn add_fragment_exchanges(
462462
&mut self,
463-
exchanges: HashMap<(String, usize), NewFlightExchange>,
463+
exchanges: HashMap<(String, usize), FlightExchange>,
464464
) -> Result<()> {
465465
for ((source, fragment), exchange) in exchanges.into_iter() {
466-
self.new_fragment_exchanges
466+
self.fragment_exchanges
467467
.insert((source, fragment, FLIGHT_RECEIVER), exchange);
468468
}
469469

@@ -474,7 +474,7 @@ impl QueryCoordinator {
474474
match params {
475475
ExchangeParams::MergeExchange(params) => {
476476
let mut exchanges = vec![];
477-
for ((_target, fragment, role), exchange) in &self.new_fragment_exchanges {
477+
for ((_target, fragment, role), exchange) in &self.fragment_exchanges {
478478
if *fragment == params.fragment_id && *role == FLIGHT_SENDER {
479479
exchanges.push(exchange.as_sender());
480480
}
@@ -488,7 +488,7 @@ impl QueryCoordinator {
488488
for destination in &params.destination_ids {
489489
exchanges.push(match destination == &params.executor_id {
490490
true => Ok(FlightSender::create(async_channel::bounded(1).0)),
491-
false => match self.new_fragment_exchanges.get(&(
491+
false => match self.fragment_exchanges.get(&(
492492
destination.clone(),
493493
params.fragment_id,
494494
FLIGHT_SENDER,
@@ -511,7 +511,7 @@ impl QueryCoordinator {
511511
match params {
512512
ExchangeParams::MergeExchange(params) => {
513513
let mut exchanges = vec![];
514-
for ((_target, fragment, role), exchange) in &self.new_fragment_exchanges {
514+
for ((_target, fragment, role), exchange) in &self.fragment_exchanges {
515515
if *fragment == params.fragment_id && *role == FLIGHT_RECEIVER {
516516
exchanges.push(exchange.as_receiver());
517517
}
@@ -525,7 +525,7 @@ impl QueryCoordinator {
525525
for destination in &params.destination_ids {
526526
exchanges.push(match destination == &params.executor_id {
527527
true => Ok(FlightReceiver::create(async_channel::bounded(1).1)),
528-
false => match self.new_fragment_exchanges.get(&(
528+
false => match self.fragment_exchanges.get(&(
529529
destination.clone(),
530530
params.fragment_id,
531531
FLIGHT_RECEIVER,
@@ -692,13 +692,13 @@ impl QueryCoordinator {
692692

693693
let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
694694

695-
self.new_fragment_exchanges.clear();
695+
self.fragment_exchanges.clear();
696696
let info_mut = self.info.as_mut().expect("Query info is None");
697697
info_mut.query_executor = Some(executor.clone());
698698

699699
let query_id = info_mut.query_id.clone();
700700
let query_ctx = info_mut.query_ctx.clone();
701-
let request_server_exchanges = std::mem::take(&mut self.new_statistics_exchanges);
701+
let request_server_exchanges = std::mem::take(&mut self.statistics_exchanges);
702702

703703
if request_server_exchanges.len() != 1 {
704704
return Err(ErrorCode::Internal(

src/query/service/src/api/rpc/exchange/statistics_receiver.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ use common_base::runtime::TrySpawn;
2929
use common_exception::ErrorCode;
3030
use common_exception::Result;
3131

32+
use crate::api::rpc::flight_client::FlightExchange;
3233
use crate::api::rpc::flight_client::FlightSender;
33-
use crate::api::rpc::flight_client::NewFlightExchange;
3434
use crate::api::DataPacket;
3535
use crate::sessions::QueryContext;
3636

@@ -44,7 +44,7 @@ pub struct StatisticsReceiver {
4444
impl StatisticsReceiver {
4545
pub fn spawn_receiver(
4646
ctx: &Arc<QueryContext>,
47-
statistics_exchanges: HashMap<String, Vec<NewFlightExchange>>,
47+
statistics_exchanges: HashMap<String, Vec<FlightExchange>>,
4848
) -> Result<StatisticsReceiver> {
4949
let shutdown_notify = Arc::new(Notify::new());
5050
let shutdown_flag = Arc::new(AtomicBool::new(false));
@@ -55,14 +55,12 @@ impl StatisticsReceiver {
5555
debug_assert_eq!(exchanges.len(), 2);
5656

5757
let (tx, rx) = match (exchanges.remove(0), exchanges.remove(0)) {
58-
(
59-
tx @ NewFlightExchange::Sender { .. },
60-
rx @ NewFlightExchange::Receiver { .. },
61-
) => (tx.as_sender(), rx.as_receiver()),
62-
(
63-
rx @ NewFlightExchange::Receiver { .. },
64-
tx @ NewFlightExchange::Sender { .. },
65-
) => (tx.as_sender(), rx.as_receiver()),
58+
(tx @ FlightExchange::Sender { .. }, rx @ FlightExchange::Receiver { .. }) => {
59+
(tx.as_sender(), rx.as_receiver())
60+
}
61+
(rx @ FlightExchange::Receiver { .. }, tx @ FlightExchange::Sender { .. }) => {
62+
(tx.as_sender(), rx.as_receiver())
63+
}
6664
_ => unreachable!(),
6765
};
6866

src/query/service/src/api/rpc/exchange/statistics_sender.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use common_exception::ErrorCode;
2424
use common_exception::Result;
2525
use futures_util::future::Either;
2626

27+
use crate::api::rpc::flight_client::FlightExchange;
2728
use crate::api::rpc::flight_client::FlightSender;
28-
use crate::api::rpc::flight_client::NewFlightExchange;
2929
use crate::api::rpc::packets::PrecommitBlock;
3030
use crate::api::rpc::packets::ProgressInfo;
3131
use crate::api::DataPacket;
@@ -42,15 +42,15 @@ impl StatisticsSender {
4242
pub fn spawn_sender(
4343
query_id: &str,
4444
ctx: Arc<QueryContext>,
45-
mut exchanges: Vec<NewFlightExchange>,
45+
mut exchanges: Vec<FlightExchange>,
4646
) -> StatisticsSender {
4747
debug_assert_eq!(exchanges.len(), 2);
4848

4949
let (tx, rx) = match (exchanges.remove(0), exchanges.remove(0)) {
50-
(tx @ NewFlightExchange::Sender { .. }, rx @ NewFlightExchange::Receiver { .. }) => {
50+
(tx @ FlightExchange::Sender { .. }, rx @ FlightExchange::Receiver { .. }) => {
5151
(tx.as_sender(), rx.as_receiver())
5252
}
53-
(rx @ NewFlightExchange::Receiver { .. }, tx @ NewFlightExchange::Sender { .. }) => {
53+
(rx @ FlightExchange::Receiver { .. }, tx @ FlightExchange::Sender { .. }) => {
5454
(tx.as_sender(), rx.as_receiver())
5555
}
5656
_ => unreachable!(),

src/query/service/src/api/rpc/flight_client.rs

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl FlightClient {
6262
&mut self,
6363
query_id: &str,
6464
target: &str,
65-
) -> Result<NewFlightExchange> {
65+
) -> Result<FlightExchange> {
6666
let mut streaming = self
6767
.get_streaming(
6868
RequestBuilder::create(Ticket::default())
@@ -84,15 +84,15 @@ impl FlightClient {
8484
}
8585
});
8686

87-
Ok(NewFlightExchange::create_receiver(rx))
87+
Ok(FlightExchange::create_receiver(rx))
8888
}
8989

9090
pub async fn do_get(
9191
&mut self,
9292
query_id: &str,
9393
target: &str,
9494
fragment: usize,
95-
) -> Result<NewFlightExchange> {
95+
) -> Result<FlightExchange> {
9696
let mut streaming = self
9797
.get_streaming(
9898
RequestBuilder::create(Ticket::default())
@@ -117,7 +117,7 @@ impl FlightClient {
117117
}
118118
});
119119

120-
Ok(NewFlightExchange::create_receiver(rx))
120+
Ok(FlightExchange::create_receiver(rx))
121121
}
122122

123123
async fn get_streaming(&mut self, request: Request<Ticket>) -> Result<Streaming<FlightData>> {
@@ -149,7 +149,7 @@ impl FlightClient {
149149
}
150150

151151
pub struct FlightReceiver {
152-
state: Arc<NewState>,
152+
state: Arc<State>,
153153
dropped: AtomicBool,
154154
rx: Receiver<Result<FlightData>>,
155155
}
@@ -164,7 +164,7 @@ impl FlightReceiver {
164164
pub fn create(rx: Receiver<Result<FlightData>>) -> FlightReceiver {
165165
FlightReceiver {
166166
rx,
167-
state: NewState::create(),
167+
state: State::create(),
168168
dropped: AtomicBool::new(false),
169169
}
170170
}
@@ -188,7 +188,7 @@ impl FlightReceiver {
188188
}
189189

190190
pub struct FlightSender {
191-
state: Arc<NewState>,
191+
state: Arc<State>,
192192
dropped: AtomicBool,
193193
tx: Sender<Result<FlightData, Status>>,
194194
}
@@ -214,7 +214,7 @@ impl Drop for FlightSender {
214214
impl FlightSender {
215215
pub fn create(tx: Sender<Result<FlightData, Status>>) -> FlightSender {
216216
FlightSender {
217-
state: NewState::create(),
217+
state: State::create(),
218218
dropped: AtomicBool::new(false),
219219
tx,
220220
}
@@ -240,48 +240,48 @@ impl FlightSender {
240240
}
241241
}
242242

243-
pub struct NewState {
243+
pub struct State {
244244
strong_count: AtomicUsize,
245245
}
246246

247-
impl NewState {
248-
pub fn create() -> Arc<NewState> {
249-
Arc::new(NewState {
247+
impl State {
248+
pub fn create() -> Arc<State> {
249+
Arc::new(State {
250250
strong_count: AtomicUsize::new(0),
251251
})
252252
}
253253
}
254254

255-
pub enum NewFlightExchange {
255+
pub enum FlightExchange {
256256
Dummy,
257257
Receiver {
258-
state: Arc<NewState>,
258+
state: Arc<State>,
259259
receiver: Receiver<Result<FlightData>>,
260260
},
261261
Sender {
262-
state: Arc<NewState>,
262+
state: Arc<State>,
263263
sender: Sender<Result<FlightData, Status>>,
264264
},
265265
}
266266

267-
impl NewFlightExchange {
268-
pub fn create_sender(sender: Sender<Result<FlightData, Status>>) -> NewFlightExchange {
269-
NewFlightExchange::Sender {
267+
impl FlightExchange {
268+
pub fn create_sender(sender: Sender<Result<FlightData, Status>>) -> FlightExchange {
269+
FlightExchange::Sender {
270270
sender,
271-
state: NewState::create(),
271+
state: State::create(),
272272
}
273273
}
274274

275-
pub fn create_receiver(receiver: Receiver<Result<FlightData>>) -> NewFlightExchange {
276-
NewFlightExchange::Receiver {
275+
pub fn create_receiver(receiver: Receiver<Result<FlightData>>) -> FlightExchange {
276+
FlightExchange::Receiver {
277277
receiver,
278-
state: NewState::create(),
278+
state: State::create(),
279279
}
280280
}
281281

282282
pub fn as_sender(&self) -> FlightSender {
283283
match self {
284-
NewFlightExchange::Sender { state, sender } => {
284+
FlightExchange::Sender { state, sender } => {
285285
state.strong_count.fetch_add(1, Ordering::SeqCst);
286286

287287
FlightSender {
@@ -296,7 +296,7 @@ impl NewFlightExchange {
296296

297297
pub fn as_receiver(&self) -> FlightReceiver {
298298
match self {
299-
NewFlightExchange::Receiver { state, receiver } => {
299+
FlightExchange::Receiver { state, receiver } => {
300300
state.strong_count.fetch_add(1, Ordering::SeqCst);
301301

302302
FlightReceiver {

0 commit comments

Comments
 (0)