Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ impl ObjectCache {
}
}

/// Return whether cache is enabled.
pub(crate) fn enabled(&self) -> bool {
!self.cache_disabled
}

/// Retrieves an Arc [`Manifest`] from the cache
/// or retrieves one from FileIO and parses it if not present
pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
Expand Down Expand Up @@ -118,6 +123,20 @@ impl ObjectCache {
}
}

/// Set a [`ManifestList`] to the cache, if duplicate key already exist, new value will be ignored.
/// No-op if cache disabled.
pub(crate) async fn set_manifest_list(
&self,
cache_key: CachedObjectKey,
manifest_list: Arc<ManifestList>,
) {
if self.cache_disabled {
return;
}
let cached_item = CachedItem::ManifestList(manifest_list);
self.cache.insert(cache_key, cached_item).await;
}

/// Retrieves an Arc [`ManifestList`] from the cache
/// or retrieves one from FileIO and parses it if not present
pub(crate) async fn get_manifest_list(
Expand Down
46 changes: 46 additions & 0 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use apache_avro::types::Value;
use apache_avro::{Reader, Writer, from_value};
Expand All @@ -30,6 +31,7 @@ use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEM
use self::_serde::{ManifestFileV1, ManifestFileV2};
use super::{FormatVersion, Manifest};
use crate::error::Result;
use crate::io::object_cache::{CachedObjectKey, ObjectCache};
use crate::io::{FileIO, OutputFile};
use crate::{Error, ErrorKind};

Expand Down Expand Up @@ -90,6 +92,9 @@ pub struct ManifestListWriter {
avro_writer: Writer<'static, Vec<u8>>,
sequence_number: i64,
snapshot_id: i64,
/// Optional write-through cache, should be assigned before all writes.
buffered_manifest_file_entries: Vec<ManifestFile>,
cache: Option<(CachedObjectKey, Arc<ObjectCache>)>,
}

impl std::fmt::Debug for ManifestListWriter {
Expand Down Expand Up @@ -168,11 +173,43 @@ impl ManifestListWriter {
avro_writer,
sequence_number,
snapshot_id,
buffered_manifest_file_entries: Vec::new(),
cache: None,
}
}

/// Set write-through cache.
///
/// Notice:
/// - Cache, if enabled, should be assigned before all write operations.
/// - Cache cannot be repeatedly assigned, otherwise return `Unexpected` error status.
pub(crate) fn set_cache(
&mut self,
key: CachedObjectKey,
cache: Arc<ObjectCache>,
) -> Result<()> {
if self.cache.is_some() {
return Err(Error::new(
ErrorKind::Unexpected,
"Cannot repeatedly assign object cache to manifest list writer.".to_string(),
));
}
self.cache = Some((key, cache));
Ok(())
}

/// Append manifests to be written.
pub fn add_manifests(&mut self, manifests: impl Iterator<Item = ManifestFile>) -> Result<()> {
let manifests: Vec<ManifestFile> = manifests.collect();

// Buffer manifest file entries if write-through cache enabled.
if self.cache.is_some() {
for cur_manifest_file in &manifests {
self.buffered_manifest_file_entries
.push(cur_manifest_file.clone());
}
}

match self.format_version {
FormatVersion::V1 => {
for manifest in manifests {
Expand Down Expand Up @@ -220,6 +257,15 @@ impl ManifestListWriter {
let mut writer = self.output_file.writer().await?;
writer.write(Bytes::from(data)).await?;
writer.close().await?;

// Place manifest list into object cache if enabled.
if let Some((key, cache)) = self.cache {
let manifest_list = Arc::new(ManifestList {
entries: self.buffered_manifest_file_entries,
});
cache.set_manifest_list(key, manifest_list).await;
}

Ok(())
}
}
Expand Down
19 changes: 19 additions & 0 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,5 +334,24 @@ mod tests {
manifest.entries()[0].snapshot_id().unwrap()
);
assert_eq!(data_file, *manifest.entries()[0].data_file());

// Check object cache status.
// - First we delete the manifest list file
// - Load from cache and compare with the expected, to make sure it's indeed cache instead of being loaded ad-hoc.
let manifest_list_filepath = new_snapshot.manifest_list().to_string();
table
.file_io()
.delete(manifest_list_filepath)
.await
.unwrap();

let snapshot_ref = Arc::new(new_snapshot.clone());
let table_metadata_ref = &table.metadata_ref();
let loaded_manifest_list = table
.object_cache()
.get_manifest_list(&snapshot_ref, table_metadata_ref)
.await
.unwrap();
assert_eq!(loaded_manifest_list.entries().len(), 1);
}
}
15 changes: 15 additions & 0 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::ops::RangeFrom;
use uuid::Uuid;

use crate::error::Result;
use crate::io::object_cache::CachedObjectKey;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation,
Expand Down Expand Up @@ -409,6 +410,20 @@ impl<'a> SnapshotProducer<'a> {
next_seq_num,
),
};

let object_cache = self.table.object_cache();
if object_cache.enabled() {
let format_version = self.table.metadata().format_version();
let schema_id = self.table.metadata().current_schema_id();
let cache_key = CachedObjectKey::ManifestList((
manifest_list_path.clone(),
format_version,
schema_id,
));
let cache = self.table.object_cache();
manifest_list_writer.set_cache(cache_key, cache)?;
}

manifest_list_writer.add_manifests(new_manifests.into_iter())?;
manifest_list_writer.close().await?;

Expand Down
Loading