Commit c2b403d

Merge pull request #10514 from BohuTANG/dev-fast-parquet-splits
feat: improve the parquet get splits to parallel
2 parents d28aa6a + 487477e commit c2b403d

File tree

10 files changed: +119, -88 lines

src/common/base/src/runtime/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ mod thread_pool;
 pub use catch_unwind::catch_unwind;
 pub use catch_unwind::CatchUnwindFuture;
 pub use global_runtime::GlobalIORuntime;
+pub use runtime::execute_futures_in_parallel;
 pub use runtime::match_join_handle;
 pub use runtime::Dropper;
 pub use runtime::Runtime;

src/common/base/src/runtime/runtime.rs

Lines changed: 30 additions & 0 deletions
@@ -21,6 +21,7 @@ use std::time::Instant;
 
 use common_exception::ErrorCode;
 use common_exception::Result;
+use futures::future;
 use tokio::runtime::Builder;
 use tokio::runtime::Handle;
 use tokio::sync::oneshot;
@@ -325,3 +326,32 @@ pub async fn match_join_handle<T>(handle: JoinHandle<Result<T>>) -> Result<T> {
         },
     }
 }
+
+/// Run multiple futures in parallel, using a semaphore to limit the degree of
+/// parallelism and a dedicated thread pool to run the futures.
+/// It waits for all futures to complete and returns their results.
+pub async fn execute_futures_in_parallel<Fut>(
+    futures: impl IntoIterator<Item = Fut>,
+    thread_nums: usize,
+    permit_nums: usize,
+    thread_name: String,
+) -> Result<Vec<Fut::Output>>
+where
+    Fut: Future + Send + 'static,
+    Fut::Output: Send + 'static,
+{
+    // 1. build the runtime.
+    let semaphore = Semaphore::new(permit_nums);
+    let runtime = Arc::new(Runtime::with_worker_threads(
+        thread_nums,
+        Some(thread_name),
+    )?);
+
+    // 2. spawn all the tasks to the runtime with the semaphore.
+    let join_handlers = runtime.try_spawn_batch(semaphore, futures).await?;
+
+    // 3. collect all the results.
+    future::try_join_all(join_handlers)
+        .await
+        .map_err(|e| ErrorCode::Internal(format!("try join all futures failure, {}", e)))
+}
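A minimal usage sketch of the new helper (not part of the commit; crate paths assumed from this diff): fan independent async tasks out onto a dedicated runtime, capped by a semaphore.

```rust
use common_base::runtime::execute_futures_in_parallel;
use common_exception::Result;

async fn double_all(values: Vec<u64>) -> Result<Vec<u64>> {
    // Each future owns its input, so it satisfies the `Send + 'static` bounds.
    let tasks = values.into_iter().map(|v| async move { v * 2 });

    // Illustrative limits; the fuse call sites below derive these from settings.
    execute_futures_in_parallel(
        tasks,
        8,                           // worker threads for the ad-hoc runtime
        32,                          // semaphore permits (max in-flight tasks)
        "example-worker".to_owned(),
    )
    .await
}
```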

src/query/pipeline/sources/src/input_formats/impls/input_format_parquet.rs

Lines changed: 52 additions & 11 deletions
@@ -34,6 +34,7 @@ use common_arrow::parquet::metadata::ColumnChunkMetaData;
 use common_arrow::parquet::metadata::RowGroupMetaData;
 use common_arrow::parquet::read::read_metadata;
 use common_arrow::read_columns_async;
+use common_base::runtime::execute_futures_in_parallel;
 use common_catalog::plan::StageFileInfo;
 use common_exception::ErrorCode;
 use common_exception::Result;
@@ -63,18 +64,10 @@ use crate::input_formats::SplitInfo;
 
 pub struct InputFormatParquet;
 
-fn col_offset(meta: &ColumnChunkMetaData) -> i64 {
-    meta.data_page_offset()
-}
-
-#[async_trait::async_trait]
-impl InputFormat for InputFormatParquet {
-    async fn get_splits(
-        &self,
+impl InputFormatParquet {
+    async fn get_split_batch(
         file_infos: Vec<StageFileInfo>,
-        _stage_info: &StageInfo,
-        op: &Operator,
-        _settings: &Arc<Settings>,
+        op: Operator,
     ) -> Result<Vec<Arc<SplitInfo>>> {
         let mut infos = vec![];
         let mut schema = None;
@@ -124,8 +117,56 @@ impl InputFormat for InputFormatParquet {
                 }
             }
         }
+
         Ok(infos)
     }
+}
+
+fn col_offset(meta: &ColumnChunkMetaData) -> i64 {
+    meta.data_page_offset()
+}
+
+#[async_trait::async_trait]
+impl InputFormat for InputFormatParquet {
+    async fn get_splits(
+        &self,
+        file_infos: Vec<StageFileInfo>,
+        _stage_info: &StageInfo,
+        op: &Operator,
+        _settings: &Arc<Settings>,
+    ) -> Result<Vec<Arc<SplitInfo>>> {
+        let batch_size = 1000;
+
+        if file_infos.len() <= batch_size {
+            Self::get_split_batch(file_infos, op.clone()).await
+        } else {
+            let mut chunks = file_infos.chunks(batch_size);
+
+            let tasks = std::iter::from_fn(move || {
+                chunks
+                    .next()
+                    .map(|location| Self::get_split_batch(location.to_vec(), op.clone()))
+            });
+
+            // TODO: Get from ctx.
+            let thread_nums = 16;
+            let permit_nums = 64;
+            let result = execute_futures_in_parallel(
+                tasks,
+                thread_nums,
+                permit_nums,
+                "get-parquet-splits-worker".to_owned(),
+            )
+            .await?
+            .into_iter()
+            .collect::<Result<Vec<Vec<_>>>>()?
+            .into_iter()
+            .flatten()
+            .collect();
+
+            Ok(result)
+        }
+    }
 
     async fn infer_schema(&self, path: &str, op: &Operator) -> Result<TableSchemaRef> {
         let mut reader = op.reader(path).await?;
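For reference, a standalone sketch (not in the commit) of the batching pattern that get_splits now uses: chunk the input, run one future per chunk, then flatten the per-chunk results. Here process_batch is a hypothetical stand-in for Self::get_split_batch, and the limits are illustrative.

```rust
use common_base::runtime::execute_futures_in_parallel;
use common_exception::Result;

async fn process_batch(items: Vec<String>) -> Result<Vec<usize>> {
    // Stand-in for Self::get_split_batch: any per-batch async work.
    Ok(items.into_iter().map(|s| s.len()).collect())
}

async fn process_all(items: Vec<String>) -> Result<Vec<usize>> {
    let batch_size = 1000;
    let mut chunks = items.chunks(batch_size);

    // One future per chunk, built lazily, mirroring std::iter::from_fn in the diff.
    let tasks = std::iter::from_fn(move || chunks.next().map(|c| process_batch(c.to_vec())));

    let result = execute_futures_in_parallel(tasks, 16, 64, "example-worker".to_owned())
        .await?                            // Vec<Result<Vec<usize>>>
        .into_iter()
        .collect::<Result<Vec<Vec<_>>>>()? // fail on the first batch error
        .into_iter()
        .flatten()
        .collect();

    Ok(result)
}
```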

src/query/storages/fuse/src/io/files.rs

Lines changed: 6 additions & 4 deletions
@@ -14,12 +14,11 @@
 
 use std::sync::Arc;
 
+use common_base::runtime::execute_futures_in_parallel;
 use common_catalog::table_context::TableContext;
 use common_exception::Result;
 use opendal::Operator;
 
-use crate::io::execute_futures_in_parallel;
-
 // File related operations.
 pub struct Files {
     ctx: Arc<dyn TableContext>,
@@ -41,7 +40,7 @@ impl Files {
         let batch_size = 1000;
         let locations = Vec::from_iter(file_locations.into_iter().map(|v| v.as_ref().to_string()));
 
-        if locations.len() < batch_size {
+        if locations.len() <= batch_size {
             Self::delete_files(self.operator.clone(), locations).await?;
         } else {
             let mut chunks = locations.chunks(batch_size);
@@ -52,9 +51,12 @@ impl Files {
                     .map(|location| Self::delete_files(self.operator.clone(), location.to_vec()))
             });
 
+            let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
+            let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
             execute_futures_in_parallel(
-                self.ctx.clone(),
                 tasks,
+                threads_nums,
+                permit_nums,
                 "batch-remove-files-worker".to_owned(),
            )
            .await?;
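The same call-site change repeats in segments.rs, snapshots.rs, and the mutation aggregator below: instead of handing the whole ctx into the helper, each caller now reads its limits from settings. A hypothetical convenience wrapper (run_io_tasks is not in the codebase) showing that wiring, using only the accessors that appear in this diff:

```rust
use std::sync::Arc;

use common_base::runtime::execute_futures_in_parallel;
use common_catalog::table_context::TableContext;
use common_exception::Result;

// Hypothetical wrapper, not part of the commit.
async fn run_io_tasks<Fut>(ctx: Arc<dyn TableContext>, tasks: Vec<Fut>) -> Result<Vec<Fut::Output>>
where
    Fut: std::future::Future + Send + 'static,
    Fut::Output: Send + 'static,
{
    // Thread count follows max_threads; in-flight permits follow max_storage_io_requests.
    let threads_nums = ctx.get_settings().get_max_threads()? as usize;
    let permit_nums = ctx.get_settings().get_max_storage_io_requests()? as usize;

    execute_futures_in_parallel(
        tasks,
        threads_nums,
        permit_nums,
        "io-tasks-worker".to_owned(),
    )
    .await
}
```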

src/query/storages/fuse/src/io/mod.rs

Lines changed: 0 additions & 2 deletions
@@ -14,15 +14,13 @@
 
 mod files;
 mod locations;
-mod parallel;
 mod read;
 mod segments;
 mod snapshots;
 mod write;
 
 pub use files::Files;
 pub use locations::TableMetaLocationGenerator;
-pub use parallel::execute_futures_in_parallel;
 pub use read::BlockReader;
 pub use read::BloomBlockFilterReader;
 pub use read::MergeIOReadResult;

src/query/storages/fuse/src/io/parallel.rs

Lines changed: 0 additions & 56 deletions
This file was deleted.

src/query/storages/fuse/src/io/segments.rs

Lines changed: 9 additions & 3 deletions
@@ -14,6 +14,7 @@
 
 use std::sync::Arc;
 
+use common_base::runtime::execute_futures_in_parallel;
 use common_catalog::table_context::TableContext;
 use common_exception::Result;
 use common_expression::TableSchemaRef;
@@ -23,7 +24,6 @@ use storages_common_table_meta::meta::Location;
 use storages_common_table_meta::meta::SegmentInfo;
 use tracing::Instrument;
 
-use crate::io::execute_futures_in_parallel;
 use crate::io::MetaReaders;
 
 // Read segment related operations.
@@ -88,9 +88,12 @@ impl SegmentsIO {
             }
         });
 
+        let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
+        let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
         execute_futures_in_parallel(
-            self.ctx.clone(),
             tasks,
+            threads_nums,
+            permit_nums,
             "fuse-req-segments-worker".to_owned(),
         )
         .await
@@ -143,9 +146,12 @@ impl SegmentsIO {
             })
         });
 
+        let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
+        let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
         execute_futures_in_parallel(
-            self.ctx.clone(),
             tasks,
+            threads_nums,
+            permit_nums,
             "fuse-req-segments-worker".to_owned(),
         )
         .await

src/query/storages/fuse/src/io/snapshots.rs

Lines changed: 5 additions & 2 deletions
@@ -20,6 +20,7 @@ use std::time::Instant;
 
 use chrono::DateTime;
 use chrono::Utc;
+use common_base::runtime::execute_futures_in_parallel;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
@@ -37,7 +38,6 @@ use tracing::info;
 use tracing::warn;
 use tracing::Instrument;
 
-use crate::io::execute_futures_in_parallel;
 use crate::io::MetaReaders;
 use crate::io::SnapshotHistoryReader;
 use crate::io::TableMetaLocationGenerator;
@@ -168,9 +168,12 @@ impl SnapshotsIO {
             })
         });
 
+        let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
+        let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
         execute_futures_in_parallel(
-            self.ctx.clone(),
             tasks,
+            threads_nums,
+            permit_nums,
             "fuse-req-snapshots-worker".to_owned(),
         )
         .await

src/query/storages/fuse/src/operations/merge_into/processors/transform_mutation_aggregator.rs

Lines changed: 8 additions & 5 deletions
@@ -15,6 +15,7 @@
 
 use std::sync::Arc;
 
+use common_base::runtime::execute_futures_in_parallel;
 use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
@@ -30,7 +31,6 @@ use storages_common_table_meta::meta::Location;
 use storages_common_table_meta::meta::SegmentInfo;
 use tracing::debug;
 
-use crate::io::execute_futures_in_parallel;
 use crate::io::SegmentsIO;
 use crate::io::TableMetaLocationGenerator;
 use crate::operations::merge_into::mutation_meta::mutation_log::CommitMeta;
@@ -148,10 +148,10 @@ impl TableMutationAggregator {
 
     // TODO use batch_meta_writer
     async fn write_segments(&self, segments: Vec<SerializedSegment>) -> Result<()> {
-        let mut handles = Vec::with_capacity(segments.len());
+        let mut tasks = Vec::with_capacity(segments.len());
         for segment in segments {
             let op = self.dal.clone();
-            handles.push(async move {
+            tasks.push(async move {
                 op.write(&segment.path, segment.raw_data).await?;
                 if let Some(segment_cache) = CacheManager::instance().get_table_segment_cache() {
                     segment_cache.put(segment.path.clone(), segment.segment.clone());
@@ -160,9 +160,12 @@ impl TableMutationAggregator {
             });
         }
 
+        let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
+        let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
         execute_futures_in_parallel(
-            self.ctx.clone(),
-            handles,
+            tasks,
+            threads_nums,
+            permit_nums,
             "mutation-write-segments-worker".to_owned(),
         )
         .await?
