Skip to content

Commit 0738016

Browse files
MathieuDutSikma2bd
authored andcommitted
Remove the cached_entries from the ReentrantCollectionView. (#4628)
## Motivation Having a cache can be a good thing. But of course it can go OOM. Ideally, an LRU cache should keep track of its memory usage. Next best would be to have an LRU cache that keeps track of the number of entries or something related to the memory usage. The `cached_entries` of the `ReentrantCollectionView` had none of those things. ## Proposal The `cached_entries` is removed altogether from the `ReentrantCollectionView`. This is not a problem. The caching should be done in only one place, and that's in the LRU cache. ## Test Plan The CI. No change of API from this work. ## Release Plan It is fully possible to backport this change to the TestNet / DevNet. ## Links None.
1 parent 9eff75e commit 0738016

File tree

1 file changed

+62
-136
lines changed

1 file changed

+62
-136
lines changed

linera-views/src/views/reentrant_collection_view.rs

Lines changed: 62 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
io::Write,
88
marker::PhantomData,
99
mem,
10-
sync::{Arc, Mutex},
10+
sync::Arc,
1111
};
1212

1313
use async_lock::{RwLock, RwLockReadGuardArc, RwLockWriteGuardArc};
@@ -81,8 +81,6 @@ pub struct ReentrantByteCollectionView<C, W> {
8181
delete_storage_first: bool,
8282
/// Entries that may have staged changes.
8383
updates: BTreeMap<Vec<u8>, Update<Arc<RwLock<W>>>>,
84-
/// Entries cached in memory that have the exact same state as in the persistent storage.
85-
cached_entries: Mutex<BTreeMap<Vec<u8>, Arc<RwLock<W>>>>,
8684
}
8785

8886
impl<W, C2> ReplaceContext<C2> for ReentrantByteCollectionView<W::Context, W>
@@ -97,7 +95,6 @@ where
9795
ctx: impl FnOnce(&Self::Context) -> C2 + Clone,
9896
) -> Self::Target {
9997
let mut updates: BTreeMap<_, Update<Arc<RwLock<W::Target>>>> = BTreeMap::new();
100-
let mut cached_entries = BTreeMap::new();
10198
for (key, update) in &self.updates {
10299
let new_value = match update {
103100
Update::Removed => Update::Removed,
@@ -107,20 +104,10 @@ where
107104
};
108105
updates.insert(key.clone(), new_value);
109106
}
110-
let old_cached_entries = self.cached_entries.lock().unwrap().clone();
111-
for (key, entry) in old_cached_entries {
112-
cached_entries.insert(
113-
key,
114-
Arc::new(RwLock::new(
115-
entry.write().await.with_context(ctx.clone()).await,
116-
)),
117-
);
118-
}
119107
ReentrantByteCollectionView {
120108
context: ctx(self.context()),
121109
delete_storage_first: self.delete_storage_first,
122110
updates,
123-
cached_entries: Mutex::new(cached_entries),
124111
}
125112
}
126113
}
@@ -158,7 +145,6 @@ impl<W: View> View for ReentrantByteCollectionView<W::Context, W> {
158145
context,
159146
delete_storage_first: false,
160147
updates: BTreeMap::new(),
161-
cached_entries: Mutex::new(BTreeMap::new()),
162148
})
163149
}
164150

@@ -219,7 +205,6 @@ impl<W: View> View for ReentrantByteCollectionView<W::Context, W> {
219205
fn clear(&mut self) {
220206
self.delete_storage_first = true;
221207
self.updates.clear();
222-
self.cached_entries.get_mut().unwrap().clear();
223208
}
224209
}
225210

