Skip to content

Commit 1ea2e2f

Browse files
committed
feat: ParquetTable supports injecting a file list.
1 parent 128b0ef commit 1ea2e2f

File tree

6 files changed

+50
-15
lines changed

6 files changed

+50
-15
lines changed

src/query/catalog/src/plan/datasource/datasource_info/parquet.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use common_arrow::arrow::datatypes::Schema as ArrowSchema;
1818
use common_expression::TableSchema;
1919
use common_meta_app::principal::StageInfo;
2020
use common_meta_app::schema::TableInfo;
21+
use common_storage::StageFileInfo;
2122
use common_storage::StageFilesInfo;
2223

2324
use crate::plan::datasource::datasource_info::parquet_read_options::ParquetReadOptions;
@@ -30,6 +31,7 @@ pub struct ParquetTableInfo {
3031

3132
pub table_info: TableInfo,
3233
pub arrow_schema: ArrowSchema,
34+
pub files_to_read: Option<Vec<StageFileInfo>>,
3335
}
3436

3537
impl ParquetTableInfo {

src/query/sql/src/planner/binder/table.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use common_functions::scalars::BUILTIN_FUNCTIONS;
4646
use common_meta_app::principal::StageFileFormatType;
4747
use common_meta_app::principal::StageInfo;
4848
use common_storage::DataOperator;
49+
use common_storage::StageFileInfo;
4950
use common_storage::StageFilesInfo;
5051
use common_storages_parquet::ParquetTable;
5152
use common_storages_result_cache::ResultCacheMetaManager;
@@ -380,7 +381,7 @@ impl Binder {
380381
pattern: options.pattern.clone(),
381382
files: options.files.clone(),
382383
};
383-
self.bind_stage_table(bind_context, stage_info, files_info, alias)
384+
self.bind_stage_table(bind_context, stage_info, files_info, alias, None)
384385
.await
385386
}
386387
}
@@ -392,14 +393,17 @@ impl Binder {
392393
stage_info: StageInfo,
393394
files_info: StageFilesInfo,
394395
alias: &Option<TableAlias>,
396+
files_to_copy: Option<Vec<StageFileInfo>>,
395397
) -> Result<(SExpr, BindContext)> {
396398
if matches!(
397399
stage_info.file_format_options.format,
398400
StageFileFormatType::Parquet
399401
) {
400402
let read_options = ParquetReadOptions::default();
401403

402-
let table = ParquetTable::create(stage_info.clone(), files_info, read_options).await?;
404+
let table =
405+
ParquetTable::create(stage_info.clone(), files_info, read_options, files_to_copy)
406+
.await?;
403407

404408
let table_alias_name = if let Some(table_alias) = alias {
405409
Some(normalize_identifier(&table_alias.name, &self.name_resolution_ctx).name)

src/query/storages/parquet/src/parquet_table/blocking.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use common_catalog::table::Table;
2121
use common_exception::ErrorCode;
2222
use common_exception::Result;
2323
use common_meta_app::principal::StageInfo;
24+
use common_storage::StageFileInfo;
2425
use common_storage::StageFilesInfo;
2526
use opendal::Operator;
2627

@@ -33,9 +34,14 @@ impl ParquetTable {
3334
read_options: ParquetReadOptions,
3435
stage_info: StageInfo,
3536
files_info: StageFilesInfo,
37+
files_to_read: Option<Vec<StageFileInfo>>,
3638
) -> Result<Arc<dyn Table>> {
37-
let first_file = files_info.blocking_first_file(&operator)?;
38-
let arrow_schema = Self::blocking_prepare_metas(&first_file.path, operator.clone())?;
39+
let first_file = match &files_to_read {
40+
Some(files) => files[0].path.clone(),
41+
None => files_info.blocking_first_file(&operator)?.path,
42+
};
43+
44+
let arrow_schema = Self::blocking_prepare_metas(&first_file, operator.clone())?;
3945

4046
let table_info = create_parquet_table_info(arrow_schema.clone());
4147

@@ -46,6 +52,7 @@ impl ParquetTable {
4652
read_options,
4753
stage_info,
4854
files_info,
55+
files_to_read,
4956
}))
5057
}
5158

src/query/storages/parquet/src/parquet_table/non_blocking.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use common_exception::ErrorCode;
2222
use common_exception::Result;
2323
use common_meta_app::principal::StageInfo;
2424
use common_storage::init_stage_operator;
25+
use common_storage::StageFileInfo;
2526
use common_storage::StageFilesInfo;
2627
use opendal::Operator;
2728

@@ -33,14 +34,24 @@ impl ParquetTable {
3334
stage_info: StageInfo,
3435
files_info: StageFilesInfo,
3536
read_options: ParquetReadOptions,
37+
files_to_read: Option<Vec<StageFileInfo>>,
3638
) -> Result<Arc<dyn Table>> {
3739
let operator = init_stage_operator(&stage_info)?;
3840
if operator.info().can_blocking() {
39-
return Self::blocking_create(operator, read_options, stage_info, files_info);
41+
return Self::blocking_create(
42+
operator,
43+
read_options,
44+
stage_info,
45+
files_info,
46+
files_to_read,
47+
);
4048
}
41-
let first_file = files_info.first_file(&operator).await?;
49+
let first_file = match &files_to_read {
50+
Some(files) => files[0].path.clone(),
51+
None => files_info.first_file(&operator).await?.path.clone(),
52+
};
4253

43-
let arrow_schema = Self::prepare_metas(&first_file.path, operator.clone()).await?;
54+
let arrow_schema = Self::prepare_metas(&first_file, operator.clone()).await?;
4455

4556
let table_info = create_parquet_table_info(arrow_schema.clone());
4657

@@ -51,6 +62,7 @@ impl ParquetTable {
5162
read_options,
5263
stage_info,
5364
files_info,
65+
files_to_read,
5466
}))
5567
}
5668

