Commit b20a255

mbutrovich and Xuanwo authored
feat(reader): Add Date32 support to RecordBatchTransformer create_column (apache#1792)
## Which issue does this PR close?

- Partially addresses apache#1749. Iceberg Java has a test that performs a schema change requiring the `RecordBatchTransformer` to add a Date32 column, which it previously did not support.

## What changes are included in this PR?

Add match arms for the `Date32` type in `create_column`.

## Are these changes tested?

New test that mirrors Iceberg Java's `TestSelect.readAndWriteWithBranchAfterSchemaChange`.

---------

Co-authored-by: Xuanwo <[email protected]>
1 parent: 16ddd0e · commit: b20a255

Showing 1 changed file with 79 additions and 3 deletions.

crates/iceberg/src/arrow/record_batch_transformer.rs

```diff
@@ -19,8 +19,8 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow_array::{
-    Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
-    Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray,
+    Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array,
+    Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray,
 };
 use arrow_cast::cast;
 use arrow_schema::{
```
```diff
@@ -401,6 +401,13 @@ impl RecordBatchTransformer {
                 let vals: Vec<Option<i32>> = vec![None; num_rows];
                 Arc::new(Int32Array::from(vals))
             }
+            (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
+                Arc::new(Date32Array::from(vec![*value; num_rows]))
+            }
+            (DataType::Date32, None) => {
+                let vals: Vec<Option<i32>> = vec![None; num_rows];
+                Arc::new(Date32Array::from(vals))
+            }
             (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
                 Arc::new(Int64Array::from(vec![*value; num_rows]))
             }
```
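For context on the two new arms: Arrow's `Date32` type stores a date as an `i32` count of days since the Unix epoch (1970-01-01), which is why an Iceberg default carried as `PrimitiveLiteral::Int` can fill the column directly, and why the NULL case mirrors the `Int32` arm just above it. A minimal standalone sketch of the same idea (not part of the commit; the day count `19_723` is a made-up default, corresponding to 2024-01-01):

```rust
use arrow_array::{Array, Date32Array};

fn main() {
    let num_rows = 3;

    // Default present: an i32 day count fills every row, as in the
    // (DataType::Date32, Some(PrimitiveLiteral::Int(value))) arm.
    let with_default = Date32Array::from(vec![19_723; num_rows]); // 19_723 days = 2024-01-01
    assert_eq!(with_default.len(), num_rows);
    assert_eq!(with_default.value(0), 19_723);

    // No default: every row is NULL, as in the (DataType::Date32, None) arm.
    let vals: Vec<Option<i32>> = vec![None; num_rows];
    let all_null = Date32Array::from(vals);
    assert!(all_null.is_null(0) && all_null.is_null(2));
}
```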
```diff
@@ -453,7 +460,8 @@ mod test {
     use std::sync::Arc;
 
     use arrow_array::{
-        Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
+        Array, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
+        StringArray,
     };
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
```
```diff
@@ -509,6 +517,74 @@
         assert_eq!(result, expected);
     }
 
+    #[test]
+    fn schema_evolution_adds_date_column_with_nulls() {
+        // Reproduces TestSelect.readAndWriteWithBranchAfterSchemaChange from iceberg-spark.
+        // When reading old snapshots after adding a DATE column, the transformer must
+        // populate the new column with NULL values since old files lack this field.
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(3, "date_col", Type::Primitive(PrimitiveType::Date))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+        let projected_iceberg_field_ids = [1, 2, 3];
+
+        let mut transformer =
+            RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);
+
+        let file_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("name", DataType::Utf8, true, "2"),
+        ]));
+
+        let file_batch = RecordBatch::try_new(file_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec![
+                Some("Alice"),
+                Some("Bob"),
+                Some("Charlie"),
+            ])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(file_batch).unwrap();
+
+        assert_eq!(result.num_columns(), 3);
+        assert_eq!(result.num_rows(), 3);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.values(), &[1, 2, 3]);
+
+        let name_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_column.value(0), "Alice");
+        assert_eq!(name_column.value(1), "Bob");
+        assert_eq!(name_column.value(2), "Charlie");
+
+        let date_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .unwrap();
+        assert!(date_column.is_null(0));
+        assert!(date_column.is_null(1));
+        assert!(date_column.is_null(2));
+    }
+
     pub fn source_record_batch() -> RecordBatch {
         RecordBatch::try_new(
             arrow_schema_promotion_addition_and_renaming_required(),
```
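To run just the new test locally, something like `cargo test -p iceberg schema_evolution_adds_date_column_with_nulls` should work, assuming the crate under `crates/iceberg` is registered in the workspace under the package name `iceberg`.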
