Skip to content

Commit ecc6016

Browse files
committed
graphql: Simplify herd cache
1 parent fff620c commit ecc6016

File tree

6 files changed

+76
-183
lines changed

6 files changed

+76
-183
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

graphql/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ uuid = { version = "0.8.1", features = ["v4"] }
1414
lru_time_cache = "0.10"
1515
stable-hash = { git = "https://github.com/graphprotocol/stable-hash" }
1616
once_cell = "1.4.0"
17+
defer = "0.1"
1718

1819
[dev-dependencies]
1920
pretty_assertions = "0.6.1"

graphql/src/execution/cache.rs

Lines changed: 32 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -3,73 +3,16 @@ use once_cell::sync::OnceCell;
33
use stable_hash::crypto::SetHasher;
44
use stable_hash::prelude::*;
55
use std::collections::hash_map::Entry;
6-
use std::collections::{HashMap, VecDeque};
7-
use std::ops::Deref;
8-
use std::sync::{Arc, Condvar, Mutex, Weak};
6+
use std::collections::HashMap;
7+
use std::sync::{Arc, Condvar, Mutex};
98

109
type Hash = <SetHasher as StableHasher>::Out;
1110

12-
/// A queue of items which (may) have expired from the cache.
13-
/// This is kept separate to avoid circular references. The way
14-
/// the code is implemented ensures that this does not grow without
15-
/// bound, and generally cleanup stays ahead of insertion.
16-
#[derive(Default, Clone, Debug)]
17-
struct CleanupQueue {
18-
inner: Arc<Mutex<VecDeque<Hash>>>,
19-
}
20-
21-
impl CleanupQueue {
22-
/// Schedule an item for cleanup later
23-
fn push(&self, value: Hash) {
24-
let mut inner = self.inner.lock().unwrap();
25-
inner.push_back(value);
26-
}
27-
/// Take an item to clean up. The consumer MUST
28-
/// deal with this without fail or memory will leak.
29-
fn pop(&self) -> Option<Hash> {
30-
let mut inner = self.inner.lock().unwrap();
31-
inner.pop_front()
32-
}
33-
}
34-
35-
// Implemented on top of Arc, so this is ok.
36-
impl CheapClone for CleanupQueue {}
37-
38-
/// A handle to a cached item. As long as this handle is kept alive,
39-
/// the value remains in the cache.
40-
///
41-
/// The cached value may not be immediately available when used.
42-
/// In this case this will block until the value is available.
43-
#[derive(Debug)]
44-
pub struct CachedResponse<R> {
45-
inner: Arc<CacheEntryInner<R>>,
46-
}
47-
48-
impl<R> Deref for CachedResponse<R> {
49-
type Target = R;
50-
fn deref(&self) -> &R {
51-
self.inner.wait()
52-
}
53-
}
54-
55-
// Manual impl required because of generic parameter.
56-
impl<R> Clone for CachedResponse<R> {
57-
fn clone(&self) -> Self {
58-
Self {
59-
inner: self.inner.clone(),
60-
}
61-
}
62-
}
63-
64-
// Ok, because implemented on top of Arc
65-
impl<R> CheapClone for CachedResponse<R> {}
66-
6711
/// The 'true' cache entry that lives inside the Arc.
6812
/// When the last Arc is dropped, this is dropped,
6913
/// and the cache is removed.
7014
#[derive(Debug)]
7115
struct CacheEntryInner<R> {
72-
cleanup: CleanupQueue,
7316
hash: Hash,
7417
// Considered using once_cell::sync::Lazy,
7518
// but that quickly becomes a mess of generics
@@ -83,9 +26,8 @@ struct CacheEntryInner<R> {
8326
}
8427

8528
impl<R> CacheEntryInner<R> {
86-
fn new(hash: Hash, cleanup: &CleanupQueue) -> Arc<Self> {
29+
fn new(hash: Hash) -> Arc<Self> {
8730
Arc::new(Self {
88-
cleanup: cleanup.cheap_clone(),
8931
hash,
9032
result: OnceCell::new(),
9133
condvar: Condvar::new(),
@@ -133,13 +75,6 @@ impl<R> CacheEntryInner<R> {
13375
}
13476
}
13577

136-
/// Once the last reference is removed, schedule for cleanup in the cache.
137-
impl<R> Drop for CacheEntryInner<R> {
138-
fn drop(&mut self) {
139-
self.cleanup.push(self.hash);
140-
}
141-
}
142-
14378
/// On drop, call set_panic on self.value,
14479
/// unless set was called.
14580
struct PanicHelper<R> {
@@ -165,79 +100,60 @@ impl<R> PanicHelper<R> {
165100
}
166101
}
167102

168-
/// Cache that keeps a result around as long as it is still in use somewhere.
103+
/// Cache that keeps a result around as long as it is still being processed.
169104
/// The cache ensures that the query is not re-entrant, so multiple consumers
170105
/// of identical queries will not execute them in parallel.
171106
///
172107
/// This has a lot in common with AsyncCache in the network-services repo,
173108
/// but is sync instead of async, and more specialized.
174109
pub struct QueryCache<R> {
175-
cleanup: CleanupQueue,
176-
cache: Arc<Mutex<HashMap<Hash, Weak<CacheEntryInner<R>>>>>,
110+
cache: Arc<Mutex<HashMap<Hash, Arc<CacheEntryInner<R>>>>>,
177111
}
178112

179-
impl<R> QueryCache<R> {
113+
impl<R: Clone> QueryCache<R> {
180114
pub fn new() -> Self {
181115
Self {
182-
cleanup: CleanupQueue::default(),
183116
cache: Arc::new(Mutex::new(HashMap::new())),
184117
}
185118
}
119+
186120
/// Assumption: Whatever F is passed in consistently returns the same
187121
/// value for any input - for all values of F used with this Cache.
188-
pub fn cached_query<F: FnOnce() -> R>(&self, hash: Hash, f: F) -> CachedResponse<R> {
189-
// This holds its own lock so make sure that this happens outside of
190-
// holding any other lock.
191-
let cleanup = self.cleanup.pop();
192-
193-
let mut cache = self.cache.lock().unwrap();
194-
195-
// Execute the amortized cleanup step, checking that the content is
196-
// still missing since it may have been re-inserted. By always cleaning
197-
// up one item before potentially inserting another item we ensure that
198-
// the memory usage stays bounded. There is no need to stay ahead of
199-
// this work, because this step doesn't actually free any real memory,
200-
// it just ensures the memory doesn't grow unnecessarily when inserting.
201-
if let Some(cleanup) = cleanup {
202-
if let Entry::Occupied(entry) = cache.entry(cleanup) {
203-
if entry.get().strong_count() == 0 {
204-
entry.remove_entry();
122+
pub fn cached_query<F: FnOnce() -> R>(&self, hash: Hash, f: F) -> R {
123+
let work = {
124+
let mut cache = self.cache.lock().unwrap();
125+
126+
// Try to pull the item out of the cache and return it.
127+
// If we get past this expr, it means this thread will do
128+
// the work and fulfill that 'promise' in this work variable.
129+
match cache.entry(hash) {
130+
Entry::Occupied(entry) => {
131+
// Another thread is doing the work, release the lock and wait for it.
132+
let entry = entry.get().cheap_clone();
133+
drop(cache);
134+
return entry.wait().clone();
205135
}
206-
}
207-
}
208-
209-
// Try to pull the item out of the cache and return it.
210-
// If we get past this expr, it means this thread will do
211-
// the work and fulfill that 'promise' in this work variable.
212-
let work = match cache.entry(hash) {
213-
Entry::Occupied(mut entry) => {
214-
// Cache hit!
215-
if let Some(cached) = entry.get().upgrade() {
216-
return CachedResponse { inner: cached };
136+
Entry::Vacant(entry) => {
137+
let uncached = CacheEntryInner::new(hash);
138+
entry.insert(uncached.clone());
139+
uncached
217140
}
218-
// Need to re-add to cache
219-
let uncached = CacheEntryInner::new(hash, &self.cleanup);
220-
*entry.get_mut() = Arc::downgrade(&uncached);
221-
uncached
222-
}
223-
Entry::Vacant(entry) => {
224-
let uncached = CacheEntryInner::new(hash, &self.cleanup);
225-
entry.insert(Arc::downgrade(&uncached));
226-
uncached
227141
}
228142
};
229143

230-
// Don't hold the lock.
231-
drop(cache);
144+
defer::defer(|| {
145+
// Remove this from the list of in-flight work.
146+
self.cache.lock().unwrap().remove(&hash);
147+
});
232148

233149
// Now that we have taken on the responsibility, propagate panics to
234150
// make sure that no threads wait forever on a result that will never
235151
// come.
236152
let work = PanicHelper::new(work);
237153

238-
// After all that ceremony, this part is easy enough.
239-
CachedResponse {
240-
inner: work.set(f()),
241-
}
154+
// Actually compute the value and then share it with waiters.
155+
let value = f();
156+
work.set(value.clone());
157+
value
242158
}
243159
}

graphql/src/execution/execution.rs

Lines changed: 41 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::cache::{CachedResponse, QueryCache};
1+
use super::cache::QueryCache;
22
use graph::prelude::CheapClone;
33
use graphql_parser::query as q;
44
use graphql_parser::schema as s;
@@ -9,7 +9,6 @@ use stable_hash::prelude::*;
99
use stable_hash::utils::stable_hash;
1010
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
1111
use std::iter;
12-
use std::ops::Deref;
1312
use std::sync::atomic::{AtomicBool, Ordering};
1413
use std::sync::RwLock;
1514
use std::time::Instant;
@@ -33,7 +32,7 @@ struct CacheByBlock {
3332
block: EthereumBlockPointer,
3433
max_weight: usize,
3534
weight: usize,
36-
cache: HashMap<QueryHash, CachedResponse<QueryResponse>>,
35+
cache: HashMap<QueryHash, QueryResponse>,
3736
}
3837

3938
impl CacheByBlock {
@@ -47,14 +46,13 @@ impl CacheByBlock {
4746
}
4847

4948
/// Returns `true` if the insert was successful or `false` if the cache was full.
50-
fn insert(&mut self, key: QueryHash, value: &CachedResponse<QueryResponse>) -> bool {
49+
fn insert(&mut self, key: QueryHash, value: QueryResponse) -> bool {
5150
// Unwrap: We never try to insert errors into this cache.
52-
let weight = value.deref().as_ref().ok().unwrap().weight();
53-
51+
let weight = value.as_ref().unwrap().weight();
5452
let fits_in_cache = self.weight + weight <= self.max_weight;
5553
if fits_in_cache {
5654
self.weight += weight;
57-
self.cache.insert(key, value.cheap_clone());
55+
self.cache.insert(key, value);
5856
}
5957
fits_in_cache
6058
}
@@ -100,21 +98,6 @@ lazy_static! {
10098
static ref QUERY_HERD_CACHE: QueryCache<QueryResponse> = QueryCache::new();
10199
}
102100

103-
pub enum MaybeCached<T> {
104-
NotCached(T),
105-
Cached(CachedResponse<T>),
106-
}
107-
108-
impl<T: Clone> MaybeCached<T> {
109-
// Note that this drops any handle to the cache that may exist.
110-
pub fn to_inner(self) -> T {
111-
match self {
112-
MaybeCached::NotCached(t) => t,
113-
MaybeCached::Cached(t) => t.deref().clone(),
114-
}
115-
}
116-
}
117-
118101
struct HashableQuery<'a> {
119102
query_schema_id: &'a SubgraphDeploymentId,
120103
query_variables: &'a HashMap<q::Name, q::Value>,
@@ -317,7 +300,7 @@ pub fn execute_root_selection_set(
317300
selection_set: &q::SelectionSet,
318301
root_type: &s::ObjectType,
319302
block_ptr: Option<EthereumBlockPointer>,
320-
) -> MaybeCached<QueryResponse> {
303+
) -> QueryResponse {
321304
// Cache the cache key to not have to calculate it twice - once for lookup
322305
// and once for insert.
323306
let mut key: Option<QueryHash> = None;
@@ -337,7 +320,7 @@ pub fn execute_root_selection_set(
337320
// Iterate from the most recent block looking for a block that matches.
338321
if let Some(cache_by_block) = cache.iter().find(|c| c.block == block_ptr) {
339322
if let Some(response) = cache_by_block.cache.get(&cache_key) {
340-
return MaybeCached::Cached(response.cheap_clone());
323+
return response.clone();
341324
}
342325
}
343326

@@ -347,51 +330,47 @@ pub fn execute_root_selection_set(
347330
}
348331

349332
let result = if let Some(key) = key {
350-
let cached = QUERY_HERD_CACHE.cached_query(key, || {
333+
QUERY_HERD_CACHE.cached_query(key, || {
351334
execute_root_selection_set_uncached(ctx, selection_set, root_type)
352-
});
353-
MaybeCached::Cached(cached)
335+
})
354336
} else {
355-
let not_cached = execute_root_selection_set_uncached(ctx, selection_set, root_type);
356-
MaybeCached::NotCached(not_cached)
337+
execute_root_selection_set_uncached(ctx, selection_set, root_type)
357338
};
358339

359340
// Check if this query should be cached.
360-
if let (MaybeCached::Cached(cached), Some(key), Some(block_ptr)) = (&result, key, block_ptr) {
361-
// Share errors from the herd cache, but don't store them in generational cache.
362-
// In particular, there is a problem where asking for a block pointer beyond the chain
363-
// head can cause the legitimate cache to be thrown out.
364-
if cached.is_ok() {
365-
let mut cache = QUERY_CACHE.write().unwrap();
366-
367-
// If there is already a cache by the block of this query, just add it there.
368-
if let Some(cache_by_block) = cache.iter_mut().find(|c| c.block == block_ptr) {
369-
let cache_insert = cache_by_block.insert(key, cached);
370-
ctx.cache_insert.store(cache_insert, Ordering::SeqCst);
371-
} else if *QUERY_CACHE_BLOCKS > 0 {
372-
// We're creating a new `CacheByBlock` if:
373-
// - There are none yet, this is the first query being cached, or
374-
// - `block_ptr` is of higher or equal number than the most recent block in the cache.
375-
// Otherwise this is a historical query which will not be cached.
376-
let should_insert = match cache.iter().next() {
377-
None => true,
378-
Some(highest) if highest.block.number <= block_ptr.number => true,
379-
Some(_) => false,
380-
};
381-
382-
if should_insert {
383-
if cache.len() == *QUERY_CACHE_BLOCKS {
384-
// At capacity, so pop the oldest block.
385-
cache.pop_back();
386-
}
341+
// Share errors from the herd cache, but don't store them in generational cache.
342+
// In particular, there is a problem where asking for a block pointer beyond the chain
343+
// head can cause the legitimate cache to be thrown out.
344+
if let (Ok(_), Some(key), Some(block_ptr)) = (&result, key, block_ptr) {
345+
let mut cache = QUERY_CACHE.write().unwrap();
346+
347+
// If there is already a cache by the block of this query, just add it there.
348+
if let Some(cache_by_block) = cache.iter_mut().find(|c| c.block == block_ptr) {
349+
let cache_insert = cache_by_block.insert(key, result.clone());
350+
ctx.cache_insert.store(cache_insert, Ordering::SeqCst);
351+
} else if *QUERY_CACHE_BLOCKS > 0 {
352+
// We're creating a new `CacheByBlock` if:
353+
// - There are none yet, this is the first query being cached, or
354+
// - `block_ptr` is of higher or equal number than the most recent block in the cache.
355+
// Otherwise this is a historical query which will not be cached.
356+
let should_insert = match cache.iter().next() {
357+
None => true,
358+
Some(highest) if highest.block.number <= block_ptr.number => true,
359+
Some(_) => false,
360+
};
387361

388-
// Create a new cache by block, insert this entry, and add it to the QUERY_CACHE.
389-
let max_weight = *QUERY_CACHE_MAX_MEM / *QUERY_CACHE_BLOCKS;
390-
let mut cache_by_block = CacheByBlock::new(block_ptr, max_weight);
391-
let cache_insert = cache_by_block.insert(key, cached);
392-
ctx.cache_insert.store(cache_insert, Ordering::SeqCst);
393-
cache.push_front(cache_by_block);
362+
if should_insert {
363+
if cache.len() == *QUERY_CACHE_BLOCKS {
364+
// At capacity, so pop the oldest block.
365+
cache.pop_back();
394366
}
367+
368+
// Create a new cache by block, insert this entry, and add it to the QUERY_CACHE.
369+
let max_weight = *QUERY_CACHE_MAX_MEM / *QUERY_CACHE_BLOCKS;
370+
let mut cache_by_block = CacheByBlock::new(block_ptr, max_weight);
371+
let cache_insert = cache_by_block.insert(key, result.clone());
372+
ctx.cache_insert.store(cache_insert, Ordering::SeqCst);
373+
cache.push_front(cache_by_block);
395374
}
396375
}
397376
}

graphql/src/query/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,5 +102,5 @@ where
102102
"complexity" => &query.complexity
103103
);
104104
}
105-
result.to_inner()
105+
result
106106
}

0 commit comments

Comments
 (0)