src/query/storages/parquet/src/parquet_table/partition.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,20 @@ impl ParquetTable {
107107
None
108108
};
109109

110-
let file_locations = if self.operator.info().can_blocking() {
111-
self.files_info.blocking_list(&self.operator, false)
112-
} else {
113-
self.files_info.list(&self.operator, false).await
114-
}?
115-
.into_iter()
116-
.map(|f| (f.path, f.metadata.content_length()))
117-
.collect::<Vec<_>>();
110+
let file_locations = match &self.files_to_read {
111+
Some(files) => files
112+
.iter()
113+
.map(|f| (f.path.clone(), f.size))
114+
.collect::<Vec<_>>(),
115+
None => if self.operator.info().can_blocking() {
116+
self.files_info.blocking_list(&self.operator, false)
117+
} else {
118+
self.files_info.list(&self.operator, false).await
119+
}?
120+
.into_iter()
121+
.map(|f| (f.path, f.metadata.content_length()))
122+
.collect::<Vec<_>>(),
123+
};
118124

119125
let pruner = PartitionPruner {
120126
schema,

src/query/storages/parquet/src/parquet_table/table.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use common_meta_app::schema::TableInfo;
3838
use common_meta_app::schema::TableMeta;
3939
use common_pipeline_core::Pipeline;
4040
use common_storage::init_stage_operator;
41+
use common_storage::StageFileInfo;
4142
use common_storage::StageFilesInfo;
4243
use opendal::Operator;
4344

@@ -50,6 +51,7 @@ pub struct ParquetTable {
5051

5152
pub(super) table_info: TableInfo,
5253
pub(super) arrow_schema: ArrowSchema,
54+
pub(super) files_to_read: Option<Vec<StageFileInfo>>,
5355
}
5456

5557
impl ParquetTable {
@@ -63,6 +65,7 @@ impl ParquetTable {
6365
read_options: info.read_options,
6466
stage_info: info.stage_info.clone(),
6567
files_info: info.files_info.clone(),
68+
files_to_read: info.files_to_read.clone(),
6669
}))
6770
}
6871
}
@@ -100,6 +103,7 @@ impl Table for ParquetTable {
100103
read_options: self.read_options,
101104
stage_info: self.stage_info.clone(),
102105
files_info: self.files_info.clone(),
106+
files_to_read: self.files_to_read.clone(),
103107
})
104108
}
105109

0 commit comments

Comments
 (0)