
Commit 5f01cb3

Kould authored
feat: Supports reading table data from Iceberg ORC (#17898)
* feat: Supports reading table data from Iceberg ORC
* test: add ci logic test for iceberg
* feat: impl Projection on Iceberg Orc
* fix: ci fail
* chore: codefmt
* chore: clean `iceberg_test` ci
* fix: Parquet schema is inconsistent when querying iceberg table
* feat: impl `RecordBatchTransformer` to convert the `RecordBatch` after reading Parquet into a Block consistent with the Schema of `ParquetSource`
* chore: `RecordBatchTransformer` match by name on `StageTable`
* chore: resolve conflicts
* fix: getting wrong column_stats when reading in Iceberg
* chore: remove stage test on base.test

Signed-off-by: Kould <[email protected]>
1 parent f637c23 commit 5f01cb3
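The core of the change is a per-table format switch: when the Iceberg table property write.format.default is set to orc, the read pipeline goes through ORCSource and StripeDecoder instead of the Parquet reader (see src/query/storages/iceberg/src/table.rs below). A minimal sketch of that check, with a plain HashMap standing in for iceberg's table properties; the helper name uses_orc_data_files is illustrative and not part of the patch:

    use std::collections::HashMap;

    // Sketch only: `properties` stands in for iceberg's TableMetadata::properties().
    fn uses_orc_data_files(properties: &HashMap<String, String>) -> bool {
        properties
            .get("write.format.default")
            .map(|format| format.as_str() == "orc")
            .unwrap_or(false)
    }

    fn main() {
        let mut props = HashMap::new();
        props.insert("write.format.default".to_string(), "orc".to_string());
        assert!(uses_orc_data_files(&props));
        assert!(!uses_orc_data_files(&HashMap::new()));
    }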

File tree

24 files changed: +979 -84 lines

.github/actions/test_sqllogic_iceberg_tpch/action.yml

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ runs:
 
         pip install pyspark
         python3 tests/sqllogictests/scripts/prepare_iceberg_tpch_data.py
+        python3 tests/sqllogictests/scripts/prepare_iceberg_test_data.py
 
 
     - name: Run sqllogic Tests with Standalone lib

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default.

src/query/storages/iceberg/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -20,9 +20,12 @@ databend-common-meta-app = { workspace = true }
 databend-common-meta-types = { workspace = true }
 databend-common-pipeline-core = { workspace = true }
 databend-common-pipeline-sources = { workspace = true }
+databend-common-pipeline-transforms = { workspace = true }
 databend-common-storage = { workspace = true }
+databend-common-storages-orc = { workspace = true }
 databend-common-storages-parquet = { workspace = true }
 databend-storages-common-cache = { workspace = true }
+databend-storages-common-stage = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 educe = { workspace = true }
 fastrace = { workspace = true }

src/query/storages/iceberg/src/partition.rs

Lines changed: 23 additions & 8 deletions
@@ -12,15 +12,30 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use databend_common_catalog::plan::PartInfo;
 use databend_common_storages_parquet::ParquetFilePart;
 use databend_common_storages_parquet::ParquetPart;
+use databend_storages_common_stage::SingleFilePartition;
+use iceberg::spec::DataFileFormat;
 
-pub(crate) fn convert_file_scan_task(task: iceberg::scan::FileScanTask) -> ParquetPart {
-    let file = ParquetFilePart {
-        file: task.data_file_path.clone(),
-        compressed_size: task.length,
-        estimated_uncompressed_size: task.length * 5,
-        dedup_key: format!("{}_{}", task.data_file_path, task.length),
-    };
-    ParquetPart::File(file)
+pub(crate) fn convert_file_scan_task(task: iceberg::scan::FileScanTask) -> Box<dyn PartInfo> {
+    match task.data_file_format {
+        DataFileFormat::Orc => {
+            let part = SingleFilePartition {
+                path: task.data_file_path.clone(),
+                size: task.length as usize,
+            };
+            Box::new(part)
+        }
+        DataFileFormat::Parquet => {
+            let file = ParquetFilePart {
+                file: task.data_file_path.clone(),
+                compressed_size: task.length,
+                estimated_uncompressed_size: task.length * 5,
+                dedup_key: format!("{}_{}", task.data_file_path, task.length),
+            };
+            Box::new(ParquetPart::File(file))
+        }
+        _ => unimplemented!(),
+    }
 }
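For illustration, here is a self-contained sketch of the two partition shapes produced above. WholeFilePart and ParquetFilePartSketch are simplified stand-ins for Databend's SingleFilePartition and ParquetFilePart, showing only the fields that convert_file_scan_task sets: ORC files become one whole-file partition each, while Parquet files keep the 5x uncompressed-size heuristic and a path+length dedup key.

    // Stand-in types for illustration only; not the Databend definitions.
    #[derive(Debug)]
    struct WholeFilePart {
        path: String,
        size: usize,
    }

    #[derive(Debug)]
    struct ParquetFilePartSketch {
        file: String,
        compressed_size: u64,
        estimated_uncompressed_size: u64, // heuristic: 5x the on-disk size
        dedup_key: String,
    }

    fn main() {
        let (path, length) = ("s3://bucket/data/00000-0.orc".to_string(), 4_096_u64);

        // An ORC data file maps to one whole-file partition ...
        let orc_part = WholeFilePart { path: path.clone(), size: length as usize };

        // ... while a Parquet data file keeps the richer part shape.
        let parquet_part = ParquetFilePartSketch {
            file: path.clone(),
            compressed_size: length,
            estimated_uncompressed_size: length * 5,
            dedup_key: format!("{}_{}", path, length),
        };
        println!("{orc_part:?}\n{parquet_part:?}");
    }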

src/query/storages/iceberg/src/statistics.rs

Lines changed: 1 addition & 2 deletions
@@ -199,8 +199,7 @@ fn get_column_stats(
     upper: &HashMap<i32, Datum>,
     null_counts: &HashMap<i32, u64>,
 ) -> BasicColumnStatistics {
-    // The column id in iceberg is 1-based while the column id in Databend is 0-based.
-    let iceberg_col_id = field.column_id as i32 + 1;
+    let iceberg_col_id = field.column_id as i32;
     BasicColumnStatistics {
         min: lower
             .get(&iceberg_col_id)
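The +1 offset is dropped because, with this patch, the table schema built in table.rs (below) carries the Iceberg field id directly: each Arrow field's PARQUET:field_id metadata entry is parsed into TableField::column_id, so the statistics maps can be indexed with that id as-is. A small sketch of the extraction using arrow_schema; the field name and id here are made up for illustration.

    use std::collections::HashMap;

    use arrow_schema::{DataType, Field};

    fn main() {
        // Hypothetical field as produced by iceberg's schema_to_arrow_schema:
        // the Iceberg field id travels in the "PARQUET:field_id" metadata entry.
        let field = Field::new("user_id", DataType::Int64, false).with_metadata(
            HashMap::from([("PARQUET:field_id".to_string(), "1".to_string())]),
        );

        let column_id: u32 = field
            .metadata()
            .get("PARQUET:field_id")
            .map(|s| s.parse().unwrap())
            .unwrap_or(0);

        // The id can now be used directly as the key into the lower/upper/null_count
        // maps keyed by Iceberg column id, with no 1-based adjustment.
        assert_eq!(column_id, 1);
    }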

src/query/storages/iceberg/src/table.rs

Lines changed: 176 additions & 45 deletions
@@ -17,13 +17,13 @@ use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use arrow_schema::Schema;
 use async_trait::async_trait;
 use chrono::Utc;
 use databend_common_catalog::catalog::StorageDescription;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::InternalColumnType;
 use databend_common_catalog::plan::ParquetReadOptions;
-use databend_common_catalog::plan::PartInfo;
 use databend_common_catalog::plan::PartStatistics;
 use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::plan::PartitionsShuffleKind;
@@ -39,12 +39,18 @@ use databend_common_catalog::table_context::AbortChecker;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
+use databend_common_expression::ColumnId;
+use databend_common_expression::DataSchema;
+use databend_common_expression::TableField;
 use databend_common_expression::TableSchema;
 use databend_common_meta_app::schema::CatalogInfo;
 use databend_common_meta_app::schema::TableIdent;
 use databend_common_meta_app::schema::TableInfo;
 use databend_common_meta_app::schema::TableMeta;
 use databend_common_pipeline_core::Pipeline;
+use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
+use databend_common_storages_orc::ORCSource;
+use databend_common_storages_orc::StripeDecoder;
 use databend_common_storages_parquet::ParquetReaderBuilder;
 use databend_common_storages_parquet::ParquetSource;
 use databend_common_storages_parquet::ParquetSourceType;
@@ -58,6 +64,10 @@ use crate::predicate::PredicateBuilder;
 use crate::statistics;
 use crate::statistics::IcebergStatistics;
 
+const ICEBERG_TABLE_FORMAT_OPT: &str = "write.format.default";
+const ICEBERG_TABLE_FORMAT_OPT_ORC: &str = "orc";
+const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";
+
 pub const ICEBERG_ENGINE: &str = "ICEBERG";
 
 /// accessor wrapper as a table
@@ -113,7 +123,24 @@ impl IcebergTable {
         let arrow_schema = schema_to_arrow_schema(meta.current_schema().as_ref()).map_err(|e| {
             ErrorCode::ReadTableDataError(format!("Cannot convert table metadata: {e:?}"))
         })?;
-        TableSchema::try_from(&arrow_schema)
+        let mut fields = Vec::with_capacity(arrow_schema.fields().len());
+
+        for arrow_f in arrow_schema.fields().iter() {
+            let field_id: ColumnId = arrow_f
+                .metadata()
+                .get(PARQUET_FIELD_ID_META_KEY)
+                .map(|s| s.parse::<ColumnId>())
+                .transpose()?
+                .unwrap_or(0);
+            let mut field = TableField::try_from(arrow_f.as_ref())?;
+            field.column_id = field_id;
+            fields.push(field);
+        }
+        Ok(TableSchema {
+            fields,
+            metadata: arrow_schema.metadata().clone().into_iter().collect(),
+            next_column_id: 0,
+        })
     }
 
     /// build_engine_options will generate `engine_options` from [`iceberg::table::Table`] so that
@@ -275,51 +302,90 @@ impl IcebergTable {
                     .collect::<Vec<_>>()
             })
             .unwrap_or_default();
-        let need_row_number = internal_columns.contains(&InternalColumnType::FileRowNumber);
         let table_schema = self.info.schema();
-        let arrow_schema = table_schema.as_ref().into();
 
         let max_threads = ctx.get_settings().get_max_threads()? as usize;
-        let topk = plan
-            .push_downs
-            .as_ref()
-            .and_then(|p| p.top_k(&self.schema()));
-
-        let read_options = ParquetReadOptions::default()
-            .with_prune_row_groups(true)
-            .with_prune_pages(false);
-
-        let op = self.table.file_io().clone();
-        let mut builder = ParquetReaderBuilder::create(
-            ctx.clone(),
-            Arc::new(op),
-            table_schema.clone(),
-            arrow_schema,
-        )?
-        .with_options(read_options)
-        .with_push_downs(plan.push_downs.as_ref());
-
-        if !need_row_number {
-            builder = builder.with_topk(topk.as_ref());
-        }
+        let op = self.table.file_io();
+        if let Some(true) = self
+            .table
+            .metadata()
+            .properties()
+            .get(ICEBERG_TABLE_FORMAT_OPT)
+            .map(|format| format.as_str() == ICEBERG_TABLE_FORMAT_OPT_ORC)
+        {
+            let projection =
+                PushDownInfo::projection_of_push_downs(&table_schema, plan.push_downs.as_ref());
+            let data_schema: DataSchema = Arc::new(projection.project_schema(&table_schema)).into();
+            let arrow_schema = Arc::new(Self::convert_orc_schema(&Schema::from(&data_schema)));
+            let data_schema = Arc::new(data_schema);
+            pipeline.add_source(
+                |output| {
+                    ORCSource::try_create(
+                        output,
+                        ctx.clone(),
+                        Arc::new(op.clone()),
+                        arrow_schema.clone(),
+                        None,
+                        projection.clone(),
+                    )
+                },
+                max_threads,
+            )?;
+            pipeline.try_resize(max_threads)?;
+            pipeline.add_accumulating_transformer(|| {
+                StripeDecoder::new(ctx.clone(), data_schema.clone(), arrow_schema.clone())
+            });
+        } else {
+            let arrow_schema: Schema = table_schema.as_ref().into();
+            let need_row_number = internal_columns.contains(&InternalColumnType::FileRowNumber);
+            let topk = plan
+                .push_downs
+                .as_ref()
+                .and_then(|p| p.top_k(&self.schema()));
+            let read_options = ParquetReadOptions::default()
+                .with_prune_row_groups(true)
+                .with_prune_pages(false);
+            let op = Arc::new(op.clone());
+            let mut builder = ParquetReaderBuilder::create(
+                ctx.clone(),
+                op.clone(),
+                table_schema.clone(),
+                arrow_schema.clone(),
+            )?
+            .with_options(read_options)
+            .with_push_downs(plan.push_downs.as_ref());
+
+            if !need_row_number {
+                builder = builder.with_topk(topk.as_ref());
+            }
 
-        let row_group_reader = Arc::new(builder.build_row_group_reader(need_row_number)?);
-
-        let topk = Arc::new(topk);
-        pipeline.add_source(
-            |output| {
-                ParquetSource::create(
-                    ctx.clone(),
-                    ParquetSourceType::Iceberg,
-                    output,
-                    row_group_reader.clone(),
-                    None,
-                    topk.clone(),
-                    internal_columns.clone(),
-                )
-            },
-            max_threads,
-        )
+            let row_group_reader = Arc::new(builder.build_row_group_reader(need_row_number)?);
+
+            let topk = Arc::new(topk);
+            let projection =
+                PushDownInfo::projection_of_push_downs(&table_schema, plan.push_downs.as_ref());
+            let output_schema = Arc::new(projection.project_schema(&table_schema));
+
+            pipeline.add_source(
+                |output| {
+                    ParquetSource::create(
+                        ctx.clone(),
+                        ParquetSourceType::Iceberg,
+                        output,
+                        row_group_reader.clone(),
+                        None,
+                        topk.clone(),
+                        internal_columns.clone(),
+                        plan.push_downs.clone(),
+                        table_schema.clone(),
+                        output_schema.clone(),
+                        op.clone(),
+                    )
+                },
+                max_threads,
            )?
+        }
+        Ok(())
     }
 
     #[fastrace::trace]
@@ -371,8 +437,7 @@ impl IcebergTable {
             .map(|v: iceberg::scan::FileScanTask| {
                 read_rows += v.record_count.unwrap_or_default() as usize;
                 read_bytes += v.length as usize;
-                let part = convert_file_scan_task(v);
-                Arc::new(Box::new(part) as Box<dyn PartInfo>)
+                Arc::new(convert_file_scan_task(v))
             })
             .collect();
 
@@ -381,6 +446,72 @@ impl IcebergTable {
             Partitions::create(PartitionsShuffleKind::Mod, parts),
         ))
     }
+
+    fn convert_orc_schema(schema: &Schema) -> Schema {
+        fn visit_field(field: &arrow_schema::FieldRef) -> arrow_schema::FieldRef {
+            Arc::new(
+                arrow_schema::Field::new(
+                    field.name(),
+                    visit_type(field.data_type()),
+                    field.is_nullable(),
+                )
+                .with_metadata(field.metadata().clone()),
+            )
+        }
+
+        // orc-rust is not compatible with UTF8 View
+        fn visit_type(ty: &arrow_schema::DataType) -> arrow_schema::DataType {
+            match ty {
+                arrow_schema::DataType::Utf8View => arrow_schema::DataType::Utf8,
+                arrow_schema::DataType::List(field) => {
+                    arrow_schema::DataType::List(visit_field(field))
+                }
+                arrow_schema::DataType::ListView(field) => {
+                    arrow_schema::DataType::ListView(visit_field(field))
+                }
+                arrow_schema::DataType::FixedSizeList(field, len) => {
+                    arrow_schema::DataType::FixedSizeList(visit_field(field), *len)
+                }
+                arrow_schema::DataType::LargeList(field) => {
+                    arrow_schema::DataType::LargeList(visit_field(field))
+                }
+                arrow_schema::DataType::LargeListView(field) => {
+                    arrow_schema::DataType::LargeListView(visit_field(field))
+                }
+                arrow_schema::DataType::Struct(fields) => {
+                    let visited_fields = fields.iter().map(visit_field).collect::<Vec<_>>();
+                    arrow_schema::DataType::Struct(arrow_schema::Fields::from(visited_fields))
+                }
+                arrow_schema::DataType::Union(fields, mode) => {
+                    let (ids, fields): (Vec<_>, Vec<_>) = fields
+                        .iter()
+                        .map(|(i, field)| (i, visit_field(field)))
+                        .unzip();
+                    arrow_schema::DataType::Union(
+                        arrow_schema::UnionFields::new(ids, fields),
+                        *mode,
+                    )
+                }
+                arrow_schema::DataType::Dictionary(key, value) => {
+                    arrow_schema::DataType::Dictionary(
+                        Box::new(visit_type(key)),
+                        Box::new(visit_type(value)),
+                    )
+                }
+                arrow_schema::DataType::Map(field, v) => {
+                    arrow_schema::DataType::Map(visit_field(field), *v)
+                }
+                ty => {
+                    debug_assert!(!ty.is_nested());
+                    ty.clone()
+                }
+            }
+        }
+
+        let fields = schema.fields().iter().map(visit_field).collect::<Vec<_>>();
+
+        Schema::new(fields).with_metadata(schema.metadata().clone())
+    }
 }
 
 #[async_trait]
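The convert_orc_schema helper exists because orc-rust cannot handle Arrow's Utf8View type, so every Utf8View (including ones nested in lists, structs, maps, unions, and dictionaries) is rewritten to Utf8 before the schema is handed to ORCSource. A minimal standalone sketch of the flat case, using arrow_schema directly; the real helper also recurses into nested types, and to_orc_compatible is an illustrative name only.

    use arrow_schema::{DataType, Field, Schema};

    // Rewrite Utf8View fields to Utf8 at the top level only; nested types are
    // left untouched in this sketch.
    fn to_orc_compatible(schema: &Schema) -> Schema {
        let fields: Vec<Field> = schema
            .fields()
            .iter()
            .map(|f| match f.data_type() {
                DataType::Utf8View => Field::new(f.name(), DataType::Utf8, f.is_nullable())
                    .with_metadata(f.metadata().clone()),
                _ => f.as_ref().clone(),
            })
            .collect();
        Schema::new(fields).with_metadata(schema.metadata().clone())
    }

    fn main() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8View, true),
            Field::new("c2", DataType::Int32, false),
        ]);
        let converted = to_orc_compatible(&schema);
        assert_eq!(converted.field(0).data_type(), &DataType::Utf8);
        assert_eq!(converted.field(1).data_type(), &DataType::Int32);
    }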

src/query/storages/orc/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -34,4 +34,6 @@ mod table;
 mod utils;
 
 pub use copy_into_table::OrcTableForCopy;
+pub use processors::decoder::StripeDecoder;
+pub use processors::source::ORCSource;
 pub use table::OrcTable;
