2 changes: 1 addition & 1 deletion Makefile
@@ -33,7 +33,7 @@ cargo-machete: install-cargo-machete
cargo machete

install-taplo-cli:
cargo install [email protected]
cargo install [email protected] --force

fix-toml: install-taplo-cli
taplo fmt
14 changes: 8 additions & 6 deletions crates/iceberg/src/arrow/record_batch_partition_spliter.rs
@@ -28,7 +28,7 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use super::record_batch_projector::RecordBatchProjector;
use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type};
use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
use crate::transform::{create_transform_function, BoxedTransformFunction};
use crate::transform::{BoxedTransformFunction, create_transform_function};
use crate::{Error, ErrorKind, Result};

/// A helper function to split the record batch into multiple record batches using computed partition columns.
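The one-line change above, where `create_transform_function` and `BoxedTransformFunction` swap places, is the pattern repeated in most of the hunks below: items in `use` lists are re-sorted so that SCREAMING_SNAKE_CASE constants and CamelCase types come before snake_case functions and modules. That ordering is consistent with rustfmt's version sorting under style_edition 2024, where uppercase ASCII sorts before lowercase, though the rustfmt configuration change itself is not part of this excerpt. A minimal sketch of the two orderings, using a hypothetical `spec` module rather than the real iceberg-rust one:

    // Hypothetical stand-in module; only the `use` ordering below is the point.
    mod spec {
        pub const MAIN_BRANCH: &str = "main";
        pub struct SnapshotReference;
        pub fn ancestors_of() -> Vec<i64> {
            Vec::new()
        }
    }

    // Older rustfmt ordering:   use crate::spec::{ancestors_of, SnapshotReference, MAIN_BRANCH};
    // style_edition 2024 order: constants and types (uppercase ASCII) sort before snake_case items.
    use crate::spec::{MAIN_BRANCH, SnapshotReference, ancestors_of};

    fn main() {
        let _snapshot_ref = SnapshotReference;
        println!("{} has {} ancestors", MAIN_BRANCH, ancestors_of().len());
    }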
@@ -468,11 +468,13 @@ mod tests {
.with_spec_id(1)
.build()
.unwrap();
assert!(RecordBatchPartitionSpliter::new(
&schema_to_arrow_schema(&schema).unwrap(),
Arc::new(schema),
Arc::new(partition_spec),
)
.is_err())
assert!(
RecordBatchPartitionSpliter::new(
&schema_to_arrow_schema(&schema).unwrap(),
Arc::new(schema),
Arc::new(partition_spec),
)
.is_err()
)
}
}
18 changes: 10 additions & 8 deletions crates/iceberg/src/scan/mod.rs
@@ -393,12 +393,14 @@ impl TableScan {
// get the [`ManifestFile`]s from the [`ManifestList`], filtering out any
// whose partitions cannot match this
// scan's filter
let manifest_file_contexts = plan_context.build_manifest_file_contexts(
manifest_list,
manifest_entry_data_ctx_tx,
delete_file_idx.clone(),
manifest_entry_delete_ctx_tx,
).await?;
let manifest_file_contexts = plan_context
.build_manifest_file_contexts(
manifest_list,
manifest_entry_data_ctx_tx,
delete_file_idx.clone(),
manifest_entry_delete_ctx_tx,
)
.await?;

let mut channel_for_manifest_error = file_scan_task_tx.clone();

@@ -428,7 +430,7 @@ impl TableScan {
spawn(async move {
Self::process_delete_manifest_entry(manifest_entry_context, tx).await
})
.await
.await
},
)
.await;
@@ -451,7 +453,7 @@
spawn(async move {
Self::process_data_manifest_entry(manifest_entry_context, tx).await
})
.await
.await
},
)
.await;
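The first hunk above keeps the wrapped `build_manifest_file_contexts(...)` call but turns it into a method chain, so `.await?` moves from the closing parenthesis onto its own line; the two later hunks make the matching indentation change for the spawned tasks' `.await`. A self-contained sketch of the before/after layout — `PlanContext` and its method are hypothetical stand-ins, not the real scan planner API:

    // Hypothetical planner type; only the rustfmt layout of the call matters here.
    struct PlanContext;

    impl PlanContext {
        async fn build_manifest_file_contexts(
            &self,
            manifest_list: &str,
            delete_file_idx: usize,
        ) -> Result<usize, String> {
            Ok(manifest_list.len() + delete_file_idx)
        }
    }

    async fn plan(ctx: &PlanContext) -> Result<usize, String> {
        // Older layout: the wrapped call owns the trailing `).await?`:
        //     let n = ctx.build_manifest_file_contexts(
        //         "manifest-list.avro",
        //         0,
        //     ).await?;
        //
        // Newer layout: the receiver starts the chain, the call is one chain
        // element, and `.await?` gets its own line at the chain indent.
        let n = ctx
            .build_manifest_file_contexts("manifest-list.avro", 0)
            .await?;
        Ok(n)
    }

    fn main() {
        // The future is never driven here; this sketch only demonstrates formatting.
        let _ = plan;
    }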
4 changes: 2 additions & 2 deletions crates/iceberg/src/transaction/append.rs
@@ -25,7 +25,7 @@ use crate::error::Result;
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
use crate::table::Table;
use crate::transaction::snapshot::{
generate_unique_snapshot_id, DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, generate_unique_snapshot_id,
};
use crate::transaction::{ActionCommit, TransactionAction};
use crate::{Error, ErrorKind};
@@ -195,7 +195,7 @@ mod tests {
use std::sync::Arc;

use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, MAIN_BRANCH,
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
};
use crate::transaction::tests::make_v2_minimal_table;
use crate::transaction::{Transaction, TransactionAction};
4 changes: 2 additions & 2 deletions crates/iceberg/src/transaction/mod.rs
@@ -169,7 +169,7 @@ impl Transaction {
pub fn update_statistics(&self) -> UpdateStatisticsAction {
UpdateStatisticsAction::new()
}

/// Commit transaction.
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
if self.actions.is_empty() {
@@ -283,8 +283,8 @@ mod tests {
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};

use crate::catalog::MockCatalog;
use crate::io::FileIOBuilder;
7 changes: 4 additions & 3 deletions crates/iceberg/src/transaction/remove_snapshots.rs
@@ -25,8 +25,9 @@ use itertools::Itertools;

use crate::error::Result;
use crate::spec::{
ancestors_of, SnapshotReference, SnapshotRetention, TableMetadata, MAIN_BRANCH,
MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP_DEFAULT,
MAIN_BRANCH, MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS_DEFAULT,
MIN_SNAPSHOTS_TO_KEEP_DEFAULT, SnapshotReference, SnapshotRetention, TableMetadata,
ancestors_of,
};
use crate::table::Table;
use crate::transaction::{ActionCommit, TransactionAction};
@@ -388,7 +389,7 @@ mod tests {
use std::sync::Arc;

use crate::io::FileIOBuilder;
use crate::spec::{TableMetadata, MAIN_BRANCH};
use crate::spec::{MAIN_BRANCH, TableMetadata};
use crate::table::Table;
use crate::transaction::{Transaction, TransactionAction};
use crate::{TableIdent, TableRequirement};
10 changes: 5 additions & 5 deletions crates/iceberg/src/transaction/snapshot.rs
@@ -23,11 +23,11 @@ use uuid::Uuid;

use crate::error::Result;
use crate::spec::{
update_snapshot_summaries, DataContentType, DataFile, DataFileFormat, FormatVersion,
ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter,
ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
SnapshotSummaryCollector, Struct, StructType, Summary, MAIN_BRANCH,
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
DataContentType, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType,
ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder,
Operation, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, Snapshot, SnapshotReference, SnapshotRetention,
SnapshotSummaryCollector, Struct, StructType, Summary, update_snapshot_summaries,
};
use crate::table::Table;
use crate::transaction::ActionCommit;
@@ -23,11 +23,11 @@ use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use once_cell::sync::Lazy;

use crate::Result;
use crate::arrow::schema_to_arrow_schema;
use crate::spec::{DataFile, NestedField, PrimitiveType, Schema, SchemaRef, Struct, Type};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

/// Builder for `MemoryPositionDeleteWriter`.
#[derive(Clone)]
@@ -177,16 +177,16 @@ mod test {
use tempfile::TempDir;

use super::POSITION_DELETE_SCHEMA;
use crate::Result;
use crate::io::FileIOBuilder;
use crate::spec::{DataContentType, DataFileFormat, Struct};
use crate::writer::base_writer::sort_position_delete_writer::{
PositionDeleteInput, SortPositionDeleteWriterBuilder,
};
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::ParquetWriterBuilder;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

#[tokio::test]
async fn test_position_delete_writer() -> Result<()> {
@@ -17,19 +17,19 @@

//! This module contains the fanout partition writer.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_row::OwnedRow;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use itertools::Itertools;

use crate::arrow::{schema_to_arrow_schema, RecordBatchPartitionSpliter};
use crate::Result;
use crate::arrow::{RecordBatchPartitionSpliter, schema_to_arrow_schema};
use crate::spec::{DataFile, PartitionSpecRef, SchemaRef};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

/// The builder for `FanoutPartitionWriter`.
#[derive(Clone)]
@@ -154,18 +154,18 @@ mod test {
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;

use crate::Result;
use crate::io::FileIOBuilder;
use crate::spec::{
DataFileFormat, Literal, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType,
Schema, Struct, Transform, Type, UnboundPartitionField,
};
use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::ParquetWriterBuilder;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

#[tokio::test]
async fn test_fanout_partition_writer() -> Result<()> {
@@ -17,8 +17,8 @@

//! This module contains the precompute partition writer.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::hash_map::Entry;

use arrow_array::{RecordBatch, StructArray};
use arrow_row::{OwnedRow, RowConverter, SortField};
@@ -150,18 +150,18 @@ mod test {
use parquet::file::properties::WriterProperties;
use tempfile::TempDir;

use crate::Result;
use crate::io::FileIOBuilder;
use crate::spec::{
DataFileFormat, Literal, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType,
Schema, Struct, Transform, Type, UnboundPartitionField,
};
use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::ParquetWriterBuilder;
use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use crate::writer::function_writer::precompute_partition_writer::PrecomputePartitionWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::Result;

#[tokio::test]
async fn test_precompute_partition_writer() -> Result<()> {
2 changes: 1 addition & 1 deletion crates/iceberg/src/writer/mod.rs
@@ -204,8 +204,8 @@ pub mod function_writer;

use arrow_array::RecordBatch;

use crate::spec::{DataFile, SchemaRef};
use crate::Result;
use crate::spec::{DataFile, SchemaRef};

type DefaultInput = RecordBatch;
type DefaultOutput = Vec<DataFile>;
@@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, TableCreation};
use iceberg_catalog_rest::RestCatalog;
@@ -131,12 +131,12 @@ async fn test_append_data_file() {

// commit result again
let tx = Transaction::new(&table);

// Create a new data file for the second commit
let mut data_file_writer_2 = data_file_writer_builder.clone().build().await.unwrap();
data_file_writer_2.write(batch.clone()).await.unwrap();
let data_file_2 = data_file_writer_2.close().await.unwrap();

let append_action = tx.fast_append().add_data_files(data_file_2);
let tx = append_action.apply(tx).unwrap();
let table = tx.commit(&rest_catalog).await.unwrap();
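The lines added above build a second writer from the cloned builder and close it to produce `data_file_2`, so the second `fast_append` commits files that were written for that commit; the removed line is not visible in this excerpt, but the new comment suggests the first commit's files were previously being re-added. A minimal sketch of the clone-the-builder-per-commit pattern, using hypothetical stub types rather than the iceberg-rust writer API:

    // Hypothetical stand-ins; the point is one freshly built writer per commit.
    #[derive(Clone)]
    struct DataFileWriterBuilderStub {
        location: String,
    }

    struct DataFileWriterStub {
        location: String,
        rows: usize,
    }

    impl DataFileWriterBuilderStub {
        fn build(&self) -> DataFileWriterStub {
            DataFileWriterStub {
                location: self.location.clone(),
                rows: 0,
            }
        }
    }

    impl DataFileWriterStub {
        fn write(&mut self, batch_len: usize) {
            self.rows += batch_len;
        }
        // Closing yields the "data file" describing exactly this writer's output.
        fn close(self) -> (String, usize) {
            (self.location, self.rows)
        }
    }

    fn main() {
        let builder = DataFileWriterBuilderStub {
            location: "mem://table/data".to_string(),
        };
        for commit in 0..2 {
            // A fresh writer per commit, so each commit appends its own files
            // instead of re-adding the files from an earlier commit.
            let mut writer = builder.clone().build();
            writer.write(3);
            let data_file = writer.close();
            println!("commit {commit}: appending {data_file:?}");
        }
    }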
@@ -25,10 +25,10 @@ use iceberg::spec::{Literal, PrimitiveLiteral, Struct, Transform, UnboundPartitionField};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, TableCreation};
use iceberg_catalog_rest::RestCatalog;
@@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, TableCreation};
use iceberg_catalog_rest::RestCatalog;
@@ -22,10 +22,10 @@ use std::sync::Arc
use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, TableCreation};
use iceberg_catalog_rest::RestCatalog;
@@ -79,33 +79,35 @@ async fn test_expire_snapshots_by_count() {
for i in 0..10 {
// Create a new data file writer for each iteration
let mut data_file_writer = data_file_writer_builder.clone().build().await.unwrap();

// Create different data for each iteration
let col1 = StringArray::from(vec![
Some(format!("foo_{}", i)),
Some(format!("bar_{}", i)),
None,
Some(format!("baz_{}", i))
Some(format!("foo_{}", i)),
Some(format!("bar_{}", i)),
None,
Some(format!("baz_{}", i)),
]);
let col2 = Int32Array::from(vec![Some(i), Some(i + 1), Some(i + 2), Some(i + 3)]);
let col3 = BooleanArray::from(vec![
Some(i % 2 == 0),
Some(i % 2 == 1),
None,
Some(i % 3 == 0),
]);
let col2 = Int32Array::from(vec![Some(i), Some(i+1), Some(i+2), Some(i+3)]);
let col3 = BooleanArray::from(vec![Some(i % 2 == 0), Some(i % 2 == 1), None, Some(i % 3 == 0)]);
let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(col1) as ArrayRef,
Arc::new(col2) as ArrayRef,
Arc::new(col3) as ArrayRef,
])
.unwrap();

// Write the unique data and get the data file
data_file_writer.write(batch.clone()).await.unwrap();
let data_file = data_file_writer.close().await.unwrap();

let tx = Transaction::new(&table);
let append_action = tx.fast_append();
let tx = append_action
.add_data_files(data_file)
.apply(tx)
.unwrap();
let tx = append_action.add_data_files(data_file).apply(tx).unwrap();
table = tx.commit(&rest_catalog).await.unwrap();
}