@@ -247,7 +232,6 @@ impl<W: ClonableView> ClonableView for ReentrantByteCollectionView<W::Context, W
247232
context: self.context.clone(),
248233
delete_storage_first: self.delete_storage_first,
249234
updates: cloned_updates,
250-
cached_entries: Mutex::new(BTreeMap::new()),
251235
}
252236
}
253237
}
@@ -305,13 +289,8 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
305289
}
306290
},
307291
Vacant(entry) => {
308-
let wrapped_view = match self.cached_entries.get_mut().unwrap().remove(short_key) {
309-
Some(view) => view,
310-
None => {
311-
Self::wrapped_view(&self.context, self.delete_storage_first, short_key)
312-
.await?
313-
}
314-
};
292+
let wrapped_view =
293+
Self::wrapped_view(&self.context, self.delete_storage_first, short_key).await?;
315294
entry.insert(Update::Set(wrapped_view.clone()));
316295
wrapped_view
317296
}
@@ -330,26 +309,15 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
330309
} else if self.delete_storage_first {
331310
None
332311
} else {
333-
let view = {
334-
let cached_entries = self.cached_entries.lock().unwrap();
335-
let view = cached_entries.get(short_key);
336-
view.cloned()
337-
};
338-
if let Some(view) = view {
339-
Some(view.clone())
312+
let key_index = self
313+
.context
314+
.base_key()
315+
.base_tag_index(KeyTag::Index as u8, short_key);
316+
if self.context.store().contains_key(&key_index).await? {
317+
let view = Self::wrapped_view(&self.context, false, short_key).await?;
318+
Some(view)
340319
} else {
341-
let key_index = self
342-
.context
343-
.base_key()
344-
.base_tag_index(KeyTag::Index as u8, short_key);
345-
if self.context.store().contains_key(&key_index).await? {
346-
let view = Self::wrapped_view(&self.context, false, short_key).await?;
347-
let mut cached_entries = self.cached_entries.lock().unwrap();
348-
cached_entries.insert(short_key.to_owned(), view.clone());
349-
Some(view)
350-
} else {
351-
None
352-
}
320+
None
353321
}
354322
})
355323
}
@@ -438,8 +406,6 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
438406
}
439407
} else if self.delete_storage_first {
440408
false
441-
} else if self.cached_entries.lock().unwrap().contains_key(short_key) {
442-
true
443409
} else {
444410
let key_index = self
445411
.context
@@ -468,7 +434,6 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
468434
/// # })
469435
/// ```
470436
pub fn remove_entry(&mut self, short_key: Vec<u8>) {
471-
self.cached_entries.get_mut().unwrap().remove(&short_key);
472437
if self.delete_storage_first {
473438
// Optimization: No need to mark `short_key` for deletion as we are going to remove all the keys at once.
474439
self.updates.remove(&short_key);
@@ -508,7 +473,6 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
508473
let view = Arc::new(RwLock::new(view));
509474
let view = Update::Set(view);
510475
self.updates.insert(short_key.to_vec(), view);
511-
self.cached_entries.get_mut().unwrap().remove(short_key);
512476
Ok(())
513477
}
514478

@@ -546,7 +510,6 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
546510
&mut self,
547511
short_keys: Vec<Vec<u8>>,
548512
) -> Result<Vec<WriteGuardedView<W>>, ViewError> {
549-
let cached_entries = self.cached_entries.get_mut().unwrap();
550513
let mut short_keys_to_load = Vec::new();
551514
let mut keys = Vec::new();
552515
for short_key in &short_keys {
@@ -561,17 +524,13 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
561524
let view = W::new(context)?;
562525
let view = Arc::new(RwLock::new(view));
563526
entry.insert(Update::Set(view));
564-
cached_entries.remove(short_key);
565527
}
566528
}
567529
btree_map::Entry::Vacant(entry) => {
568530
if self.delete_storage_first {
569-
cached_entries.remove(short_key);
570531
let view = W::new(context)?;
571532
let view = Arc::new(RwLock::new(view));
572533
entry.insert(Update::Set(view));
573-
} else if let Some(view) = cached_entries.remove(short_key) {
574-
entry.insert(Update::Set(view));
575534
} else {
576535
keys.extend(W::pre_load(&context)?);
577536
short_keys_to_load.push(short_key.to_vec());
@@ -639,23 +598,18 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
639598
let mut keys_to_check = Vec::new();
640599
let mut keys_to_check_metadata = Vec::new();
641600

642-
{
643-
let cached_entries = self.cached_entries.lock().unwrap();
644-
for (position, short_key) in short_keys.into_iter().enumerate() {
645-
if let Some(update) = self.updates.get(&short_key) {
646-
if let Update::Set(view) = update {
647-
results[position] = Some((short_key, view.clone()));
648-
}
649-
} else if let Some(view) = cached_entries.get(&short_key) {
601+
for (position, short_key) in short_keys.into_iter().enumerate() {
602+
if let Some(update) = self.updates.get(&short_key) {
603+
if let Update::Set(view) = update {
650604
results[position] = Some((short_key, view.clone()));
651-
} else if !self.delete_storage_first {
652-
let key_index = self
653-
.context
654-
.base_key()
655-
.base_tag_index(KeyTag::Index as u8, &short_key);
656-
keys_to_check.push(key_index);
657-
keys_to_check_metadata.push((position, short_key));
658605
}
606+
} else if !self.delete_storage_first {
607+
let key_index = self
608+
.context
609+
.base_key()
610+
.base_tag_index(KeyTag::Index as u8, &short_key);
611+
keys_to_check.push(key_index);
612+
keys_to_check_metadata.push((position, short_key));
659613
}
660614
}
661615

@@ -683,13 +637,11 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
683637
.store()
684638
.read_multi_values_bytes(keys_to_load)
685639
.await?;
686-
let mut cached_entries = self.cached_entries.lock().unwrap();
687640
for (loaded_values, (position, short_key, context)) in
688641
values.chunks_exact(W::NUM_INIT_KEYS).zip(entries_to_load)
689642
{
690643
let view = W::post_load(context, loaded_values)?;
691644
let wrapped_view = Arc::new(RwLock::new(view));
692-
cached_entries.insert(short_key.clone(), wrapped_view.clone());
693645
results[position] = Some((short_key, wrapped_view));
694646
}
695647
}
@@ -727,51 +679,48 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
727679
&self,
728680
) -> Result<Vec<(Vec<u8>, ReadGuardedView<W>)>, ViewError> {
729681
let short_keys = self.keys().await?;
682+
let mut loaded_views = vec![None; short_keys.len()];
683+
684+
// Load views that are not in updates and not deleted
730685
if !self.delete_storage_first {
731686
let mut keys = Vec::new();
732-
let mut short_keys_to_load = Vec::new();
733-
{
734-
let cached_entries = self.cached_entries.lock().unwrap();
735-
for short_key in &short_keys {
736-
if !self.updates.contains_key(short_key)
737-
&& !cached_entries.contains_key(short_key)
738-
{
739-
let key = self
740-
.context
741-
.base_key()
742-
.base_tag_index(KeyTag::Subview as u8, short_key);
743-
let context = self.context.clone_with_base_key(key);
744-
keys.extend(W::pre_load(&context)?);
745-
short_keys_to_load.push(short_key.to_vec());
746-
}
747-
}
748-
}
749-
let values = self.context.store().read_multi_values_bytes(keys).await?;
750-
{
751-
let mut cached_entries = self.cached_entries.lock().unwrap();
752-
for (loaded_values, short_key) in values
753-
.chunks_exact(W::NUM_INIT_KEYS)
754-
.zip(short_keys_to_load)
755-
{
687+
let mut short_keys_and_indexes = Vec::new();
688+
for (index, short_key) in short_keys.iter().enumerate() {
689+
if !self.updates.contains_key(short_key) {
756690
let key = self
757691
.context
758692
.base_key()
759-
.base_tag_index(KeyTag::Subview as u8, &short_key);
693+
.base_tag_index(KeyTag::Subview as u8, short_key);
760694
let context = self.context.clone_with_base_key(key);
761-
let view = W::post_load(context, loaded_values)?;
762-
let wrapped_view = Arc::new(RwLock::new(view));
763-
cached_entries.insert(short_key.to_vec(), wrapped_view);
695+
keys.extend(W::pre_load(&context)?);
696+
short_keys_and_indexes.push((short_key.to_vec(), index));
764697
}
765698
}
699+
let values = self.context.store().read_multi_values_bytes(keys).await?;
700+
for (loaded_values, (short_key, index)) in values
701+
.chunks_exact(W::NUM_INIT_KEYS)
702+
.zip(short_keys_and_indexes)
703+
{
704+
let key = self
705+
.context
706+
.base_key()
707+
.base_tag_index(KeyTag::Subview as u8, &short_key);
708+
let context = self.context.clone_with_base_key(key);
709+
let view = W::post_load(context, loaded_values)?;
710+
let wrapped_view = Arc::new(RwLock::new(view));
711+
loaded_views[index] = Some(wrapped_view);
712+
}
766713
}
767-
let cached_entries = self.cached_entries.lock().unwrap();
714+
715+
// Create result from updates and loaded views
768716
short_keys
769717
.into_iter()
770-
.map(|short_key| {
718+
.zip(loaded_views)
719+
.map(|(short_key, loaded_view)| {
771720
let view = if let Some(Update::Set(view)) = self.updates.get(&short_key) {
772721
view.clone()
773-
} else if let Some(view) = cached_entries.get(&short_key) {
774-
view.clone()
722+
} else if let Some(view) = loaded_view {
723+
view
775724
} else {
776725
unreachable!("All entries should have been loaded into memory");
777726
};
@@ -808,24 +757,19 @@ impl<W: View> ReentrantByteCollectionView<W::Context, W> {
808757
if !self.delete_storage_first {
809758
let mut keys = Vec::new();
810759
let mut short_keys_to_load = Vec::new();
811-
{
812-
let cached_entries = self.cached_entries.get_mut().unwrap();
813-
for short_key in &short_keys {
814-
if !self.updates.contains_key(short_key) {
815-
if let Some(view) = cached_entries.remove(short_key) {
816-
self.updates.insert(short_key.to_vec(), Update::Set(view));
817-
} else {
818-
let key = self
819-
.context
820-
.base_key()
821-
.base_tag_index(KeyTag::Subview as u8, short_key);
822-
let context = self.context.clone_with_base_key(key);
823-
keys.extend(W::pre_load(&context)?);
824-
short_keys_to_load.push(short_key.to_vec());
825-
}
826-
}
760+
761+
for short_key in &short_keys {
762+
if !self.updates.contains_key(short_key) {
763+
let key = self
764+
.context
765+
.base_key()
766+
.base_tag_index(KeyTag::Subview as u8, short_key);
767+
let context = self.context.clone_with_base_key(key);
768+
keys.extend(W::pre_load(&context)?);
769+
short_keys_to_load.push(short_key.to_vec());
827770
}
828771
}
772+
829773
let values = self.context.store().read_multi_values_bytes(keys).await?;
830774
for (loaded_values, short_key) in values
831775
.chunks_exact(W::NUM_INIT_KEYS)
@@ -1023,7 +967,6 @@ impl<W: HashableView> HashableView for ReentrantByteCollectionView<W::Context, W
1023967
let keys = self.keys().await?;
1024968
let count = keys.len() as u32;
1025969
hasher.update_with_bcs_bytes(&count)?;
1026-
let cached_entries = self.cached_entries.get_mut().unwrap();
1027970
for key in keys {
1028971
hasher.update_with_bytes(&key)?;
1029972
let hash = if let Some(entry) = self.updates.get_mut(&key) {
@@ -1034,11 +977,6 @@ impl<W: HashableView> HashableView for ReentrantByteCollectionView<W::Context, W
1034977
.try_write_arc()
1035978
.ok_or_else(|| ViewError::TryLockError(key))?;
1036979
view.hash_mut().await?
1037-
} else if let Some(view) = cached_entries.get_mut(&key) {
1038-
let mut view = view
1039-
.try_write_arc()
1040-
.ok_or_else(|| ViewError::TryLockError(key))?;
1041-
view.hash_mut().await?
1042980
} else {
1043981
let key = self
1044982
.context
@@ -1060,14 +998,7 @@ impl<W: HashableView> HashableView for ReentrantByteCollectionView<W::Context, W
1060998
let keys = self.keys().await?;
1061999
let count = keys.len() as u32;
10621000
hasher.update_with_bcs_bytes(&count)?;
1063-
let mut cached_entries_result = Vec::new();
1064-
{
1065-
let cached_entries = self.cached_entries.lock().unwrap();
1066-
for key in &keys {
1067-
cached_entries_result.push(cached_entries.get(key).cloned());
1068-
}
1069-
}
1070-
for (key, cached_entry) in keys.into_iter().zip(cached_entries_result) {
1001+
for key in keys {
10711002
hasher.update_with_bytes(&key)?;
10721003
let hash = if let Some(entry) = self.updates.get(&key) {
10731004
let Update::Set(view) = entry else {
@@ -1077,11 +1008,6 @@ impl<W: HashableView> HashableView for ReentrantByteCollectionView<W::Context, W
10771008
.try_read_arc()
10781009
.ok_or_else(|| ViewError::TryLockError(key))?;
10791010
view.hash().await?
1080-
} else if let Some(view) = cached_entry {
1081-
let view = view
1082-
.try_read_arc()
1083-
.ok_or_else(|| ViewError::TryLockError(key))?;
1084-
view.hash().await?
10851011
} else {
10861012
let key = self
10871013
.context

0 commit comments

Comments
 (0)