
Commit 1b41889

feat(query): support read nested struct in iceberg tables

1 parent 457b4fd · commit 1b41889

12 files changed: +216, −378 lines

Cargo.lock

Lines changed: 5 additions & 5 deletions

Cargo.toml

Lines changed: 5 additions & 5 deletions
@@ -312,13 +312,13 @@ hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio"
 lru = "0.12"

 ## in branch dev
-iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403", features = [
+iceberg = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "a5e780ae", features = [
     "storage-all",
 ] }
-iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
-iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
-iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
-iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "32b1403" }
+iceberg-catalog-glue = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "a5e780ae" }
+iceberg-catalog-hms = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "a5e780ae" }
+iceberg-catalog-rest = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "a5e780ae" }
+iceberg-catalog-s3tables = { version = "0.4.0", git = "https://github.com/databendlabs/iceberg-rust", rev = "a5e780ae" }

 # Explicitly specify compatible AWS SDK versions
 aws-config = "1.5.18"

pyproject.toml

Lines changed: 0 additions & 15 deletions
This file was deleted.

src/query/service/src/interpreters/interpreter_copy_into_location.rs

Lines changed: 1 addition & 3 deletions
@@ -16,7 +16,6 @@ use std::sync::Arc;

 use databend_common_base::runtime::GlobalIORuntime;
 use databend_common_exception::Result;
-use databend_common_expression::infer_table_schema;
 use databend_common_meta_app::schema::UpdateStreamMetaReq;
 use databend_common_pipeline::core::ExecutionInfo;
 use databend_storages_common_stage::CopyIntoLocationInfo;
@@ -88,8 +87,7 @@ impl CopyIntoLocationInterpreter {
         let (query_interpreter, update_stream_meta_req) = self.build_query(query).await?;
         let query_physical_plan = query_interpreter.build_physical_plan().await?;
         let query_result_schema = query_interpreter.get_result_schema();
-        let table_schema = infer_table_schema(&query_result_schema)?;
-
+        let table_schema = query_interpreter.get_result_table_schema()?;
         let mut physical_plan = PhysicalPlan::new(CopyIntoLocation {
             input: query_physical_plan,
             project_columns: query_interpreter.get_result_columns(),

src/query/service/src/interpreters/interpreter_select.rs

Lines changed: 28 additions & 0 deletions
@@ -24,7 +24,10 @@ use databend_common_exception::Result;
 use databend_common_expression::DataField;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::DataSchemaRefExt;
+use databend_common_expression::TableField;
 use databend_common_expression::TableSchemaRef;
+use databend_common_expression::TableSchemaRefExt;
+use databend_common_expression::infer_schema_type;
 use databend_common_expression::infer_table_schema;
 use databend_common_meta_app::schema::UpdateMultiTableMetaReq;
 use databend_common_meta_store::MetaStore;
@@ -36,6 +39,7 @@ use databend_common_pipeline::core::PipeItem;
 use databend_common_pipeline::core::Pipeline;
 use databend_common_pipeline_transforms::processors::TransformDummy;
 use databend_common_sql::ColumnBinding;
+use databend_common_sql::ColumnEntry;
 use databend_common_sql::MetadataRef;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::parse_result_scan_args;
@@ -113,6 +117,30 @@ impl SelectInterpreter {
         DataSchemaRefExt::create(fields)
     }

+    pub fn get_result_table_schema(&self) -> Result<TableSchemaRef> {
+        let metadata = self.metadata.read();
+        let mut fields = Vec::with_capacity(self.bind_context.columns.len());
+        for column_binding in &self.bind_context.columns {
+            let table_data_type = if column_binding.index < metadata.columns().len() {
+                match metadata.column(column_binding.index) {
+                    ColumnEntry::BaseTableColumn(base) => base.data_type.clone(),
+                    ColumnEntry::VirtualColumn(virtual_column) => virtual_column.data_type.clone(),
+                    ColumnEntry::DerivedColumn(derived) => infer_schema_type(&derived.data_type)?,
+                    ColumnEntry::InternalColumn(internal) => {
+                        infer_schema_type(&internal.internal_column.data_type())?
+                    }
+                }
+            } else {
+                infer_schema_type(column_binding.data_type.as_ref())?
+            };
+            fields.push(TableField::new(
+                &column_binding.column_name,
+                table_data_type,
+            ));
+        }
+        Ok(TableSchemaRefExt::create(fields))
+    }
+
     #[fastrace::trace(name = "SelectInterpreter::build_physical_plan")]
     #[async_backtrace::framed]
     pub async fn build_physical_plan(&self) -> Result<PhysicalPlan> {
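
For context, the point of the new helper is that it takes the TableDataType from the bound table column, so nested tuple field names survive into the schema handed to COPY INTO location instead of being re-derived from the output DataSchema. The snippet below is only an illustrative sketch of such a preserved schema (roughly the `abc` table from the sqllogictest change further down); it is not code from the commit, and the `databend_common_expression::types::NumberDataType` import path is an assumption.

use databend_common_expression::types::NumberDataType; // assumed module path
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchemaRef;
use databend_common_expression::TableSchemaRefExt;

// Roughly: number int, item Tuple(name String, age int). The inner names
// "name"/"age" are carried by TableDataType::Tuple, which is what
// get_result_table_schema keeps.
fn abc_table_schema() -> TableSchemaRef {
    let item = TableDataType::Tuple {
        fields_name: vec!["name".to_string(), "age".to_string()],
        fields_type: vec![
            TableDataType::String,
            TableDataType::Number(NumberDataType::Int32),
        ],
    };
    TableSchemaRefExt::create(vec![
        TableField::new("number", TableDataType::Number(NumberDataType::Int32)),
        TableField::new("item", item),
    ])
}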

src/query/storages/iceberg/src/table.rs

Lines changed: 84 additions & 7 deletions
@@ -27,6 +27,7 @@ use databend_common_catalog::plan::ParquetReadOptions;
 use databend_common_catalog::plan::PartStatistics;
 use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::plan::PartitionsShuffleKind;
+use databend_common_catalog::plan::Projection;
 use databend_common_catalog::plan::PushDownInfo;
 use databend_common_catalog::table::ColumnStatisticsProvider;
 use databend_common_catalog::table::DistributionLevel;
@@ -40,7 +41,9 @@ use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::ColumnId;
 use databend_common_expression::DataSchema;
+use databend_common_expression::FieldIndex;
 use databend_common_expression::TableField;
+use databend_common_expression::TableDataType;
 use databend_common_expression::TableSchema;
 use databend_common_meta_app::schema::CatalogInfo;
 use databend_common_meta_app::schema::TableIdent;
@@ -407,13 +410,9 @@ impl IcebergTable {

         if let Some(push_downs) = &push_downs {
             if let Some(projection) = &push_downs.projection {
-                scan = scan.select(
-                    projection
-                        .project_schema(&self.schema())
-                        .fields
-                        .iter()
-                        .map(|v| v.name.clone()),
-                );
+                let select_fields =
+                    Self::projection_to_iceberg_select_fields(projection, &self.schema())?;
+                scan = scan.select(select_fields);
             }
             if let Some(filter) = &push_downs.filters {
                 let (_, predicate) = PredicateBuilder::build(&filter.filter);
@@ -450,6 +449,84 @@ impl IcebergTable {
         ))
     }

+    fn projection_to_iceberg_select_fields(
+        projection: &Projection,
+        schema: &TableSchema,
+    ) -> Result<Vec<String>> {
+        match projection {
+            Projection::Columns(_) => Ok(projection
+                .project_schema(schema)
+                .fields
+                .iter()
+                .map(|v| v.name.clone())
+                .collect()),
+            Projection::InnerColumns(path_indices) => {
+                let fields = schema.fields();
+                let mut names = Vec::with_capacity(path_indices.len());
+                for path in path_indices.values() {
+                    names.push(Self::inner_column_path_to_name(fields, path)?);
+                }
+                Ok(names)
+            }
+        }
+    }
+
+    fn inner_column_path_to_name(
+        fields: &[TableField],
+        path: &[FieldIndex],
+    ) -> Result<String> {
+        if path.is_empty() {
+            return Err(ErrorCode::BadArguments(
+                "Inner column path should not be empty".to_string(),
+            ));
+        }
+
+        let field = fields.get(path[0]).ok_or_else(|| {
+            ErrorCode::BadArguments(format!(
+                "Inner column path {:?} is out of range",
+                path
+            ))
+        })?;
+        let mut name_parts = Vec::with_capacity(path.len());
+        name_parts.push(field.name().clone());
+
+        let mut current_type = field.data_type().remove_nullable();
+        for index in path.iter().skip(1) {
+            match &current_type {
+                TableDataType::Tuple {
+                    fields_name,
+                    fields_type,
+                } => {
+                    let inner_name = fields_name.get(*index).ok_or_else(|| {
+                        ErrorCode::BadArguments(format!(
+                            "Inner column path {:?} is out of range for {}",
+                            path,
+                            name_parts.join(".")
+                        ))
+                    })?;
+                    name_parts.push(inner_name.clone());
+                    let inner_type = fields_type.get(*index).ok_or_else(|| {
+                        ErrorCode::BadArguments(format!(
+                            "Inner column path {:?} is out of range for {}",
+                            path,
+                            name_parts.join(".")
+                        ))
+                    })?;
+                    current_type = inner_type.remove_nullable();
+                }
+                _ => {
+                    return Err(ErrorCode::BadArguments(format!(
+                        "Inner column path {:?} is invalid for non-tuple field {}",
+                        path,
+                        name_parts.join(".")
+                    )));
+                }
+            }
+        }
+
+        Ok(name_parts.join("."))
+    }
+
     fn convert_orc_schema(schema: &Schema) -> Schema {
         fn visit_field(field: &arrow_schema::FieldRef) -> arrow_schema::FieldRef {
             Arc::new(
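
The path walk above can be shown in isolation. Below is a standalone sketch with toy types (not Databend's) of how an InnerColumns path such as [2, 0, 1] over the nested test schema (id, item(name, age), item_2(item(name, level), level)) resolves to the dotted name "item_2.item.level" that is handed to the Iceberg scan's select(); the names `Ty` and `path_to_name` are hypothetical.

// Toy mirror of inner_column_path_to_name: the first index picks a top-level field,
// each later index descends into a tuple and appends the inner field name, and the
// parts are joined with '.' to form the column name Iceberg expects.
#[derive(Clone)]
enum Ty {
    Leaf,
    Tuple(Vec<(String, Ty)>),
}

fn path_to_name(fields: &[(String, Ty)], path: &[usize]) -> Option<String> {
    let (name, mut ty) = fields.get(*path.first()?)?.clone();
    let mut parts = vec![name];
    for &idx in &path[1..] {
        match ty {
            Ty::Tuple(inner) => {
                let (n, t) = inner.get(idx)?.clone();
                parts.push(n);
                ty = t;
            }
            // A path that descends into a non-tuple field is invalid.
            Ty::Leaf => return None,
        }
    }
    Some(parts.join("."))
}

fn main() {
    // Shape of iceberg.test.t_nested from the test-data script further down.
    let item = Ty::Tuple(vec![("name".into(), Ty::Leaf), ("age".into(), Ty::Leaf)]);
    let inner = Ty::Tuple(vec![("name".into(), Ty::Leaf), ("level".into(), Ty::Leaf)]);
    let item_2 = Ty::Tuple(vec![("item".into(), inner), ("level".into(), Ty::Leaf)]);
    let fields: Vec<(String, Ty)> = vec![
        ("id".into(), Ty::Leaf),
        ("item".into(), item),
        ("item_2".into(), item_2),
    ];
    assert_eq!(path_to_name(&fields, &[2, 0, 1]).as_deref(), Some("item_2.item.level"));
    assert_eq!(path_to_name(&fields, &[1, 0]).as_deref(), Some("item.name"));
}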

src/query/storages/parquet/src/parquet_reader/reader/builder.rs

Lines changed: 6 additions & 5 deletions
@@ -240,9 +240,8 @@ impl<'a> ParquetReaderBuilder<'a> {
             .map(|(proj, _, _, paths)| (proj.clone(), paths.clone()))
             .unwrap();

-        let (_, _, output_schema, _) = self.built_output.as_ref().unwrap();
-        let transformer = source_type
-            .need_transformer()
+        let (_, _, output_schema, output_field_paths) = self.built_output.as_ref().unwrap();
+        let transformer = (source_type.need_transformer() && output_field_paths.is_none())
             .then(|| RecordBatchTransformer::build(output_schema.clone()));
         Ok(ParquetWholeFileReader {
             op_registry: self.op_registry.clone(),
@@ -277,8 +276,10 @@
         let transformer = source_type
             .need_transformer()
             .then(|| {
-                self.built_output.as_ref().map(|(_, _, output_schema, _)| {
-                    RecordBatchTransformer::build(output_schema.clone())
+                self.built_output.as_ref().and_then(|(_, _, output_schema, output_field_paths)| {
+                    output_field_paths
+                        .is_none()
+                        .then(|| RecordBatchTransformer::build(output_schema.clone()))
                })
            })
            .flatten();

src/query/storages/parquet/src/source.rs

Lines changed: 1 addition & 1 deletion
@@ -66,7 +66,7 @@ enum State {
     ReadFiles(Vec<(Bytes, String)>),
 }

-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub enum ParquetSourceType {
     StageTable,
     ResultCache,

tests/sqllogictests/scripts/prepare_iceberg_test_data.py

Lines changed: 51 additions & 0 deletions
@@ -1,4 +1,5 @@
 from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, StringType, IntegerType

 spark = (
     SparkSession.builder.appName("CSV to Iceberg REST Catalog")
@@ -57,4 +58,54 @@
     f"""INSERT INTO iceberg.test.t1_orc VALUES (0, 0, 'a'), (1, 1, 'b'), (2, 2, 'c'), (3, 3, 'd'), (4, null, null);"""
 )

+# create nested table
+spark.sql("DROP TABLE IF EXISTS iceberg.test.t_nested")
+data = [
+    (1, ("Alice", 30), (("A1", 1), 10)),
+    (2, ("Bob", 25), (("B1", 2), 20)),
+    (3, ("Charlie", 35), (("C1", 3), 30)),
+    (4, None, None),
+]
+
+# Create DataFrame and write
+schema = StructType(
+    [
+        StructField("id", IntegerType(), True),
+        StructField(
+            "item",
+            StructType(
+                [
+                    StructField("name", StringType(), True),
+                    StructField("age", IntegerType(), True),
+                ]
+            ),
+            True,
+        ),
+        StructField(
+            "item_2",
+            StructType(
+                [
+                    StructField(
+                        "item",
+                        StructType(
+                            [
+                                StructField("name", StringType(), True),
+                                StructField("level", IntegerType(), True),
+                            ]
+                        ),
+                        True,
+                    ),
+                    StructField("level", IntegerType(), True),
+                ]
+            ),
+            True,
+        ),
+    ]
+)
+
+df = spark.createDataFrame(data, schema)
+df.writeTo("iceberg.test.t_nested").using("iceberg").createOrReplace()
+
+print("Table iceberg.test.t_nested created with sample data")
+
 spark.stop()

tests/sqllogictests/suites/base/03_common/03_0028_copy_into_stage.test

Lines changed: 19 additions & 1 deletion
@@ -56,6 +56,21 @@ select sum(number) from @hello;
 ----
 45

+
+statement ok
+remove @hello
+
+statement ok
+create or replace table abc(number int, item Tuple(name String, age int)) as select number, ('aa', number) from numbers(10);
+
+statement ok
+COPY INTO @hello from (select * from abc) FILE_FORMAT = (type = parquet)
+
+query TI
+select max(item['name']), sum(item['age']) from @hello;
+----
+aa 45
+
 statement ok
 CREATE TABLE world(c1 int , c2 int);

@@ -66,7 +81,10 @@ statement ok
 DROP STAGE IF EXISTS hello

 statement ok
-drop table world
+drop table if EXISTS abc
+
+statement ok
+drop table if EXISTS world

 statement ok
 DROP DATABASE db1
