
Commit 3a34bbf

fix: Schema mismatch issue that may occur due to case insensitivity when loading StageTable (#17949)
* fix: Schema mismatch issue that may occur due to case insensitivity when loading StageTable
* chore: codefmt
* chore: `RecordBatchTransformer` skips StableTable and has Tuple type
* chore: add var `parquet_fast_read_bytes` on `case_sensitive_parquet.test`
* chore: `RecordBatchTransformer` skip `StageTable`
1 parent 20e4079 commit 3a34bbf

12 files changed: +211 -125 lines changed
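In outline, the change threads a `ParquetSourceType` argument through `ParquetReaderBuilder::build_full_reader` and `build_row_group_reader`, and the readers now own an `Option<RecordBatchTransformer>` that is only constructed when the source is not a stage table; `ParquetSource` no longer carries the transformer itself. A minimal, self-contained sketch of that gate (the types here are stand-ins for illustration, not the real databend definitions):

// Stand-in types for illustration only; not the real databend definitions.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug)]
enum ParquetSourceType {
    StageTable,
    ResultCache,
    Iceberg,
    DeltaLake,
    Hive,
}

#[allow(dead_code)]
#[derive(Clone, Debug)]
struct RecordBatchTransformer {
    // In the real code this is built from the expected output schema.
    output_schema: String,
}

impl RecordBatchTransformer {
    fn build(output_schema: String) -> Self {
        Self { output_schema }
    }
}

// Only non-stage sources get a transformer; stage-table loads skip it, which
// is the fix for the case-insensitivity schema mismatch named in the title.
fn build_transformer(
    source_type: ParquetSourceType,
    output_schema: &str,
) -> Option<RecordBatchTransformer> {
    (!matches!(source_type, ParquetSourceType::StageTable))
        .then(|| RecordBatchTransformer::build(output_schema.to_string()))
}

fn main() {
    assert!(build_transformer(ParquetSourceType::Iceberg, "id Int32").is_some());
    assert!(build_transformer(ParquetSourceType::StageTable, "id Int32").is_none());
}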

src/query/storages/delta/src/table.rs
Lines changed: 3 additions & 1 deletion

@@ -46,6 +46,7 @@ use databend_common_storages_parquet::ParquetFilePart;
 use databend_common_storages_parquet::ParquetPart;
 use databend_common_storages_parquet::ParquetPruner;
 use databend_common_storages_parquet::ParquetReaderBuilder;
+use databend_common_storages_parquet::ParquetSourceType;
 use databend_storages_common_pruner::partition_prunner::FetchPartitionScalars;
 use databend_storages_common_pruner::partition_prunner::PartitionPruner;
 use databend_storages_common_table_meta::table::OPT_KEY_ENGINE_META;
@@ -254,7 +255,8 @@ impl DeltaTable {
             .with_pruner(Some(pruner))
             .with_partition_columns(self.meta.partition_columns.clone());

-        let parquet_reader = Arc::new(builder.build_full_reader(false)?);
+        let parquet_reader =
+            Arc::new(builder.build_full_reader(ParquetSourceType::DeltaLake, false)?);

         let output_schema = Arc::new(DataSchema::from(plan.schema()));
         pipeline.add_source(

src/query/storages/hive/hive/src/hive_table.rs
Lines changed: 2 additions & 1 deletion

@@ -53,6 +53,7 @@ use databend_common_storage::init_operator;
 use databend_common_storage::DataOperator;
 use databend_common_storages_parquet::ParquetPruner;
 use databend_common_storages_parquet::ParquetReaderBuilder;
+use databend_common_storages_parquet::ParquetSourceType;
 use databend_storages_common_pruner::partition_prunner::PartitionPruner;
 use databend_storages_common_table_meta::meta::SnapshotId;
 use databend_storages_common_table_meta::meta::TableMetaTimestamps;
@@ -196,7 +197,7 @@ impl HiveTable {
             .with_pruner(Some(pruner))
             .with_partition_columns(partition_keys);

-        let parquet_reader = Arc::new(builder.build_full_reader(false)?);
+        let parquet_reader = Arc::new(builder.build_full_reader(ParquetSourceType::Hive, false)?);

         let output_schema = Arc::new(DataSchema::from(plan.schema()));
         pipeline.add_source(

src/query/storages/iceberg/src/table.rs
Lines changed: 3 additions & 5 deletions

@@ -359,12 +359,11 @@ impl IcebergTable {
             builder = builder.with_topk(topk.as_ref());
         }

-        let row_group_reader = Arc::new(builder.build_row_group_reader(need_row_number)?);
+        let row_group_reader = Arc::new(
+            builder.build_row_group_reader(ParquetSourceType::Iceberg, need_row_number)?,
+        );

         let topk = Arc::new(topk);
-        let projection =
-            PushDownInfo::projection_of_push_downs(&table_schema, plan.push_downs.as_ref());
-        let output_schema = Arc::new(projection.project_schema(&table_schema));

         pipeline.add_source(
             |output| {
@@ -378,7 +377,6 @@ impl IcebergTable {
                     internal_columns.clone(),
                     plan.push_downs.clone(),
                     table_schema.clone(),
-                    output_schema.clone(),
                     op.clone(),
                 )
             },

src/query/storages/parquet/src/parquet_reader/reader/builder.rs
Lines changed: 20 additions & 1 deletion

@@ -44,7 +44,9 @@ use crate::parquet_reader::NoPretchPolicyBuilder;
 use crate::parquet_reader::ParquetWholeFileReader;
 use crate::parquet_reader::PredicateAndTopkPolicyBuilder;
 use crate::parquet_reader::TopkOnlyPolicyBuilder;
+use crate::transformer::RecordBatchTransformer;
 use crate::ParquetPruner;
+use crate::ParquetSourceType;

 pub struct ParquetReaderBuilder<'a> {
     ctx: Arc<dyn TableContext>,
@@ -214,6 +216,7 @@ impl<'a> ParquetReaderBuilder<'a> {

     pub fn build_full_reader(
         &mut self,
+        source_type: ParquetSourceType,
         need_file_row_number: bool,
     ) -> Result<ParquetWholeFileReader> {
         let batch_size = self.ctx.get_settings().get_parquet_max_block_size()? as usize;
@@ -231,6 +234,8 @@ impl<'a> ParquetReaderBuilder<'a> {
             .unwrap();

         let (_, _, output_schema, _) = self.built_output.as_ref().unwrap();
+        let transformer = (!matches!(source_type, ParquetSourceType::StageTable))
+            .then(|| RecordBatchTransformer::build(output_schema.clone()));
         Ok(ParquetWholeFileReader {
             op_registry: self.op_registry.clone(),
             expect_file_schema: self
@@ -242,12 +247,17 @@ impl<'a> ParquetReaderBuilder<'a> {
             projection,
             field_paths,
             pruner: self.pruner.clone(),
+            transformer,
             need_page_index: self.options.prune_pages(),
             batch_size,
         })
     }

-    pub fn build_row_group_reader(&mut self, need_file_row_number: bool) -> Result<RowGroupReader> {
+    pub fn build_row_group_reader(
+        &mut self,
+        source_type: ParquetSourceType,
+        need_file_row_number: bool,
+    ) -> Result<RowGroupReader> {
         let batch_size = self.ctx.get_settings().get_max_block_size()? as usize;

         if !need_file_row_number {
@@ -256,6 +266,14 @@ impl<'a> ParquetReaderBuilder<'a> {
         }
         self.build_output()?;

+        let transformer = (!matches!(source_type, ParquetSourceType::StageTable))
+            .then(|| {
+                self.built_output.as_ref().map(|(_, _, output_schema, _)| {
+                    RecordBatchTransformer::build(output_schema.clone())
+                })
+            })
+            .flatten();
+
         let mut policy_builders = default_policy_builders();
         let default_policy = match (self.built_predicate.as_ref(), self.built_topk.as_ref()) {
             (Some(_), Some(_)) => {
@@ -292,6 +310,7 @@ impl<'a> ParquetReaderBuilder<'a> {
             schema_desc: self.schema_desc.clone(),
             policy_builders,
             default_policy,
+            transformer,
         })
     }

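One detail worth noting in `build_row_group_reader`: the closure passed to `bool::then` returns `self.built_output.as_ref().map(...)`, which is itself an `Option`, so the expression produces an `Option<Option<RecordBatchTransformer>>` and the trailing `.flatten()` collapses it back to one level. A tiny standalone illustration of that shape (the values below are hypothetical, not the builder's real state):

fn main() {
    // Stand-ins for the builder's state: whether output has been built, and
    // whether the source is a stage table.
    let built_output: Option<&str> = Some("expected output schema");
    let is_stage_table = false;

    // `bool::then` wraps the closure's result in an Option, and the closure
    // already returns an Option, so we get Option<Option<_>> and flatten it.
    let transformer: Option<String> = (!is_stage_table)
        .then(|| built_output.map(|schema| format!("transformer for {schema}")))
        .flatten();

    assert_eq!(
        transformer.as_deref(),
        Some("transformer for expected output schema")
    );
}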

src/query/storages/parquet/src/parquet_reader/reader/full_reader.rs
Lines changed: 12 additions & 2 deletions

@@ -48,6 +48,7 @@ use crate::parquet_reader::predicate::ParquetPredicate;
 use crate::parquet_reader::utils::transform_record_batch;
 use crate::parquet_reader::utils::transform_record_batch_by_field_paths;
 use crate::parquet_reader::utils::FieldPaths;
+use crate::transformer::RecordBatchTransformer;
 use crate::ParquetPruner;

 /// The reader to read a whole parquet file.
@@ -71,6 +72,8 @@ pub struct ParquetWholeFileReader {

     pub(super) pruner: Option<ParquetPruner>,

+    pub(super) transformer: Option<RecordBatchTransformer>,
+
     // Options
     pub(super) need_page_index: bool,
     pub(super) batch_size: usize,
@@ -226,20 +229,27 @@ impl ParquetWholeFileReader {
             }
         }
         let reader = builder.build()?;
+        let mut transformer = self.transformer.clone();
         // Write `if` outside iteration to reduce branches.
         if let Some(field_paths) = self.field_paths.as_ref() {
             reader
                 .into_iter()
                 .map(|batch| {
-                    let batch = batch?;
+                    let mut batch = batch?;
+                    if let Some(transformer) = &mut transformer {
+                        batch = transformer.process_record_batch(batch)?
+                    }
                     transform_record_batch_by_field_paths(&batch, field_paths)
                 })
                 .collect()
         } else {
             reader
                 .into_iter()
                 .map(|batch| {
-                    let batch = batch?;
+                    let mut batch = batch?;
+                    if let Some(transformer) = &mut transformer {
+                        batch = transformer.process_record_batch(batch)?
+                    }
                     Ok(
                         DataBlock::from_record_batch(&self.output_schema.as_ref().into(), &batch)?
                             .0,
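The optional transformer above is built from the output schema and applied to each record batch before it is converted into a `DataBlock`. For context on the "case insensitivity" in the commit title: a Parquet file can carry column names that differ from the table schema only by case, and whether a lookup then succeeds depends on whether names are compared case-insensitively. A rough, illustrative sketch using the arrow-rs crates (`arrow_array`, `arrow_schema`); this is not databend's `RecordBatchTransformer`, and the helper below is hypothetical:

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

// Hypothetical helper: resolve a column by name, ignoring ASCII case.
fn find_column_case_insensitive(batch: &RecordBatch, name: &str) -> Option<usize> {
    batch
        .schema()
        .fields()
        .iter()
        .position(|f| f.name().eq_ignore_ascii_case(name))
}

fn main() {
    // A file written with an upper-case column name...
    let file_schema = Arc::new(Schema::new(vec![Field::new("ID", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(file_schema, vec![col]).unwrap();

    // ...matches a lower-case target name only if the comparison ignores case;
    // an exact-name lookup misses it and reports a missing column.
    assert_eq!(find_column_case_insensitive(&batch, "id"), Some(0));
    assert!(batch.schema().index_of("id").is_err());
}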

src/query/storages/parquet/src/parquet_reader/reader/row_group_reader.rs
Lines changed: 2 additions & 2 deletions

@@ -42,6 +42,7 @@ pub struct RowGroupReader {
     pub(super) policy_builders: PolicyBuilders,

     pub(super) schema_desc: SchemaDescPtr,
+    pub(super) transformer: Option<RecordBatchTransformer>,
     // Options
     pub(super) batch_size: usize,
 }
@@ -62,7 +63,6 @@ impl RowGroupReader {
         read_settings: &ReadSettings,
         part: &ParquetRowGroupPart,
         topk_sorter: &mut Option<TopKSorter>,
-        transformer: RecordBatchTransformer,
     ) -> Result<Option<ReadPolicyImpl>> {
         if let Some((sorter, min_max)) = topk_sorter.as_ref().zip(part.sort_min_max.as_ref()) {
             if sorter.never_match(min_max) {
@@ -105,7 +105,7 @@ impl RowGroupReader {
                 row_group,
                 selection,
                 topk_sorter,
-                Some(transformer),
+                self.transformer.clone(),
                 self.batch_size,
             )
             .await

src/query/storages/parquet/src/parquet_table/read.rs
Lines changed: 7 additions & 7 deletions

@@ -16,7 +16,6 @@ use std::sync::Arc;

 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::InternalColumnType;
-use databend_common_catalog::plan::PushDownInfo;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
@@ -52,9 +51,6 @@ impl ParquetTable {
         let need_row_number = internal_columns.contains(&InternalColumnType::FileRowNumber);

         let table_schema: TableSchemaRef = self.table_info.schema();
-        let projection =
-            PushDownInfo::projection_of_push_downs(&table_schema, plan.push_downs.as_ref());
-        let output_schema = Arc::new(projection.project_schema(&table_schema));
         // If there is a `ParquetFilePart`, we should create pruner for it.
         // Although `ParquetFilePart`s are always staying at the end of `parts` when `do_read_partitions`,
         // but parts are reshuffled when `redistribute_source_fragment`, so let us check all of them.
@@ -100,9 +96,14 @@ impl ParquetTable {
             builder = builder.with_pruner(pruner).with_topk(topk.as_ref());
         }

-        let row_group_reader = Arc::new(builder.build_row_group_reader(need_row_number)?);
+        let row_group_reader = Arc::new(
+            builder.build_row_group_reader(ParquetSourceType::StageTable, need_row_number)?,
+        );
         let full_file_reader = if has_file_part {
-            Some(Arc::new(builder.build_full_reader(need_row_number)?))
+            Some(Arc::new(builder.build_full_reader(
+                ParquetSourceType::StageTable,
+                need_row_number,
+            )?))
         } else {
             None
         };
@@ -120,7 +121,6 @@ impl ParquetTable {
                     internal_columns.clone(),
                     plan.push_downs.clone(),
                     table_schema.clone(),
-                    output_schema.clone(),
                     op.clone(),
                 )
             },

src/query/storages/parquet/src/source.rs
Lines changed: 5 additions & 10 deletions

@@ -57,7 +57,6 @@ use crate::parquet_reader::ParquetWholeFileReader;
 use crate::parquet_reader::RowGroupReader;
 use crate::partition::ParquetRowGroupPart;
 use crate::read_settings::ReadSettings;
-use crate::transformer::RecordBatchTransformer;
 use crate::ParquetFilePart;
 use crate::ParquetPart;
 use crate::ParquetReaderBuilder;
@@ -74,7 +73,8 @@ pub enum ParquetSourceType {
    StageTable,
    ResultCache,
    Iceberg,
-    // DeltaLake,
+    DeltaLake,
+    Hive,
 }

 pub struct ParquetSource {
@@ -107,7 +107,6 @@ pub struct ParquetSource {
     push_downs: Option<PushDownInfo>,
     topk: Arc<Option<TopK>>,
     op_registry: Arc<dyn OperatorRegistry>,
-    transformer: RecordBatchTransformer,
 }

 impl ParquetSource {
@@ -122,7 +121,6 @@ impl ParquetSource {
         internal_columns: Vec<InternalColumnType>,
         push_downs: Option<PushDownInfo>,
         table_schema: TableSchemaRef,
-        output_schema: TableSchemaRef,
         op_registry: Arc<dyn OperatorRegistry>,
     ) -> Result<ProcessorPtr> {
         let scan_progress = ctx.get_scan_progress();
@@ -133,7 +131,6 @@ impl ParquetSource {
             .as_ref()
             .as_ref()
             .map(|t| TopKSorter::new(t.limit, t.asc));
-        let transformer = RecordBatchTransformer::build(output_schema);

         Ok(ProcessorPtr::create(Box::new(Self {
             source_type,
@@ -153,7 +150,6 @@ impl ParquetSource {
             push_downs,
             topk,
             op_registry,
-            transformer,
         })))
     }
 }
@@ -284,7 +280,6 @@ impl Processor for ParquetSource {
                            ),
                            part,
                            &mut self.topk_sorter,
-                            self.transformer.clone(),
                        )
                        .await?
                    {
@@ -355,7 +350,6 @@ impl ParquetSource {
                part.file.as_str(),
            )?;
        }
-        self.transformer.match_by_field_name(from_stage_table);
        // The schema of the table in iceberg may be inconsistent with the schema in parquet
        let reader = if self.row_group_reader.schema_desc().root_schema()
            != meta.file_metadata().schema_descr().root_schema()
@@ -381,7 +375,9 @@ impl ParquetSource {
                builder = builder.with_topk(self.topk.as_ref().as_ref());
            }

-            Cow::Owned(Arc::new(builder.build_row_group_reader(need_row_number)?))
+            Cow::Owned(Arc::new(
+                builder.build_row_group_reader(self.source_type, need_row_number)?,
+            ))
        } else {
            Cow::Borrowed(&self.row_group_reader)
        };
@@ -408,7 +404,6 @@ impl ParquetSource {
            &ReadSettings::from_ctx(&self.ctx)?.with_enable_cache(!from_stage_table),
            &part,
            &mut self.topk_sorter,
-            self.transformer.clone(),
        )
        .await?;

