Skip to content

Commit 17a85d0

Browse files
committed
feat: Add write-through cache for manifest list
1 parent dc34928 commit 17a85d0

File tree

4 files changed

+99
-0
lines changed

4 files changed

+99
-0
lines changed

crates/iceberg/src/io/object_cache.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ impl ObjectCache {
8383
}
8484
}
8585

86+
/// Return whether cache is enabled.
87+
pub(crate) fn enabled(&self) -> bool {
88+
!self.cache_disabled
89+
}
90+
8691
/// Retrieves an Arc [`Manifest`] from the cache
8792
/// or retrieves one from FileIO and parses it if not present
8893
pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
@@ -118,6 +123,20 @@ impl ObjectCache {
118123
}
119124
}
120125

126+
/// Set a [`ManifestList`] to the cache, if duplicate key already exist, new value will be ignored.
127+
/// No-op if cache disabled.
128+
pub(crate) async fn set_manifest_list(
129+
&self,
130+
cache_key: CachedObjectKey,
131+
manifest_list: Arc<ManifestList>,
132+
) {
133+
if self.cache_disabled {
134+
return;
135+
}
136+
let cached_item = CachedItem::ManifestList(manifest_list);
137+
self.cache.insert(cache_key, cached_item).await;
138+
}
139+
121140
/// Retrieves an Arc [`ManifestList`] from the cache
122141
/// or retrieves one from FileIO and parses it if not present
123142
pub(crate) async fn get_manifest_list(

crates/iceberg/src/spec/manifest_list.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
2020
use std::collections::HashMap;
2121
use std::str::FromStr;
22+
use std::sync::Arc;
2223

2324
use apache_avro::types::Value;
2425
use apache_avro::{Reader, Writer, from_value};
@@ -30,6 +31,7 @@ use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEM
3031
use self::_serde::{ManifestFileV1, ManifestFileV2};
3132
use super::{FormatVersion, Manifest};
3233
use crate::error::Result;
34+
use crate::io::object_cache::{CachedObjectKey, ObjectCache};
3335
use crate::io::{FileIO, OutputFile};
3436
use crate::{Error, ErrorKind};
3537

@@ -90,6 +92,9 @@ pub struct ManifestListWriter {
9092
avro_writer: Writer<'static, Vec<u8>>,
9193
sequence_number: i64,
9294
snapshot_id: i64,
95+
/// Manifest file entries buffered for the optional write-through cache (`cache`); only filled when a cache has been assigned, which should happen before any writes.
96+
buffered_manifest_file_entries: Vec<ManifestFile>,
97+
cache: Option<(CachedObjectKey, Arc<ObjectCache>)>,
9398
}
9499

95100
impl std::fmt::Debug for ManifestListWriter {
@@ -168,11 +173,43 @@ impl ManifestListWriter {
168173
avro_writer,
169174
sequence_number,
170175
snapshot_id,
176+
buffered_manifest_file_entries: Vec::new(),
177+
cache: None,
171178
}
172179
}
173180

181+
/// Set write-through cache.
182+
///
183+
/// Notice:
184+
/// - Cache, if enabled, should be assigned before all write operations.
185+
/// - Cache cannot be repeatedly assigned, otherwise return `Unexpected` error status.
186+
pub(crate) fn set_cache(
187+
&mut self,
188+
key: CachedObjectKey,
189+
cache: Arc<ObjectCache>,
190+
) -> Result<()> {
191+
if self.cache.is_some() {
192+
return Err(Error::new(
193+
ErrorKind::Unexpected,
194+
format!("Cannot repeatedly assign object cache to manifest list writer."),
195+
));
196+
}
197+
self.cache = Some((key, cache));
198+
Ok(())
199+
}
200+
174201
/// Append manifests to be written.
175202
pub fn add_manifests(&mut self, manifests: impl Iterator<Item = ManifestFile>) -> Result<()> {
203+
let manifests: Vec<ManifestFile> = manifests.collect();
204+
205+
// Buffer manifest file entries if write-through cache enabled.
206+
if self.cache.is_some() {
207+
for cur_manifest_file in &manifests {
208+
self.buffered_manifest_file_entries
209+
.push(cur_manifest_file.clone());
210+
}
211+
}
212+
176213
match self.format_version {
177214
FormatVersion::V1 => {
178215
for manifest in manifests {
@@ -220,6 +257,15 @@ impl ManifestListWriter {
220257
let mut writer = self.output_file.writer().await?;
221258
writer.write(Bytes::from(data)).await?;
222259
writer.close().await?;
260+
261+
// Place manifest list into object cache if enabled.
262+
if let Some((key, cache)) = self.cache {
263+
let manifest_list = Arc::new(ManifestList {
264+
entries: self.buffered_manifest_file_entries,
265+
});
266+
cache.set_manifest_list(key, manifest_list).await;
267+
}
268+
223269
Ok(())
224270
}
225271
}

crates/iceberg/src/transaction/append.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,5 +334,24 @@ mod tests {
334334
manifest.entries()[0].snapshot_id().unwrap()
335335
);
336336
assert_eq!(data_file, *manifest.entries()[0].data_file());
337+
338+
// Check object cache status.
339+
// - First we delete the manifest list file
340+
// - Then load it through the object cache and compare with the expected value, to make sure it is served from the cache rather than re-read from storage.
341+
let manifest_list_filepath = new_snapshot.manifest_list().to_string();
342+
table
343+
.file_io()
344+
.delete(manifest_list_filepath)
345+
.await
346+
.unwrap();
347+
348+
let snapshot_ref = Arc::new(new_snapshot.clone());
349+
let table_metadata_ref = &table.metadata_ref();
350+
let loaded_manifest_list = table
351+
.object_cache()
352+
.get_manifest_list(&snapshot_ref, table_metadata_ref)
353+
.await
354+
.unwrap();
355+
assert_eq!(loaded_manifest_list.entries().len(), 1);
337356
}
338357
}

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::ops::RangeFrom;
2222
use uuid::Uuid;
2323

2424
use crate::error::Result;
25+
use crate::io::object_cache::CachedObjectKey;
2526
use crate::spec::{
2627
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry,
2728
ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation,
@@ -409,6 +410,20 @@ impl<'a> SnapshotProducer<'a> {
409410
next_seq_num,
410411
),
411412
};
413+
414+
let object_cache = self.table.object_cache();
415+
if object_cache.enabled() {
416+
let format_version = self.table.metadata().format_version();
417+
let schema_id = self.table.metadata().current_schema_id();
418+
let cache_key = CachedObjectKey::ManifestList((
419+
manifest_list_path.clone(),
420+
format_version,
421+
schema_id,
422+
));
423+
let cache = self.table.object_cache();
424+
manifest_list_writer.set_cache(cache_key, cache)?;
425+
}
426+
412427
manifest_list_writer.add_manifests(new_manifests.into_iter())?;
413428
manifest_list_writer.close().await?;
414429

0 commit comments

Comments
 (0)