Skip to content

Commit 487477e

Browse files
committed
change all to runtime::execute_futures_in_parallel
1 parent 7cc3c19 commit 487477e

File tree

8 files changed

+36
-76
lines changed

8 files changed

+36
-76
lines changed

src/query/pipeline/sources/src/input_formats/impls/input_format_parquet.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ impl InputFormat for InputFormatParquet {
148148
.map(|location| Self::get_split_batch(location.to_vec(), op.clone()))
149149
});
150150

151+
// TODO: Get from ctx.
151152
let thread_nums = 16;
152153
let permit_nums = 64;
153154
let result = execute_futures_in_parallel(

src/query/storages/fuse/src/io/files.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,11 @@
1414

1515
use std::sync::Arc;
1616

17+
use common_base::runtime::execute_futures_in_parallel;
1718
use common_catalog::table_context::TableContext;
1819
use common_exception::Result;
1920
use opendal::Operator;
2021

21-
use crate::io::execute_futures_in_parallel;
22-
2322
// File related operations.
2423
pub struct Files {
2524
ctx: Arc<dyn TableContext>,
@@ -52,9 +51,12 @@ impl Files {
5251
.map(|location| Self::delete_files(self.operator.clone(), location.to_vec()))
5352
});
5453

54+
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
55+
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
5556
execute_futures_in_parallel(
56-
self.ctx.clone(),
5757
tasks,
58+
threads_nums,
59+
permit_nums,
5860
"batch-remove-files-worker".to_owned(),
5961
)
6062
.await?;

src/query/storages/fuse/src/io/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@
1414

1515
mod files;
1616
mod locations;
17-
mod parallel;
1817
mod read;
1918
mod segments;
2019
mod snapshots;
2120
mod write;
2221

2322
pub use files::Files;
2423
pub use locations::TableMetaLocationGenerator;
25-
pub use parallel::execute_futures_in_parallel;
2624
pub use read::BlockReader;
2725
pub use read::BloomBlockFilterReader;
2826
pub use read::MergeIOReadResult;

src/query/storages/fuse/src/io/parallel.rs

Lines changed: 0 additions & 56 deletions
This file was deleted.

src/query/storages/fuse/src/io/segments.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use common_base::runtime::execute_futures_in_parallel;
1718
use common_catalog::table_context::TableContext;
1819
use common_exception::Result;
1920
use common_expression::TableSchemaRef;
@@ -23,7 +24,6 @@ use storages_common_table_meta::meta::Location;
2324
use storages_common_table_meta::meta::SegmentInfo;
2425
use tracing::Instrument;
2526

26-
use crate::io::execute_futures_in_parallel;
2727
use crate::io::MetaReaders;
2828

2929
// Read segment related operations.
@@ -88,9 +88,12 @@ impl SegmentsIO {
8888
}
8989
});
9090

91+
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
92+
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
9193
execute_futures_in_parallel(
92-
self.ctx.clone(),
9394
tasks,
95+
threads_nums,
96+
permit_nums,
9497
"fuse-req-segments-worker".to_owned(),
9598
)
9699
.await
@@ -143,9 +146,12 @@ impl SegmentsIO {
143146
})
144147
});
145148

149+
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
150+
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
146151
execute_futures_in_parallel(
147-
self.ctx.clone(),
148152
tasks,
153+
threads_nums,
154+
permit_nums,
149155
"fuse-req-segments-worker".to_owned(),
150156
)
151157
.await

src/query/storages/fuse/src/io/snapshots.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::time::Instant;
2020

2121
use chrono::DateTime;
2222
use chrono::Utc;
23+
use common_base::runtime::execute_futures_in_parallel;
2324
use common_catalog::table_context::TableContext;
2425
use common_exception::ErrorCode;
2526
use common_exception::Result;
@@ -37,7 +38,6 @@ use tracing::info;
3738
use tracing::warn;
3839
use tracing::Instrument;
3940

40-
use crate::io::execute_futures_in_parallel;
4141
use crate::io::MetaReaders;
4242
use crate::io::SnapshotHistoryReader;
4343
use crate::io::TableMetaLocationGenerator;
@@ -168,9 +168,12 @@ impl SnapshotsIO {
168168
})
169169
});
170170

