diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index a23ff36b32..8572994cb7 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -83,6 +83,11 @@ impl ObjectCache { } } + /// Returns `true` when the cache is enabled, i.e. `cache_disabled` is not set. + pub(crate) fn enabled(&self) -> bool { + !self.cache_disabled + } + /// Retrieves an Arc [`Manifest`] from the cache /// or retrieves one from FileIO and parses it if not present pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result> { @@ -118,6 +123,20 @@ impl ObjectCache { } } + /// Inserts a [`ManifestList`] into the cache under `cache_key`. NOTE(review): the intent is that a duplicate key keeps the existing value, but that depends on `self.cache.insert` semantics, which are not visible here - confirm. + /// No-op when the cache is disabled. + pub(crate) async fn set_manifest_list( + &self, + cache_key: CachedObjectKey, + manifest_list: Arc, + ) { + if self.cache_disabled { + return; + } + let cached_item = CachedItem::ManifestList(manifest_list); + self.cache.insert(cache_key, cached_item).await; + } + /// Retrieves an Arc [`ManifestList`] from the cache /// or retrieves one from FileIO and parses it if not present pub(crate) async fn get_manifest_list( diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 43808bb249..3778b3ce78 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::str::FromStr; +use std::sync::Arc; use apache_avro::types::Value; use apache_avro::{Reader, Writer, from_value}; @@ -30,6 +31,7 @@ use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEM use self::_serde::{ManifestFileV1, ManifestFileV2}; use super::{FormatVersion, Manifest}; use crate::error::Result; +use crate::io::object_cache::{CachedObjectKey, ObjectCache}; use crate::io::{FileIO, OutputFile}; use crate::{Error, ErrorKind}; @@ -90,6 +92,9 @@ pub struct ManifestListWriter { avro_writer: Writer<'static, Vec>, 
sequence_number: i64, snapshot_id: i64, + /// Manifest file entries buffered for the optional write-through cache, which should be assigned before any write. + buffered_manifest_file_entries: Vec, + cache: Option<(CachedObjectKey, Arc)>, } impl std::fmt::Debug for ManifestListWriter { @@ -168,11 +173,43 @@ impl ManifestListWriter { avro_writer, sequence_number, snapshot_id, + buffered_manifest_file_entries: Vec::new(), + cache: None, } } + /// Sets the write-through cache together with the key the finished manifest list is stored under. + /// + /// Notes: + /// - The cache should be assigned before any write, because entries are only buffered once it is set. + /// - The cache can be assigned only once; a repeated call returns an `Unexpected` error. + pub(crate) fn set_cache( + &mut self, + key: CachedObjectKey, + cache: Arc, + ) -> Result<()> { + if self.cache.is_some() { + return Err(Error::new( + ErrorKind::Unexpected, + "Cannot repeatedly assign object cache to manifest list writer.".to_string(), + )); + } + self.cache = Some((key, cache)); + Ok(()) + } + /// Append manifests to be written. pub fn add_manifests(&mut self, manifests: impl Iterator) -> Result<()> { + let manifests: Vec = manifests.collect(); + + // Keep a copy of each manifest file entry when a write-through cache is set, so the full list can be cached after the file is written. + if self.cache.is_some() { + for cur_manifest_file in &manifests { + self.buffered_manifest_file_entries + .push(cur_manifest_file.clone()); + } + } + match self.format_version { FormatVersion::V1 => { for manifest in manifests { @@ -220,6 +257,15 @@ impl ManifestListWriter { let mut writer = self.output_file.writer().await?; writer.write(Bytes::from(data)).await?; writer.close().await?; + + // Publish the buffered manifest list to the object cache, if one was assigned. 
+ if let Some((key, cache)) = self.cache { + let manifest_list = Arc::new(ManifestList { + entries: self.buffered_manifest_file_entries, + }); + cache.set_manifest_list(key, manifest_list).await; + } + Ok(()) } } diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index f248543df2..3ac630ab45 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -334,5 +334,24 @@ mod tests { manifest.entries()[0].snapshot_id().unwrap() ); assert_eq!(data_file, *manifest.entries()[0].data_file()); + + // Verify that the manifest list was written through to the object cache. + // - First, delete the manifest list file. + // - Then load the manifest list and check its entry count; with the file gone, a successful load is expected to come from the cache rather than from storage. + let manifest_list_filepath = new_snapshot.manifest_list().to_string(); + table + .file_io() + .delete(manifest_list_filepath) + .await + .unwrap(); + + let snapshot_ref = Arc::new(new_snapshot.clone()); + let table_metadata_ref = &table.metadata_ref(); + let loaded_manifest_list = table + .object_cache() + .get_manifest_list(&snapshot_ref, table_metadata_ref) + .await + .unwrap(); + assert_eq!(loaded_manifest_list.entries().len(), 1); } } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 48dc2b5b90..ea19743e10 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -22,6 +22,7 @@ use std::ops::RangeFrom; use uuid::Uuid; use crate::error::Result; +use crate::io::object_cache::CachedObjectKey; use crate::spec::{ DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, @@ -409,6 +410,20 @@ impl<'a> SnapshotProducer<'a> { next_seq_num, ), }; + + let object_cache = self.table.object_cache(); + if object_cache.enabled() { + let format_version = self.table.metadata().format_version(); + 
let schema_id = self.table.metadata().current_schema_id(); + let cache_key = CachedObjectKey::ManifestList(( + manifest_list_path.clone(), + format_version, + schema_id, + )); + let cache = self.table.object_cache(); + manifest_list_writer.set_cache(cache_key, cache)?; + } + manifest_list_writer.add_manifests(new_manifests.into_iter())?; manifest_list_writer.close().await?;