Commit 47f15ed ("add ut")
1 parent 9717473

File tree: 2 files changed (+152 -4 lines)

crates/integrations/datafusion/src/physical_plan/write.rs

Lines changed: 2 additions & 2 deletions
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
     execute_input_stream,
 };
 use futures::StreamExt;
-use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema};
+use iceberg::arrow::FieldMatchMode;
 use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json};
 use iceberg::table::Table;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
@@ -280,7 +280,7 @@ impl ExecutionPlan for IcebergWriteExec {
         // Get input data
         let data = execute_input_stream(
             Arc::clone(&self.input),
-            Arc::new(schema_to_arrow_schema(&schema).map_err(to_datafusion_error)?),
+            self.input.schema(), // input schema may have projected column `_partition`
             partition,
             Arc::clone(&context),
         )?;
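
Note on the write.rs change: for a partitioned INSERT, the input plan projects an extra `_partition` column, so an Arrow schema derived from the Iceberg table schema no longer matches the incoming stream; passing `self.input.schema()` keeps the two consistent. Below is a minimal sketch of the mismatch, assuming the arrow-schema `Schema`/`Field` API; the `_partition` name comes from the comment in the diff, and modeling its type as Utf8 is an assumption for illustration only.

use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Columns as the Iceberg table metadata describes them.
    let table_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("category", DataType::Utf8, false),
    ]);

    // The write plan's input carries the same columns plus the projected
    // partition value (the type here is a placeholder for illustration).
    let input_schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("category", DataType::Utf8, false),
        Field::new("_partition", DataType::Utf8, false),
    ]);

    // A stream validated against the table-derived schema would reject
    // batches that include the extra `_partition` column.
    assert_ne!(table_schema, input_schema);
}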

crates/integrations/datafusion/tests/integration_datafusion_test.rs

Lines changed: 150 additions & 2 deletions
@@ -27,9 +27,13 @@ use datafusion::execution::context::SessionContext;
 use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 use expect_test::expect;
 use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
-use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
+use iceberg::spec::{
+    NestedField, PrimitiveType, Schema, StructType, Transform, Type, UnboundPartitionSpec,
+};
 use iceberg::test_utils::check_record_batches;
-use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation};
+use iceberg::{
+    Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation, TableIdent,
+};
 use iceberg_datafusion::IcebergCatalogProvider;
 use tempfile::TempDir;
 
@@ -810,3 +814,147 @@ async fn test_insert_into_nested() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_insert_into_partitioned() -> Result<()> {
+    let iceberg_catalog = get_iceberg_catalog().await;
+    let namespace = NamespaceIdent::new("test_partitioned_write".to_string());
+    set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+    // Create a schema with a partition column
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(3, "value", Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+
+    // Create partition spec with identity transform on category
+    let partition_spec = UnboundPartitionSpec::builder()
+        .with_spec_id(0)
+        .add_partition_field(2, "category", Transform::Identity)?
+        .build();
+
+    // Create the partitioned table
+    let creation = TableCreation::builder()
+        .name("partitioned_table".to_string())
+        .location(temp_path())
+        .schema(schema)
+        .partition_spec(partition_spec)
+        .properties(HashMap::new())
+        .build();
+
+    iceberg_catalog.create_table(&namespace, creation).await?;
+
+    let client = Arc::new(iceberg_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("catalog", catalog);
+
+    // Insert data with multiple partition values in a single batch
+    let df = ctx
+        .sql(
+            r#"
+        INSERT INTO catalog.test_partitioned_write.partitioned_table
+        VALUES
+            (1, 'electronics', 'laptop'),
+            (2, 'electronics', 'phone'),
+            (3, 'books', 'novel'),
+            (4, 'books', 'textbook'),
+            (5, 'clothing', 'shirt')
+        "#,
+        )
+        .await
+        .unwrap();
+
+    let batches = df.collect().await.unwrap();
+    assert_eq!(batches.len(), 1);
+    let batch = &batches[0];
+    let rows_inserted = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<UInt64Array>()
+        .unwrap();
+    assert_eq!(rows_inserted.value(0), 5);
+
+    // Refresh catalog to get updated table
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
+    ctx.register_catalog("catalog", catalog);
+
+    // Query the table to verify data
+    let df = ctx
+        .sql("SELECT * FROM catalog.test_partitioned_write.partitioned_table ORDER BY id")
+        .await
+        .unwrap();
+
+    let batches = df.collect().await.unwrap();
+
+    // Verify the data - note that _partition column should NOT be present
+    check_record_batches(
+        batches,
+        expect![[r#"
+            Field { name: "id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
+            Field { name: "category", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
+            Field { name: "value", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }"#]],
+        expect![[r#"
+            id: PrimitiveArray<Int32>
+            [
+              1,
+              2,
+              3,
+              4,
+              5,
+            ],
+            category: StringArray
+            [
+              "electronics",
+              "electronics",
+              "books",
+              "books",
+              "clothing",
+            ],
+            value: StringArray
+            [
+              "laptop",
+              "phone",
+              "novel",
+              "textbook",
+              "shirt",
+            ]"#]],
+        &[],
+        Some("id"),
+    );
+
+    // Verify that data files exist under correct partition paths
+    let table_ident = TableIdent::new(namespace.clone(), "partitioned_table".to_string());
+    let table = client.load_table(&table_ident).await?;
+    let table_location = table.metadata().location();
+    let file_io = table.file_io();
+
+    // List files under each expected partition path
+    let electronics_path = format!("{}/data/category=electronics", table_location);
+    let books_path = format!("{}/data/category=books", table_location);
+    let clothing_path = format!("{}/data/category=clothing", table_location);
+
+    // Verify partition directories exist and contain data files
+    assert!(
+        file_io.exists(&electronics_path).await?,
+        "Expected partition directory: {}",
+        electronics_path
+    );
+    assert!(
+        file_io.exists(&books_path).await?,
+        "Expected partition directory: {}",
+        books_path
+    );
+    assert!(
+        file_io.exists(&clothing_path).await?,
+        "Expected partition directory: {}",
+        clothing_path
+    );
+
+    Ok(())
+}
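
Note on the test: with an identity transform on `category`, the writer lays data files out under Hive-style key=value directories beneath the table's `data/` prefix, which is exactly what the `file_io.exists` assertions check. A minimal sketch of that layout rule, using a hypothetical helper (not the iceberg-rust API) that mirrors the `format!` calls in the test; the example location is made up, since the test itself uses `temp_path()`:

fn partition_path(table_location: &str, category: &str) -> String {
    // Identity-partitioned data lands under data/<field>=<value>/.
    format!("{}/data/category={}", table_location, category)
}

fn main() {
    assert_eq!(
        partition_path("/tmp/warehouse/partitioned_table", "books"),
        "/tmp/warehouse/partitioned_table/data/category=books"
    );
}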
