Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ arrow-array = "56.2"
arrow-buffer = "56.2"
arrow-cast = "56.2"
arrow-ord = "56.2"
arrow-row = "56.2"
arrow-schema = "56.2"
arrow-select = "56.2"
arrow-string = "56.2"
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl FastAppendAction {
self
}

/// Set target branch for the snapshot.
pub fn set_target_branch(mut self, target_branch: String) -> Self {
self.target_branch = Some(target_branch);
self
Expand Down Expand Up @@ -106,6 +107,7 @@ impl FastAppendAction {
self
}

/// Set snapshot id for the snapshot.
pub fn set_snapshot_id(mut self, snapshot_id: i64) -> Self {
self.snapshot_id = Some(snapshot_id);
self
Expand Down
4 changes: 3 additions & 1 deletion crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ mod action;

pub use action::*;
mod append;

pub use append::FastAppendAction;
mod manifest_filter;

pub use manifest_filter::*;
Expand All @@ -77,7 +79,7 @@ use rewrite_files::RewriteFilesAction;
use crate::error::Result;
use crate::spec::TableProperties;
use crate::table::Table;
use crate::transaction::append::{FastAppendAction, MergeAppendAction};
use crate::transaction::append::MergeAppendAction;
use crate::transaction::overwrite_files::OverwriteFilesAction;
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::transaction::update_location::UpdateLocationAction;
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,14 +708,17 @@ impl<'a> SnapshotProducer<'a> {
Ok(ActionCommit::new(updates, requirements))
}

/// Set the new data file sequence number for this snapshot
pub fn set_new_data_file_sequence_number(&mut self, new_data_file_sequence_number: i64) {
self.new_data_file_sequence_number = Some(new_data_file_sequence_number);
}

/// Set the target branch for this snapshot
pub fn set_target_branch(&mut self, target_branch: String) {
self.target_branch = target_branch;
}

/// Get the target branch for this snapshot
pub fn target_branch(&self) -> &str {
&self.target_branch
}
Expand Down
8 changes: 6 additions & 2 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,15 @@ where
}

fn current_row_num(&self) -> usize {
self.inner.as_ref().unwrap().current_row_num()
self.inner
.as_ref()
.map_or(0, |inner| inner.current_row_num())
}

fn current_written_size(&self) -> usize {
self.inner.as_ref().unwrap().current_written_size()
self.inner
.as_ref()
.map_or(0, |inner| inner.current_written_size())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ static DELETE_FILE_POS: Lazy<NestedFieldRef> = Lazy::new(|| {
Type::Primitive(PrimitiveType::Long),
))
});
static POSITION_DELETE_SCHEMA: Lazy<Schema> = Lazy::new(|| {
/// Iceberg schema used for position delete files (file_path, pos).
pub static POSITION_DELETE_SCHEMA: Lazy<Schema> = Lazy::new(|| {
Schema::builder()
.with_fields(vec![DELETE_FILE_PATH.clone(), DELETE_FILE_POS.clone()])
.build()
Expand Down Expand Up @@ -199,11 +200,15 @@ where
}

fn current_row_num(&self) -> usize {
self.inner.as_ref().unwrap().current_row_num()
self.inner
.as_ref()
.map_or(0, |inner| inner.current_row_num())
}

fn current_written_size(&self) -> usize {
self.inner.as_ref().unwrap().current_written_size()
self.inner
.as_ref()
.map_or(0, |inner| inner.current_written_size())
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/writer/file_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::CurrentFileStatus;
use crate::Result;
use crate::spec::DataFileBuilder;

mod parquet_writer;
pub mod parquet_writer;
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};

use crate::io::OutputFile;
Expand Down
8 changes: 6 additions & 2 deletions crates/iceberg/src/writer/file_writer/rolling_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,15 @@ impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> CurrentFi
}

fn current_row_num(&self) -> usize {
self.inner.as_ref().unwrap().current_row_num()
self.inner
.as_ref()
.map_or(0, |inner| inner.current_row_num())
}

fn current_written_size(&self) -> usize {
self.inner.as_ref().unwrap().current_written_size()
self.inner
.as_ref()
.map_or(0, |inner| inner.current_written_size())
}
}

Expand Down
Loading
Loading