
Commit 9f2130f

Merge pull request #10695 from youngsofun/fix

refactor(copy): calc suitable parallelism for reading parquet splits.

2 parents: 97a27b8 + 8963447
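
The heuristic this commit adds (see the input_pipeline.rs diff below) bounds read parallelism by both the storage I/O budget and memory. A minimal standalone sketch, with the function name and parameter list made up here to mirror `ctx.settings` and `ctx.splits` in the real code:

// Sketch of the parallelism calculation: reading one split issues roughly one
// storage request per field, so the I/O budget caps the number of concurrent
// splits; the memory budget then lowers that cap if the largest splits could
// not all be held in memory at once.
fn suitable_parallelism(
    max_storage_io_requests: usize, // settings value: max_storage_io_requests
    num_fields: usize,              // columns read per split
    mut split_sizes: Vec<usize>,    // sizes of the splits to be read
    max_memory: usize,              // settings value: max_memory_usage
) -> usize {
    let mut max_splits = std::cmp::max(max_storage_io_requests / num_fields, 1);
    split_sizes.sort_by(|a, b| b.cmp(a)); // worst case: largest splits in flight together
    let mut mem = 0;
    for (i, s) in split_sizes.iter().enumerate() {
        if mem + s > max_memory {
            max_splits = std::cmp::min(max_splits, std::cmp::max(i, 1));
            break;
        }
        mem += s;
    }
    max_splits
}

For example, with max_storage_io_requests = 64 and 8 fields per split, at most 8 splits are read in parallel, and fewer still if the 8 largest splits together would exceed max_memory_usage.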

2 files changed: +109 −50 lines changed

src/query/pipeline/sources/src/input_formats/impls/input_format_parquet.rs

Lines changed: 60 additions & 19 deletions
@@ -34,7 +34,6 @@ use common_arrow::parquet::metadata::ColumnChunkMetaData;
 use common_arrow::parquet::metadata::FileMetaData;
 use common_arrow::parquet::metadata::RowGroupMetaData;
 use common_arrow::parquet::read::read_metadata;
-use common_arrow::read_columns_async;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::DataBlock;
@@ -48,7 +47,9 @@ use common_settings::Settings;
 use common_storage::read_parquet_metas_in_parallel;
 use common_storage::StageFileInfo;
 use futures::AsyncRead;
+use futures::AsyncReadExt;
 use futures::AsyncSeek;
+use futures::AsyncSeekExt;
 use opendal::Operator;
 use serde::Deserializer;
 use serde::Serializer;
@@ -174,16 +175,9 @@ impl InputFormatPipe for ParquetFormatPipe {
     ) -> Result<Self::RowBatch> {
         let meta = Self::get_split_meta(&split_info).expect("must success");
         let op = ctx.source.get_operator()?;
-        let mut reader = op.reader(&split_info.file.path).await?;
         let input_fields = Arc::new(get_used_fields(&meta.file.fields, &ctx.schema)?);

-        RowGroupInMemory::read_async(
-            split_info.to_string(),
-            &mut reader,
-            meta.meta.clone(),
-            input_fields,
-        )
-        .await
+        RowGroupInMemory::read_async(split_info.clone(), op, meta.meta.clone(), input_fields).await
     }
 }

@@ -269,25 +263,48 @@ impl RowGroupInMemory {
         })
     }

-    async fn read_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
-        split_info: String,
-        reader: &mut R,
+    async fn read_field_async(
+        op: Operator,
+        path: String,
+        col_metas: Vec<ColumnChunkMetaData>,
+        index: usize,
+    ) -> Result<(usize, Vec<Vec<u8>>)> {
+        let mut cols = Vec::with_capacity(col_metas.len());
+        let mut reader = op.reader(&path).await?;
+        for meta in &col_metas {
+            cols.push(read_single_column_async(&mut reader, meta).await?)
+        }
+        Ok((index, cols))
+    }
+
+    async fn read_async(
+        split_info: Arc<SplitInfo>,
+        operator: Operator,
         meta: RowGroupMetaData,
         fields: Arc<Vec<Field>>,
     ) -> Result<Self> {
         let field_names = fields.iter().map(|x| x.name.as_str()).collect::<Vec<_>>();
         let field_meta_indexes = split_column_metas_by_field(meta.columns(), &field_names);
-        let mut filed_arrays = vec![];
-        for field_name in field_names {
-            let meta_data = read_columns_async(reader, meta.columns(), field_name).await?;
-            let data = meta_data.into_iter().map(|t| t.1).collect::<Vec<_>>();
-            filed_arrays.push(data)
+        let mut join_handlers = Vec::with_capacity(field_names.len());
+        for (i, field_name) in field_names.iter().enumerate() {
+            let col_metas = get_field_columns(meta.columns(), field_name)
+                .into_iter()
+                .cloned()
+                .collect::<Vec<_>>();
+            let op = operator.clone();
+            let path = split_info.file.path.clone();
+            join_handlers.push(async move { Self::read_field_async(op, path, col_metas, i).await });
         }
+
+        let mut field_arrays = futures::future::try_join_all(join_handlers).await?;
+        field_arrays.sort();
+        let field_arrays = field_arrays.into_iter().map(|t| t.1).collect::<Vec<_>>();
+
         Ok(Self {
-            split_info,
+            split_info: split_info.to_string(),
             meta,
             field_meta_indexes,
-            field_arrays: filed_arrays,
+            field_arrays,
             fields_to_read: fields,
         })
     }
@@ -489,3 +506,27 @@ pub fn split_column_metas_by_field(
     });
     r
 }
+
+fn get_field_columns<'a>(
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &str,
+) -> Vec<&'a ColumnChunkMetaData> {
+    columns
+        .iter()
+        .filter(|x| x.descriptor().path_in_schema[0] == field_name)
+        .collect()
+}
+
+async fn read_single_column_async<R>(
+    reader: &mut R,
+    meta: &ColumnChunkMetaData,
+) -> Result<Vec<u8>>
+where
+    R: AsyncRead + AsyncSeek + Send + Unpin,
+{
+    let (start, len) = meta.byte_range();
+    reader.seek(std::io::SeekFrom::Start(start)).await?;
+    let mut chunk = vec![0; len as usize];
+    reader.read_exact(&mut chunk).await?;
+    Ok(chunk)
+}
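
In read_async above, each field's column chunks are now fetched concurrently, each future tagged with its field index so the results can be reassembled in field order after futures::future::try_join_all. A standalone sketch of that fan-out pattern, where fetch_field is a hypothetical stand-in for read_field_async:

// Hypothetical stand-in for read_field_async: yields the field index together
// with the bytes read, so the caller can restore field order after the join.
async fn fetch_field(index: usize) -> std::io::Result<(usize, Vec<u8>)> {
    Ok((index, vec![0u8; 4]))
}

async fn read_all_fields(num_fields: usize) -> std::io::Result<Vec<Vec<u8>>> {
    let futs = (0..num_fields).map(fetch_field);
    let mut tagged = futures::future::try_join_all(futs).await?;
    // try_join_all already returns results in input order; sorting by the
    // index tag, as the real code does, keeps that ordering explicit.
    tagged.sort_by_key(|(i, _)| *i);
    Ok(tagged.into_iter().map(|(_, bytes)| bytes).collect())
}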

src/query/pipeline/sources/src/input_formats/input_pipeline.rs

Lines changed: 49 additions & 31 deletions
@@ -18,7 +18,6 @@ use std::sync::Arc;
 use common_base::base::tokio;
 use common_base::base::tokio::sync::mpsc::Receiver;
 use common_base::base::tokio::sync::mpsc::Sender;
-use common_base::runtime::CatchUnwindFuture;
 use common_base::runtime::GlobalIORuntime;
 use common_base::runtime::TrySpawn;
 use common_compress::CompressAlgorithm;
@@ -27,9 +26,8 @@ use common_exception::Result;
 use common_expression::DataBlock;
 use common_pipeline_core::Pipeline;
 use futures::AsyncRead;
-use futures_util::stream::FuturesUnordered;
 use futures_util::AsyncReadExt;
-use futures_util::StreamExt;
+use parking_lot::Mutex;

 use crate::input_formats::Aligner;
 use crate::input_formats::BeyondEndReader;
@@ -191,39 +189,59 @@ pub trait InputFormatPipe: Sized + Send + 'static {
     }

     fn execute_copy_aligned(ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
-        let (data_tx, data_rx) = async_channel::bounded(ctx.num_prefetch_splits()?);
+        let (data_tx, data_rx) = async_channel::bounded(1);
         Self::build_pipeline_aligned(&ctx, data_rx, pipeline)?;
-
-        let ctx_clone = ctx.clone();
-        let p = 3;
-        GlobalIORuntime::instance().spawn(async move {
-            for splits in ctx_clone.splits.chunks(p) {
-                let ctx_clone2 = ctx_clone.clone();
-                let row_batch_tx = data_tx.clone();
-                let splits = splits.to_owned().clone();
-                tokio::spawn(async move {
-                    let mut futs = FuturesUnordered::new();
-                    for s in splits.into_iter() {
-                        let fut =
-                            CatchUnwindFuture::create(Self::read_split(ctx_clone2.clone(), s));
-                        futs.push(fut);
-                    }
-                    while let Some(row_batch) = futs.next().await {
-                        match row_batch {
-                            Ok(row_batch) => {
-                                if row_batch_tx.send(row_batch).await.is_err() {
-                                    break;
-                                }
-                            }
-                            Err(cause) => {
-                                row_batch_tx.send(Err(cause)).await.ok();
-                                break;
-                            }
-                        }
-                    }
-                });
-            }
-        });
+        let max_storage_io_requests = ctx.settings.get_max_storage_io_requests()?;
+        let per_split_io = ctx.schema.fields().len();
+        let max_splits = max_storage_io_requests as usize / per_split_io;
+        let mut max_splits = std::cmp::max(max_splits, 1);
+        let mut sizes = ctx.splits.iter().map(|s| s.size).collect::<Vec<usize>>();
+        sizes.sort_by(|a, b| b.cmp(a));
+        let max_memory = ctx.settings.get_max_memory_usage()? as usize;
+        let mut mem = 0;
+        for (i, s) in sizes.iter().enumerate() {
+            let m = mem + s;
+            if m > max_memory {
+                max_splits = std::cmp::min(max_splits, std::cmp::max(i, 1));
+                break;
+            } else {
+                mem = m
+            }
+        }
+        tracing::info!(
+            "copy read {max_splits} splits in parallel, according to max_memory={max_memory}, num_fields={per_split_io}, max_storage_io_requests={max_storage_io_requests}, max_split_size={}",
+            sizes[0]
+        );
+        let splits = ctx.splits.to_vec();
+        let splits = Arc::new(Mutex::new(splits));
+        for _ in 0..max_splits {
+            let splits = splits.clone();
+            let ctx_clone = ctx.clone();
+            let data_tx = data_tx.clone();
+            GlobalIORuntime::instance().spawn(async move {
+                loop {
+                    let split = {
+                        let mut splits = splits.lock();
+                        if let Some(split) = splits.pop() {
+                            split
+                        } else {
+                            break;
+                        }
+                    };
+                    match Self::read_split(ctx_clone.clone(), split.clone()).await {
+                        Ok(row_batch) => {
+                            if data_tx.send(Ok(row_batch)).await.is_err() {
+                                break;
+                            }
+                        }
+                        Err(cause) => {
+                            data_tx.send(Err(cause)).await.ok();
+                            break;
+                        }
+                    }
+                }
+            });
+        }
         Ok(())
     }