Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/iceberg/src/spec/manifest/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ impl ManifestEntry {
pub fn data_file(&self) -> &DataFile {
&self.data_file
}

/// File sequence number indicating when the file was added. Inherited when null and status is 1 (added).
#[inline]
pub fn file_sequence_number(&self) -> Option<i64> {
self.file_sequence_number
}
}

/// Used to track additions and deletions in ManifestEntry.
Expand Down
148 changes: 147 additions & 1 deletion crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,22 @@ use crate::error::Result;
use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
use crate::table::Table;
use crate::transaction::snapshot::{
generate_unique_snapshot_id, DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
generate_unique_snapshot_id, DefaultManifestProcess, MergeManifestProcess,
SnapshotProduceOperation, SnapshotProducer,
};
use crate::transaction::{ActionCommit, TransactionAction};
use crate::{Error, ErrorKind};

/// Target size of manifest file when merging manifests.
pub const MANIFEST_TARGET_SIZE_BYTES: &str = "commit.manifest.target-size-bytes";
const MANIFEST_TARGET_SIZE_BYTES_DEFAULT: u32 = 8 * 1024 * 1024; // 8 MB
/// Minimum number of manifests to merge.
pub const MANIFEST_MIN_MERGE_COUNT: &str = "commit.manifest.min-count-to-merge";
const MANIFEST_MIN_MERGE_COUNT_DEFAULT: u32 = 100;
/// Whether allow to merge manifests.
pub const MANIFEST_MERGE_ENABLED: &str = "commit.manifest-merge.enabled";
const MANIFEST_MERGE_ENABLED_DEFAULT: bool = false;

/// FastAppendAction is a transaction action for fast append data files to the table.
pub struct FastAppendAction {
check_duplicate: bool,
Expand Down Expand Up @@ -189,6 +200,141 @@ impl SnapshotProduceOperation for FastAppendOperation {
}
}

/// MergeAppendAction is a transaction action similar to fast append except that it will merge manifests
/// based on the target size.
pub struct MergeAppendAction {
// snapshot_produce_action: SnapshotProducer<'_>,
target_size_bytes: u32,
min_count_to_merge: u32,
merge_enabled: bool,

check_duplicate: bool,
// below are properties used to create SnapshotProducer when commit
commit_uuid: Option<Uuid>,
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
added_delete_files: Vec<DataFile>,
snapshot_id: Option<i64>,
}

impl MergeAppendAction {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new() -> Self {
Self {
target_size_bytes: MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
min_count_to_merge: MANIFEST_MIN_MERGE_COUNT_DEFAULT,
merge_enabled: MANIFEST_MERGE_ENABLED_DEFAULT,
check_duplicate: true,
commit_uuid: None,
key_metadata: None,
snapshot_properties: HashMap::default(),
added_data_files: vec![],
added_delete_files: vec![],
snapshot_id: None,
}
}

pub fn set_target_size_bytes(mut self, v: u32) -> Self {
self.target_size_bytes = v;
self
}

pub fn set_min_count_to_merge(mut self, v: u32) -> Self {
self.min_count_to_merge = v;
self
}

pub fn set_merge_enabled(mut self, v: bool) -> Self {
self.merge_enabled = v;
self
}

pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap<String, String>) -> Self {
let target_size_bytes: u32 = snapshot_properties
.get(MANIFEST_TARGET_SIZE_BYTES)
.and_then(|s| s.parse().ok())
.unwrap_or(MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
let min_count_to_merge: u32 = snapshot_properties
.get(MANIFEST_MIN_MERGE_COUNT)
.and_then(|s| s.parse().ok())
.unwrap_or(MANIFEST_MIN_MERGE_COUNT_DEFAULT);
let merge_enabled = snapshot_properties
.get(MANIFEST_MERGE_ENABLED)
.and_then(|s| s.parse().ok())
.unwrap_or(MANIFEST_MERGE_ENABLED_DEFAULT);

self.snapshot_properties = snapshot_properties;
self.target_size_bytes = target_size_bytes;
self.min_count_to_merge = min_count_to_merge;
self.merge_enabled = merge_enabled;

self
}

/// Add data files to the snapshot.
pub fn add_data_files(mut self, data_files: impl IntoIterator<Item = DataFile>) -> Self {
self.added_data_files.extend(data_files);
self
}
}

#[async_trait]
impl TransactionAction for MergeAppendAction {
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
let snapshot_id = if let Some(snapshot_id) = self.snapshot_id {
if table
.metadata()
.snapshots()
.any(|s| s.snapshot_id() == snapshot_id)
{
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Snapshot id {} already exists", snapshot_id),
));
}
snapshot_id
} else {
generate_unique_snapshot_id(table)
};

let snapshot_producer = SnapshotProducer::new(
table,
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
self.key_metadata.clone(),
self.snapshot_properties.clone(),
self.added_data_files.clone(),
self.added_delete_files.clone(),
snapshot_id,
);

// validate added files
snapshot_producer.validate_added_data_files(&self.added_data_files)?;
snapshot_producer.validate_added_data_files(&self.added_delete_files)?;

// Checks duplicate files
if self.check_duplicate {
snapshot_producer
.validate_duplicate_files(&self.added_data_files)
.await?;

snapshot_producer
.validate_duplicate_files(&self.added_delete_files)
.await?;
}

if self.merge_enabled {
let process =
MergeManifestProcess::new(self.target_size_bytes, self.min_count_to_merge);
snapshot_producer.commit(FastAppendOperation, process).await
} else {
snapshot_producer
.commit(FastAppendOperation, DefaultManifestProcess)
.await
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down
15 changes: 10 additions & 5 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use std::collections::HashMap;

pub use action::*;
mod append;
pub mod remove_snapshots;
mod remove_snapshots;
mod snapshot;
mod sort_order;
mod update_location;
Expand All @@ -67,7 +67,9 @@ mod upgrade_format_version;
use std::sync::Arc;
use std::time::Duration;

pub use append::{MANIFEST_MERGE_ENABLED, MANIFEST_MIN_MERGE_COUNT, MANIFEST_TARGET_SIZE_BYTES};
use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
use remove_snapshots::RemoveSnapshotAction;

use crate::error::Result;
use crate::spec::{
Expand All @@ -77,9 +79,7 @@ use crate::spec::{
PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
};
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
use crate::transaction::remove_snapshots::RemoveSnapshotAction;
use crate::transaction::append::{FastAppendAction, MergeAppendAction};
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::transaction::update_location::UpdateLocationAction;
use crate::transaction::update_properties::UpdatePropertiesAction;
Expand Down Expand Up @@ -150,6 +150,11 @@ impl Transaction {
FastAppendAction::new()
}

/// Creates a merge append action.
pub fn merge_append(&self) -> MergeAppendAction {
MergeAppendAction::new()
}

/// Creates replace sort order action.
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
ReplaceSortOrderAction::new()
Expand All @@ -169,7 +174,7 @@ impl Transaction {
pub fn update_statistics(&self) -> UpdateStatisticsAction {
UpdateStatisticsAction::new()
}

/// Commit transaction.
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
if self.actions.is_empty() {
Expand Down
Loading
Loading