diff --git a/Makefile b/Makefile
index 2cc847d7b3..554fa6e1e0 100644
--- a/Makefile
+++ b/Makefile
@@ -33,7 +33,7 @@ cargo-machete: install-cargo-machete
 	cargo machete
 
 install-taplo-cli:
-	cargo install taplo-cli@0.9.3
+	cargo install taplo-cli@0.9.3 --force
 
 fix-toml: install-taplo-cli
 	taplo fmt
diff --git a/crates/iceberg/src/arrow/record_batch_partition_spliter.rs b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs
index e9816d89a2..d4a66909eb 100644
--- a/crates/iceberg/src/arrow/record_batch_partition_spliter.rs
+++ b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs
@@ -28,7 +28,7 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 use super::record_batch_projector::RecordBatchProjector;
 use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type};
 use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
-use crate::transform::{create_transform_function, BoxedTransformFunction};
+use crate::transform::{BoxedTransformFunction, create_transform_function};
 use crate::{Error, ErrorKind, Result};
 
 /// A helper function to split the record batch into multiple record batches using computed partition columns.
@@ -468,11 +468,13 @@ mod tests {
             .with_spec_id(1)
             .build()
             .unwrap();
-        assert!(RecordBatchPartitionSpliter::new(
-            &schema_to_arrow_schema(&schema).unwrap(),
-            Arc::new(schema),
-            Arc::new(partition_spec),
+        assert!(
+            RecordBatchPartitionSpliter::new(
+                &schema_to_arrow_schema(&schema).unwrap(),
+                Arc::new(schema),
+                Arc::new(partition_spec),
+            )
+            .is_err()
         )
-        .is_err())
     }
 }
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index fc98e1f6ce..9dff327222 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -393,12 +393,14 @@ impl TableScan {
         // get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
         // whose partitions cannot match this
         // scan's filter
-        let manifest_file_contexts = plan_context.build_manifest_file_contexts(
-            manifest_list,
-            manifest_entry_data_ctx_tx,
-            delete_file_idx.clone(),
-            manifest_entry_delete_ctx_tx,
-        ).await?;
+        let manifest_file_contexts = plan_context
+            .build_manifest_file_contexts(
+                manifest_list,
+                manifest_entry_data_ctx_tx,
+                delete_file_idx.clone(),
+                manifest_entry_delete_ctx_tx,
+            )
+            .await?;
 
         let mut channel_for_manifest_error = file_scan_task_tx.clone();
 
@@ -428,7 +430,7 @@ impl TableScan {
                     spawn(async move {
                         Self::process_delete_manifest_entry(manifest_entry_context, tx).await
                     })
-                        .await
+                    .await
                 },
             )
             .await;
@@ -451,7 +453,7 @@ impl TableScan {
                     spawn(async move {
                         Self::process_data_manifest_entry(manifest_entry_context, tx).await
                     })
-                        .await
+                    .await
                 },
             )
             .await;
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index 0278f2abe5..97fc78e9e5 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -25,7 +25,7 @@ use crate::error::Result;
 use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
 use crate::table::Table;
 use crate::transaction::snapshot::{
-    generate_unique_snapshot_id, DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
+    DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, generate_unique_snapshot_id,
 };
 use crate::transaction::{ActionCommit, TransactionAction};
 use crate::{Error, ErrorKind};
