Skip to content

Commit a633547

Browse files
authored
feat: cacheless amt iteration (#2189)
* feat: cacheless amt iteration * docs, comments * doc: remove flush hint * revert doc fix
1 parent 93299d5 commit a633547

File tree

3 files changed

+152
-0
lines changed

3 files changed

+152
-0
lines changed

ipld/amt/src/amt.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,4 +590,39 @@ where
590590

591591
Ok(())
592592
}
593+
594+
/// Iterates over each value in the Amt and runs a function on the values. This is a
595+
/// non-caching version of [`Self::for_each`]. It can potentially be more efficient, especially memory-wise,
596+
/// for large AMTs or when the iteration occurs only once.
597+
///
598+
/// # Examples
599+
///
600+
/// ```
601+
/// use fvm_ipld_amt::Amt;
602+
///
603+
/// let store = fvm_ipld_blockstore::MemoryBlockstore::default();
604+
///
605+
/// let mut map: Amt<String, _> = Amt::new(&store);
606+
/// map.set(1, "One".to_owned()).unwrap();
607+
/// map.set(4, "Four".to_owned()).unwrap();
608+
///
609+
/// let mut values: Vec<(u64, String)> = Vec::new();
610+
/// map.for_each_cacheless(|i, v| {
611+
/// values.push((i, v.clone()));
612+
/// Ok(())
613+
/// }).unwrap();
614+
/// assert_eq!(&values, &[(1, "One".to_owned()), (4, "Four".to_owned())]);
615+
/// ```
616+
pub fn for_each_cacheless<F>(&self, mut f: F) -> Result<(), Error>
617+
where
618+
F: FnMut(u64, &V) -> anyhow::Result<()>,
619+
{
620+
self.root.node.for_each_cacheless(
621+
&self.block_store,
622+
self.height(),
623+
self.bit_width(),
624+
0,
625+
&mut f,
626+
)
627+
}
593628
}

ipld/amt/src/node.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,52 @@ where
420420
}
421421
}
422422

423+
/// Non-caching iteration over the values in the node.
424+
pub(super) fn for_each_cacheless<S, F>(
425+
&self,
426+
bs: &S,
427+
height: u32,
428+
bit_width: u32,
429+
offset: u64,
430+
f: &mut F,
431+
) -> Result<(), Error>
432+
where
433+
F: FnMut(u64, &V) -> anyhow::Result<()>,
434+
S: Blockstore,
435+
{
436+
match self {
437+
Node::Leaf { vals } => {
438+
for (i, v) in (0..).zip(vals.iter()) {
439+
if let Some(v) = v {
440+
let _ = f(offset + i, v);
441+
}
442+
}
443+
}
444+
Node::Link { links } => {
445+
for (i, l) in (0..).zip(links.iter()) {
446+
if let Some(link) = l {
447+
let offs = offset + (i * nodes_for_height(bit_width, height));
448+
match link {
449+
Link::Dirty(sub) => {
450+
sub.for_each_cacheless(bs, height - 1, bit_width, offs, f)?;
451+
}
452+
Link::Cid { cid, cache: _ } => {
453+
let node = bs
454+
.get_cbor::<CollapsedNode<V>>(cid)?
455+
.ok_or_else(|| Error::CidNotFound(cid.to_string()))?
456+
.expand(bit_width)?;
457+
458+
node.for_each_cacheless(bs, height - 1, bit_width, offs, f)?;
459+
}
460+
}
461+
}
462+
}
463+
}
464+
}
465+
466+
Ok(())
467+
}
468+
423469
/// Returns a `(keep_going, did_mutate)` pair. `keep_going` will be `false` iff
424470
/// a closure call returned `Ok(false)`, indicating that a `break` has happened.
425471
/// `did_mutate` will be `true` iff any of the values in the node was actually

ipld/amt/tests/amt_tests.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,77 @@ fn for_each() {
390390
assert_eq!(*db.stats.borrow(), BSStats {r: 1431, w: 1431, br: 88649, bw: 88649});
391391
}
392392

393+
#[test]
394+
fn for_each_cacheless() {
395+
let mem = MemoryBlockstore::default();
396+
let db = TrackingBlockstore::new(&mem);
397+
let mut a = Amt::new(&db);
398+
399+
let mut indexes = Vec::new();
400+
for i in 0..10000 {
401+
if (i + 1) % 3 == 0 {
402+
indexes.push(i);
403+
}
404+
}
405+
406+
// Set all indices in the Amt
407+
for i in indexes.iter() {
408+
a.set(*i, tbytes(b"value")).unwrap();
409+
}
410+
411+
// Ensure all values were added into the amt
412+
for i in indexes.iter() {
413+
assert_eq!(a.get(*i).unwrap(), Some(&tbytes(b"value")));
414+
}
415+
416+
assert_eq!(a.count(), indexes.len() as u64);
417+
418+
// Iterate over amt with dirty cache. This should not touch the database at all.
419+
let mut x = 0;
420+
a.for_each_cacheless(|_, _: &BytesDe| {
421+
x += 1;
422+
Ok(())
423+
})
424+
.unwrap();
425+
#[rustfmt::skip]
426+
assert_eq!(*db.stats.borrow(), BSStats {r: 0, w: 0, br: 0, bw: 0});
427+
assert_eq!(x, indexes.len());
428+
429+
// Flush and regenerate amt
430+
let c = a.flush().unwrap();
431+
let new_amt = Amt::load(&c, &db).unwrap();
432+
assert_eq!(new_amt.count(), indexes.len() as u64);
433+
434+
let mut x = 0;
435+
new_amt
436+
.for_each_cacheless(|i, _: &BytesDe| {
437+
if i != indexes[x] {
438+
panic!(
439+
"for each cacheless found wrong index: expected {} got {}",
440+
indexes[x], i
441+
);
442+
}
443+
x += 1;
444+
Ok(())
445+
})
446+
.unwrap();
447+
assert_eq!(x, indexes.len());
448+
449+
// After 1st pass, the amount of block reads should be the same as in the caching iteration.
450+
#[rustfmt::skip]
451+
assert_eq!(*db.stats.borrow(), BSStats {r: 1431, w: 1431, br: 88649, bw: 88649});
452+
453+
new_amt.for_each_cacheless(|_, _: &BytesDe| Ok(())).unwrap();
454+
assert_eq!(
455+
c.to_string().as_str(),
456+
"bafy2bzaceanqxtbsuyhqgxubiq6vshtbhktmzp2if4g6kxzttxmzkdxmtipcm"
457+
);
458+
459+
// 2nd pass, no caching so block reads roughly doubled.
460+
#[rustfmt::skip]
461+
assert_eq!(*db.stats.borrow(), BSStats {r: 2861, w: 1431, br: 177158, bw: 88649});
462+
}
463+
393464
#[test]
394465
fn for_each_ranged() {
395466
let mem = MemoryBlockstore::default();

0 commit comments

Comments
 (0)