171+
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
172+
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
171173
execute_futures_in_parallel(
172-
self.ctx.clone(),
173174
tasks,
175+
threads_nums,
176+
permit_nums,
174177
"fuse-req-snapshots-worker".to_owned(),
175178
)
176179
.await

src/query/storages/fuse/src/operations/merge_into/processors/transform_mutation_aggregator.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
use std::sync::Arc;
1717

18+
use common_base::runtime::execute_futures_in_parallel;
1819
use common_catalog::table_context::TableContext;
1920
use common_exception::ErrorCode;
2021
use common_exception::Result;
@@ -30,7 +31,6 @@ use storages_common_table_meta::meta::Location;
3031
use storages_common_table_meta::meta::SegmentInfo;
3132
use tracing::debug;
3233

33-
use crate::io::execute_futures_in_parallel;
3434
use crate::io::SegmentsIO;
3535
use crate::io::TableMetaLocationGenerator;
3636
use crate::operations::merge_into::mutation_meta::mutation_log::CommitMeta;
@@ -148,10 +148,10 @@ impl TableMutationAggregator {
148148

149149
// TODO use batch_meta_writer
150150
async fn write_segments(&self, segments: Vec<SerializedSegment>) -> Result<()> {
151-
let mut handles = Vec::with_capacity(segments.len());
151+
let mut tasks = Vec::with_capacity(segments.len());
152152
for segment in segments {
153153
let op = self.dal.clone();
154-
handles.push(async move {
154+
tasks.push(async move {
155155
op.write(&segment.path, segment.raw_data).await?;
156156
if let Some(segment_cache) = CacheManager::instance().get_table_segment_cache() {
157157
segment_cache.put(segment.path.clone(), segment.segment.clone());
@@ -160,9 +160,12 @@ impl TableMutationAggregator {
160160
});
161161
}
162162

163+
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
164+
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
163165
execute_futures_in_parallel(
164-
self.ctx.clone(),
165-
handles,
166+
tasks,
167+
threads_nums,
168+
permit_nums,
166169
"mutation-write-segments-worker".to_owned(),
167170
)
168171
.await?

src/query/storages/fuse/src/operations/mutation/mutation_transform.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::collections::BTreeMap;
1717
use std::collections::HashMap;
1818
use std::sync::Arc;
1919

20+
use common_base::runtime::execute_futures_in_parallel;
2021
use common_catalog::table_context::TableContext;
2122
use common_exception::ErrorCode;
2223
use common_exception::Result;
@@ -31,7 +32,6 @@ use storages_common_table_meta::meta::Location;
3132
use storages_common_table_meta::meta::SegmentInfo;
3233
use storages_common_table_meta::meta::Statistics;
3334

34-
use crate::io::execute_futures_in_parallel;
3535
use crate::io::SegmentsIO;
3636
use crate::io::TableMetaLocationGenerator;
3737
use crate::operations::mutation::AbortOperation;
@@ -149,10 +149,10 @@ impl MutationTransform {
149149
}
150150

151151
async fn write_segments(&self, segments: Vec<SerializedData>) -> Result<()> {
152-
let mut handles = Vec::with_capacity(segments.len());
152+
let mut tasks = Vec::with_capacity(segments.len());
153153
for segment in segments {
154154
let op = self.dal.clone();
155-
handles.push(async move {
155+
tasks.push(async move {
156156
op.write(&segment.location, segment.data).await?;
157157
if let Some(segment_cache) = CacheManager::instance().get_table_segment_cache() {
158158
segment_cache.put(segment.location.clone(), segment.segment.clone());
@@ -161,9 +161,12 @@ impl MutationTransform {
161161
});
162162
}
163163

164+
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
165+
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
164166
execute_futures_in_parallel(
165-
self.ctx.clone(),
166-
handles,
167+
tasks,
168+
threads_nums,
169+
permit_nums,
167170
"mutation-write-segments-worker".to_owned(),
168171
)
169172
.await?

0 commit comments

Comments
 (0)