@@ -195,7 +195,7 @@ mod tests {
     use std::sync::Arc;
 
     use crate::spec::{
-        DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, MAIN_BRANCH,
+        DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
     };
     use crate::transaction::tests::make_v2_minimal_table;
     use crate::transaction::{Transaction, TransactionAction};
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 07d864f81b..a4f5b3904c 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -169,7 +169,7 @@ impl Transaction {
     pub fn update_statistics(&self) -> UpdateStatisticsAction {
         UpdateStatisticsAction::new()
     }
-    
+
     /// Commit transaction.
     pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
         if self.actions.is_empty() {
@@ -283,8 +283,8 @@ mod tests {
     use std::collections::HashMap;
     use std::fs::File;
     use std::io::BufReader;
-    use std::sync::atomic::{AtomicU32, Ordering};
     use std::sync::Arc;
+    use std::sync::atomic::{AtomicU32, Ordering};
 
     use crate::catalog::MockCatalog;
     use crate::io::FileIOBuilder;
diff --git a/crates/iceberg/src/transaction/remove_snapshots.rs b/crates/iceberg/src/transaction/remove_snapshots.rs
index 0e18f9ece7..cda5f8fba9 100644
--- a/crates/iceberg/src/transaction/remove_snapshots.rs
+++ b/crates/iceberg/src/transaction/remove_snapshots.rs
@@ -25,8 +25,9 @@ use itertools::Itertools;
 
 use crate::error::Result;
 use crate::spec::{
-    ancestors_of, SnapshotReference, SnapshotRetention, TableMetadata, MAIN_BRANCH,
-    MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
+    MAIN_BRANCH, MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS_DEFAULT,
+    MIN_SNAPSHOTS_TO_KEEP_DEFAULT, SnapshotReference, SnapshotRetention, TableMetadata,
+    ancestors_of,
 };
 use crate::table::Table;
 use crate::transaction::{ActionCommit, TransactionAction};
@@ -388,7 +389,7 @@ mod tests {
     use std::sync::Arc;
 
     use crate::io::FileIOBuilder;
-    use crate::spec::{TableMetadata, MAIN_BRANCH};
+    use crate::spec::{MAIN_BRANCH, TableMetadata};
     use crate::table::Table;
     use crate::transaction::{Transaction, TransactionAction};
     use crate::{TableIdent, TableRequirement};
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 15a21682bd..cc98988899 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -23,11 +23,11 @@ use uuid::Uuid;
 
 use crate::error::Result;
 use crate::spec::{
-    update_snapshot_summaries, DataContentType, DataFile, DataFileFormat, FormatVersion,
-    ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter,
-    ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
-    SnapshotSummaryCollector, Struct, StructType, Summary, MAIN_BRANCH,
-    PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
+    DataContentType, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType,
+    ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder,
+    Operation, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
+    PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention,
+    SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries,
 };
 use crate::table::Table;
 use crate::transaction::ActionCommit;
diff --git a/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs b/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs
index ecdb617bdd..70f74bed65 100644
--- a/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs
@@ -23,11 +23,11 @@ use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
 use arrow_schema::SchemaRef as ArrowSchemaRef;
 use once_cell::sync::Lazy;
 
+use crate::Result;
 use crate::arrow::schema_to_arrow_schema;
 use crate::spec::{DataFile, NestedField, PrimitiveType, Schema, SchemaRef, Struct, Type};
 use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
 use crate::writer::{IcebergWriter, IcebergWriterBuilder};
-use crate::Result;
 
 /// Builder for `MemoryPositionDeleteWriter`.
 #[derive(Clone)]
@@ -177,16 +177,16 @@ mod test {
     use tempfile::TempDir;
 
     use super::POSITION_DELETE_SCHEMA;
+    use crate::Result;
     use crate::io::FileIOBuilder;
     use crate::spec::{DataContentType, DataFileFormat, Struct};
     use crate::writer::base_writer::sort_position_delete_writer::{
         PositionDeleteInput, SortPositionDeleteWriterBuilder,
     };
-    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
-    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
     use crate::writer::file_writer::ParquetWriterBuilder;
+    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use crate::writer::{IcebergWriter, IcebergWriterBuilder};
-    use crate::Result;
 
     #[tokio::test]
     async fn test_position_delete_writer() -> Result<()> {
diff --git a/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs
index d3b3d63215..874c0368dc 100644
--- a/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs
+++ b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs
@@ -17,8 +17,8 @@
 
 //! This module contains the fanout partition writer.
 
-use std::collections::hash_map::Entry;
 use std::collections::HashMap;
+use std::collections::hash_map::Entry;
 use std::sync::Arc;
 
 use arrow_array::RecordBatch;
@@ -26,10 +26,10 @@ use arrow_row::OwnedRow;
 use arrow_schema::SchemaRef as ArrowSchemaRef;
 use itertools::Itertools;
 
-use crate::arrow::{schema_to_arrow_schema, RecordBatchPartitionSpliter};
+use crate::Result;
+use crate::arrow::{RecordBatchPartitionSpliter, schema_to_arrow_schema};
 use crate::spec::{DataFile, PartitionSpecRef, SchemaRef};
 use crate::writer::{IcebergWriter, IcebergWriterBuilder};
-use crate::Result;
 
 /// The builder for `FanoutPartitionWriter`.
 #[derive(Clone)]
@@ -154,18 +154,18 @@ mod test {
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
+    use crate::Result;
     use crate::io::FileIOBuilder;
     use crate::spec::{
         DataFileFormat, Literal, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType,
         Schema, Struct, Transform, Type, UnboundPartitionField,
     };
     use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
-    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
-    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
     use crate::writer::file_writer::ParquetWriterBuilder;
+    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use crate::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
     use crate::writer::{IcebergWriter, IcebergWriterBuilder};
-    use crate::Result;
 
     #[tokio::test]
     async fn test_fanout_partition_writer() -> Result<()> {
diff --git a/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs b/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs
index 908475d7dd..f1ae28a5e8 100644
--- a/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs
+++ b/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs
@@ -17,8 +17,8 @@
 
 //! This module contains the precompute partition writer.
 
-use std::collections::hash_map::Entry;
 use std::collections::HashMap;
+use std::collections::hash_map::Entry;
 
 use arrow_array::{RecordBatch, StructArray};
 use arrow_row::{OwnedRow, RowConverter, SortField};
@@ -150,18 +150,18 @@ mod test {
     use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
+    use crate::Result;
     use crate::io::FileIOBuilder;
     use crate::spec::{
         DataFileFormat, Literal, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType,
         Schema, Struct, Transform, Type, UnboundPartitionField,
     };
     use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
-    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
-    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
     use crate::writer::file_writer::ParquetWriterBuilder;
+    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use crate::writer::function_writer::precompute_partition_writer::PrecomputePartitionWriterBuilder;
     use crate::writer::{IcebergWriter, IcebergWriterBuilder};
-    use crate::Result;
 
     #[tokio::test]
     async fn test_precompute_partition_writer() -> Result<()> {
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index d6b9ec15f9..8c32af4633 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -204,8 +204,8 @@ pub mod function_writer;
 
 use arrow_array::RecordBatch;
 
-use crate::spec::{DataFile, SchemaRef};
 use crate::Result;
+use crate::spec::{DataFile, SchemaRef};
 
 type DefaultInput = RecordBatch;
 type DefaultOutput = Vec<DataFile>;
diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
index 7755b745dd..31a307d121 100644
--- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
+++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
@@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
 use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
-use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, TableCreation};
 use iceberg_catalog_rest::RestCatalog;
@@ -131,12 +131,12 @@
 
     // commit result again
     let tx = Transaction::new(&table);
-    
+
     // Create a new data file for the second commit
     let mut data_file_writer_2 = data_file_writer_builder.clone().build().await.unwrap();
     data_file_writer_2.write(batch.clone()).await.unwrap();
     let data_file_2 = data_file_writer_2.close().await.unwrap();
-    
+
     let append_action = tx.fast_append().add_data_files(data_file_2);
     let tx = append_action.apply(tx).unwrap();
     let table = tx.commit(&rest_catalog).await.unwrap();
diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
index 14e90c9aaa..36a1c3643f 100644
--- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
+++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
@@ -25,10 +25,10 @@ use iceberg::spec::{Literal, PrimitiveLiteral, Struct, Transform, UnboundPartitionField};
 use iceberg::table::Table;
 use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
-use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, TableCreation};
 use iceberg_catalog_rest::RestCatalog;
diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
index fcc5a3f21f..551116bf91 100644
--- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
+++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
@@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
 use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
-use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, TableCreation};
 use iceberg_catalog_rest::RestCatalog;
diff --git a/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs
index fddb2dc37e..cc006dc783 100644
--- a/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs
+++ b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs
@@ -22,10 +22,10 @@ use std::sync::Arc;
 use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
 use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
-use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, TableCreation};
 use iceberg_catalog_rest::RestCatalog;
@@ -79,33 +79,35 @@ async fn test_expire_snapshots_by_count() {
     for i in 0..10 {
         // Create a new data file writer for each iteration
         let mut data_file_writer = data_file_writer_builder.clone().build().await.unwrap();
-        
+
         // Create different data for each iteration
         let col1 = StringArray::from(vec![
-            Some(format!("foo_{}", i)), 
-            Some(format!("bar_{}", i)), 
-            None, 
-            Some(format!("baz_{}", i))
+            Some(format!("foo_{}", i)),
+            Some(format!("bar_{}", i)),
+            None,
+            Some(format!("baz_{}", i)),
+        ]);
+        let col2 = Int32Array::from(vec![Some(i), Some(i + 1), Some(i + 2), Some(i + 3)]);
+        let col3 = BooleanArray::from(vec![
+            Some(i % 2 == 0),
+            Some(i % 2 == 1),
+            None,
+            Some(i % 3 == 0),
         ]);
-        let col2 = Int32Array::from(vec![Some(i), Some(i+1), Some(i+2), Some(i+3)]);
-        let col3 = BooleanArray::from(vec![Some(i % 2 == 0), Some(i % 2 == 1), None, Some(i % 3 == 0)]);
         let batch = RecordBatch::try_new(schema.clone(), vec![
             Arc::new(col1) as ArrayRef,
             Arc::new(col2) as ArrayRef,
             Arc::new(col3) as ArrayRef,
         ])
         .unwrap();
-        
+
         // Write the unique data and get the data file
         data_file_writer.write(batch.clone()).await.unwrap();
         let data_file = data_file_writer.close().await.unwrap();
-        
+
         let tx = Transaction::new(&table);
         let append_action = tx.fast_append();
-        let tx = append_action
-            .add_data_files(data_file)
-            .apply(tx)
-            .unwrap();
+        let tx = append_action.add_data_files(data_file).apply(tx).unwrap();
 
         table = tx.commit(&rest_catalog).await.unwrap();
     }
diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs
index 0d9a0f65ab..51f4093927 100644
--- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs
+++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs
@@ -30,15 +30,15 @@ use arrow_schema::{DataType, Field, Fields};
 use futures::TryStreamExt;
 use iceberg::arrow::{DEFAULT_MAP_FIELD_NAME, UTC_TIME_ZONE};
 use iceberg::spec::{
-    ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type, LIST_FIELD_NAME,
-    MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME,
+    LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedField,
+    PrimitiveType, Schema, StructType, Type,
 };
 use iceberg::transaction::{ApplyTransactionAction, Transaction};
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
-use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, TableCreation};
 use iceberg_catalog_rest::RestCatalog;