Skip to content

Commit 98e39bb

Browse files
committed
graph, graphql, server, store: Added a query herd cache
1 parent d74c030 commit 98e39bb

File tree

10 files changed

+407
-87
lines changed

10 files changed

+407
-87
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

graph/src/data/query/error.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,29 @@ use std::collections::HashMap;
77
use std::error::Error;
88
use std::fmt;
99
use std::string::FromUtf8Error;
10+
use std::sync::Arc;
1011

1112
use crate::components::store::StoreError;
1213
use crate::data::graphql::SerializableValue;
1314
use crate::data::subgraph::*;
1415

15-
/// Error caused while executing a [Query](struct.Query.html).
1616
#[derive(Debug)]
17+
pub struct CloneableFailureError(Arc<failure::Error>);
18+
19+
impl Clone for CloneableFailureError {
20+
fn clone(&self) -> Self {
21+
Self(self.0.clone())
22+
}
23+
}
24+
25+
impl From<failure::Error> for CloneableFailureError {
26+
fn from(f: failure::Error) -> Self {
27+
Self(Arc::new(f))
28+
}
29+
}
30+
31+
/// Error caused while executing a [Query](struct.Query.html).
32+
#[derive(Debug, Clone)]
1733
pub enum QueryExecutionError {
1834
OperationNameRequired,
1935
OperationNotFound(String),
@@ -45,7 +61,7 @@ pub enum QueryExecutionError {
4561
ValueParseError(String, String),
4662
AttributeTypeError(String, String),
4763
EntityParseError(String),
48-
StoreError(failure::Error),
64+
StoreError(CloneableFailureError),
4965
Timeout,
5066
EmptySelectionSet(String),
5167
AmbiguousDerivedFromResult(Pos, String, String, String),
@@ -173,7 +189,7 @@ impl fmt::Display for QueryExecutionError {
173189
write!(f, "Broken entity found in store: {}", s)
174190
}
175191
StoreError(e) => {
176-
write!(f, "Store error: {}", e)
192+
write!(f, "Store error: {}", e.0)
177193
}
178194
Timeout => write!(f, "Query timed out"),
179195
EmptySelectionSet(entity_type) => {
@@ -239,7 +255,7 @@ impl From<bigdecimal::ParseBigDecimalError> for QueryExecutionError {
239255

240256
impl From<StoreError> for QueryExecutionError {
241257
fn from(e: StoreError) -> Self {
242-
QueryExecutionError::StoreError(e.into())
258+
QueryExecutionError::StoreError(CloneableFailureError(Arc::new(e.into())))
243259
}
244260
}
245261

graphql/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ lazy_static = "1.2.0"
1313
uuid = { version = "0.8.1", features = ["v4"] }
1414
lru_time_cache = "0.10"
1515
stable-hash = { git = "https://github.com/graphprotocol/stable-hash" }
16+
once_cell = "1.4.0"
1617

1718
[dev-dependencies]
1819
pretty_assertions = "0.6.1"

graphql/src/execution/cache.rs

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
use graph::prelude::CheapClone;
2+
use once_cell::sync::OnceCell;
3+
use stable_hash::crypto::SetHasher;
4+
use stable_hash::prelude::*;
5+
use std::collections::hash_map::Entry;
6+
use std::collections::{HashMap, VecDeque};
7+
use std::ops::Deref;
8+
use std::sync::{Arc, Condvar, Mutex, Weak};
9+
10+
type Hash = <SetHasher as StableHasher>::Out;
11+
12+
/// A queue of items which (may) have expired from the cache.
13+
/// This is kept separate to avoid circular references. The way
14+
/// the code is implemented ensure that this does not grow without
15+
/// bound, and generally cleanup stays ahead of insertion.
16+
#[derive(Default, Clone, Debug)]
17+
struct CleanupQueue {
18+
inner: Arc<Mutex<VecDeque<Hash>>>,
19+
}
20+
21+
impl CleanupQueue {
22+
/// Schedule an item for cleanup later
23+
fn push(&self, value: Hash) {
24+
let mut inner = self.inner.lock().unwrap();
25+
inner.push_back(value);
26+
}
27+
/// Take an item to clean up. The consumer MUST
28+
/// deal with this without fail or memory will leak.
29+
fn pop(&self) -> Option<Hash> {
30+
let mut inner = self.inner.lock().unwrap();
31+
inner.pop_front()
32+
}
33+
}
34+
35+
// Implemented on top of Arc, so this is ok.
36+
impl CheapClone for CleanupQueue {}
37+
38+
/// A handle to a cached item. As long as this handle is kept alive,
39+
/// the value remains in the cache.
40+
///
41+
/// The cached value may not be immediately available when used.
42+
/// In this case this will block until the value is available.
43+
#[derive(Debug)]
44+
pub struct CachedResponse<R> {
45+
inner: Arc<CacheEntryInner<R>>,
46+
}
47+
48+
impl<R> Deref for CachedResponse<R> {
49+
type Target = R;
50+
fn deref(&self) -> &R {
51+
self.inner.wait()
52+
}
53+
}
54+
55+
// Manual impl required because of generic parameter.
56+
impl<R> Clone for CachedResponse<R> {
57+
fn clone(&self) -> Self {
58+
Self {
59+
inner: self.inner.clone(),
60+
}
61+
}
62+
}
63+
64+
// Ok, because implemented on top of Arc
65+
impl<R> CheapClone for CachedResponse<R> {}
66+
67+
/// The 'true' cache entry that lives inside the Arc.
68+
/// When the last Arc is dropped, this is dropped,
69+
/// and the cache is removed.
70+
#[derive(Debug)]
71+
struct CacheEntryInner<R> {
72+
cleanup: CleanupQueue,
73+
hash: Hash,
74+
// Considered using once_cell::sync::Lazy,
75+
// but that quickly becomes a mess of generics
76+
// or runs into the issue that Box<dyn FnOnce> can't be
77+
// called at all, so doesn't impl FnOnce as Lazy requires.
78+
result: OnceCell<Option<R>>,
79+
80+
// Temporary to implement OnceCell.wait
81+
condvar: Condvar,
82+
lock: Mutex<bool>,
83+
}
84+
85+
impl<R> CacheEntryInner<R> {
86+
fn new(hash: Hash, cleanup: &CleanupQueue) -> Arc<Self> {
87+
Arc::new(Self {
88+
cleanup: cleanup.cheap_clone(),
89+
hash,
90+
result: OnceCell::new(),
91+
condvar: Condvar::new(),
92+
lock: Mutex::new(false),
93+
})
94+
}
95+
96+
fn set_inner(&self, value: Option<R>) {
97+
// Store the cached value
98+
self.result
99+
.set(value)
100+
.unwrap_or_else(|_| panic!("Cache set should only be called once"));
101+
// Wakeup consumers of the cache
102+
let mut is_set = self.lock.lock().unwrap();
103+
*is_set = true;
104+
self.condvar.notify_all();
105+
}
106+
107+
fn set(&self, value: R) {
108+
self.set_inner(Some(value));
109+
}
110+
111+
fn set_panic(&self) {
112+
self.set_inner(None);
113+
}
114+
115+
fn wait(&self) -> &R {
116+
// Happy path - already cached.
117+
if let Some(r) = self.result.get() {
118+
match r.as_ref() {
119+
Some(r) => r,
120+
// TODO: Instead of having an Option,
121+
// retain panic information and propagate it.
122+
None => panic!("Query panicked"),
123+
}
124+
} else {
125+
// Wait for the item to be placed in the cache.
126+
let mut is_set = self.lock.lock().unwrap();
127+
while !*is_set {
128+
is_set = self.condvar.wait(is_set).unwrap();
129+
}
130+
131+
self.wait()
132+
}
133+
}
134+
}
135+
136+
/// Once the last reference is removed, schedule for cleanup in the cache.
137+
impl<R> Drop for CacheEntryInner<R> {
138+
fn drop(&mut self) {
139+
self.cleanup.push(self.hash);
140+
}
141+
}
142+
143+
/// On drop, call set_panic on self.value,
144+
/// unless set was called.
145+
struct PanicHelper<R> {
146+
value: Option<Arc<CacheEntryInner<R>>>,
147+
}
148+
149+
impl<R> Drop for PanicHelper<R> {
150+
fn drop(&mut self) {
151+
if let Some(inner) = self.value.take() {
152+
inner.set_panic();
153+
}
154+
}
155+
}
156+
157+
impl<R> PanicHelper<R> {
158+
fn new(value: Arc<CacheEntryInner<R>>) -> Self {
159+
Self { value: Some(value) }
160+
}
161+
fn set(mut self, r: R) -> Arc<CacheEntryInner<R>> {
162+
let value = self.value.take().unwrap();
163+
value.set(r);
164+
value
165+
}
166+
}
167+
168+
/// Cache that keeps a result around as long as it is still in use somewhere.
169+
/// The cache ensures that the query is not re-entrant, so multiple consumers
170+
/// of identical queries will not execute them in parallel.
171+
///
172+
/// This has a lot in common with AsyncCache in the network-services repo,
173+
/// but is sync instead of async, and more specialized.
174+
pub struct QueryCache<R> {
175+
cleanup: CleanupQueue,
176+
cache: Arc<Mutex<HashMap<Hash, Weak<CacheEntryInner<R>>>>>,
177+
}
178+
179+
impl<R> QueryCache<R> {
180+
pub fn new() -> Self {
181+
Self {
182+
cleanup: CleanupQueue::default(),
183+
cache: Arc::new(Mutex::new(HashMap::new())),
184+
}
185+
}
186+
/// Assumption: Whatever F is passed in consistently returns the same
187+
/// value for any input - for all values of F used with this Cache.
188+
pub fn cached_query<F: FnOnce() -> R>(&self, hash: Hash, f: F) -> CachedResponse<R> {
189+
// This holds it's own lock so make sure that this happens outside of
190+
// holding any other lock.
191+
let cleanup = self.cleanup.pop();
192+
193+
let mut cache = self.cache.lock().unwrap();
194+
195+
// Execute the amortized cleanup step, checking that the content is
196+
// still missing since it may have been re-inserted. By always cleaning
197+
// up one item before potentially inserting another item we ensure that
198+
// the memory usage stays bounded. There is no need to stay ahead of
199+
// this work, because this step doesn't actually free any real memory,
200+
// it just ensures the memory doesn't grow unnecessarily when inserting.
201+
if let Some(cleanup) = cleanup {
202+
if let Entry::Occupied(entry) = cache.entry(cleanup) {
203+
if entry.get().strong_count() == 0 {
204+
entry.remove_entry();
205+
}
206+
}
207+
}
208+
209+
// Try to pull the item out of the cache and return it.
210+
// If we get past this expr, it means this thread will do
211+
// the work and fullfil that 'promise' in this work variable.
212+
let work = match cache.entry(hash) {
213+
Entry::Occupied(mut entry) => {
214+
// Cache hit!
215+
if let Some(cached) = entry.get().upgrade() {
216+
return CachedResponse { inner: cached };
217+
}
218+
// Need to re-add to cache
219+
let uncached = CacheEntryInner::new(hash, &self.cleanup);
220+
*entry.get_mut() = Arc::downgrade(&uncached);
221+
uncached
222+
}
223+
Entry::Vacant(entry) => {
224+
let uncached = CacheEntryInner::new(hash, &self.cleanup);
225+
entry.insert(Arc::downgrade(&uncached));
226+
uncached
227+
}
228+
};
229+
230+
// Don't hold the lock.
231+
drop(cache);
232+
233+
// Now that we have taken on the responsibility, propagate panics to
234+
// make sure that no threads wait forever on a result that will never
235+
// come.
236+
let work = PanicHelper::new(work);
237+
238+
// After all that ceremony, this part is easy enough.
239+
CachedResponse {
240+
inner: work.set(f()),
241+
}
242+
}
243+
}

0 commit comments

Comments
 (0)