Skip to content

Commit a5593b4

Browse files
committed
Support Datafusion insert_into
1 parent a50bb22 commit a5593b4

File tree

11 files changed

+751
-6
lines changed

11 files changed

+751
-6
lines changed

crates/iceberg/src/arrow/nan_val_cnt_visitor.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ impl NanValueCountVisitor {
159159
let arrow_arr_partner_accessor = ArrowArrayAccessor {};
160160

161161
let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
162+
println!("----StructArray from record stream: {:?}", struct_arr);
163+
println!("----Schema.as_struct from table: {:?}", schema.as_struct());
162164
visit_struct_with_partner(
163165
schema.as_struct(),
164166
&struct_arr,

crates/iceberg/src/arrow/value.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,10 +440,12 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
440440
Ok(schema_partner)
441441
}
442442

443+
// TODO: generate `field_pos` in DataFusion instead of passing it in here.
443444
fn field_partner<'a>(
444445
&self,
445446
struct_partner: &'a ArrayRef,
446447
field: &NestedField,
448+
field_pos: Option<usize>,
447449
) -> Result<&'a ArrayRef> {
448450
let struct_array = struct_partner
449451
.as_any()
@@ -455,6 +457,13 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
455457
)
456458
})?;
457459

460+
println!(
461+
"!!!Accessor struct array from struct partner: {:?}",
462+
struct_array
463+
);
464+
465+
println!("!!!field: {:?}", field);
466+
458467
let field_pos = struct_array
459468
.fields()
460469
.iter()
@@ -463,12 +472,12 @@ impl PartnerAccessor<ArrayRef> for ArrowArrayAccessor {
463472
.map(|id| id == field.id)
464473
.unwrap_or(false)
465474
})
466-
.ok_or_else(|| {
475+
.unwrap_or(field_pos.ok_or_else(|| {
467476
Error::new(
468477
ErrorKind::DataInvalid,
469478
format!("Field id {} not found in struct array", field.id),
470479
)
471-
})?;
480+
})?);
472481

473482
Ok(struct_array.column(field_pos))
474483
}

crates/iceberg/src/spec/manifest/_serde.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,10 @@ impl ManifestEntryV1 {
9696
}
9797
}
9898

99+
/// TODO(doc): serde-facing counterpart of `DataFile`; converted via `try_from` / `try_into` below.
99100
#[serde_as]
100101
#[derive(Serialize, Deserialize)]
101-
pub(super) struct DataFileSerde {
102+
pub struct DataFileSerde {
102103
#[serde(default)]
103104
content: i32,
104105
file_path: String,
@@ -126,6 +127,7 @@ pub(super) struct DataFileSerde {
126127
}
127128

128129
impl DataFileSerde {
130+
/// todo doc
129131
pub fn try_from(
130132
value: super::DataFile,
131133
partition_type: &StructType,
@@ -160,6 +162,7 @@ impl DataFileSerde {
160162
})
161163
}
162164

165+
/// todo doc
163166
pub fn try_into(
164167
self,
165168
partition_spec_id: i32,

crates/iceberg/src/spec/manifest/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
// TODO: fix encapsulation — re-exporting `_serde::*` makes serde-internal types part of the public API.
1819
mod _serde;
20+
pub use _serde::*;
1921

2022
mod data_file;
2123
pub use data_file::*;

crates/iceberg/src/spec/schema/visitor.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,12 @@ pub trait PartnerAccessor<P> {
192192
/// Get the struct partner from schema partner.
193193
fn struct_parner<'a>(&self, schema_partner: &'a P) -> Result<&'a P>;
194194
/// Get the field partner from struct partner. `field_pos` is the optional position of `field` within the struct's field list.
195-
fn field_partner<'a>(&self, struct_partner: &'a P, field: &NestedField) -> Result<&'a P>;
195+
fn field_partner<'a>(
196+
&self,
197+
struct_partner: &'a P,
198+
field: &NestedField,
199+
field_pos: Option<usize>,
200+
) -> Result<&'a P>;
196201
/// Get the list element partner from list partner.
197202
fn list_element_partner<'a>(&self, list_partner: &'a P) -> Result<&'a P>;
198203
/// Get the map key partner from map partner.
@@ -253,8 +258,8 @@ pub fn visit_struct_with_partner<P, V: SchemaWithPartnerVisitor<P>, A: PartnerAc
253258
accessor: &A,
254259
) -> Result<V::T> {
255260
let mut results = Vec::with_capacity(s.fields().len());
256-
for field in s.fields() {
257-
let field_partner = accessor.field_partner(partner, field)?;
261+
for (pos, field) in s.fields().iter().enumerate() {
262+
let field_partner = accessor.field_partner(partner, field, Some(pos))?;
258263
visitor.before_struct_field(field, field_partner)?;
259264
let result = visit_type_with_partner(&field.field_type, field_partner, visitor, accessor)?;
260265
visitor.after_struct_field(field, field_partner)?;

crates/integrations/datafusion/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,12 @@ async-trait = { workspace = true }
3434
datafusion = { workspace = true }
3535
futures = { workspace = true }
3636
iceberg = { workspace = true }
37+
parquet = { workspace = true }
3738
tokio = { workspace = true }
39+
serde_json = { workspace = true }
3840

3941
[dev-dependencies]
4042
expect-test = { workspace = true }
43+
iceberg-catalog-memory = { workspace = true }
4144
parquet = { workspace = true }
4245
tempfile = { workspace = true }

0 commit comments

Comments
 (0)