Skip to content

Commit 6cdd95f

Browse files
committed
graphql: Better query cache
1 parent 536f4ff commit 6cdd95f

File tree

7 files changed

+86
-37
lines changed

7 files changed

+86
-37
lines changed

graphql/src/execution/execution.rs

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ use lazy_static::lazy_static;
55
use stable_hash::crypto::SetHasher;
66
use stable_hash::prelude::*;
77
use stable_hash::utils::stable_hash;
8-
use std::collections::{BTreeMap, HashMap, HashSet};
8+
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
99
use std::iter;
1010
use std::sync::atomic::AtomicBool;
11+
use std::sync::RwLock;
1112
use std::time::Instant;
1213

1314
use graph::prelude::*;
@@ -19,13 +20,18 @@ use crate::prelude::*;
1920
use crate::query::ast as qast;
2021
use crate::schema::ast as sast;
2122
use crate::values::coercion;
22-
use lru_time_cache::LruCache;
23-
use std::sync::Mutex;
2423

2524
type QueryHash = <SetHasher as StableHasher>::Out;
2625

26+
#[derive(Debug)]
27+
struct CacheByBlock {
28+
block: EthereumBlockPointer,
29+
cache: BTreeMap<QueryHash, BTreeMap<String, q::Value>>,
30+
}
31+
2732
lazy_static! {
2833
// Comma separated subgraph ids to cache queries for.
34+
// If `*` is present in the list, queries are cached for all subgraphs.
2935
static ref CACHED_SUBGRAPH_IDS: Vec<String> = {
3036
std::env::var("GRAPH_CACHED_SUBGRAPH_IDS")
3137
.unwrap_or_default()
@@ -34,18 +40,22 @@ lazy_static! {
3440
.collect()
3541
};
3642

37-
// Cache expiry time, 1 minute by default.
38-
static ref CACHE_EXPIRY: Duration = {
39-
std::env::var("GRAPH_CACHE_EXPIRY_SECS")
40-
.unwrap_or("60".to_string())
41-
.parse::<u64>()
42-
.map(|s| Duration::from_secs(s))
43-
.unwrap()
43+
static ref CACHE_ALL: bool = CACHED_SUBGRAPH_IDS.contains(&"*".to_string());
44+
45+
// How many blocks should be kept in the query cache. When the limit is reached, older blocks
46+
// are evicted. This should be kept small since a lookup to the cache is O(n) on this value, and
47+
// the cache memory usage also increases with larger number. Set to 0 to disable the cache,
48+
// defaults to disabled.
49+
static ref QUERY_CACHE_BLOCKS: usize = {
50+
std::env::var("GRAPH_QUERY_CACHE_BLOCKS")
51+
.unwrap_or("0".to_string())
52+
.parse::<usize>()
53+
.expect("Invalid value for GRAPH_QUERY_CACHE_BLOCKS environment variable")
4454
};
4555

46-
// This cache might serve stale data.
47-
static ref QUERY_CACHE: Mutex<LruCache<QueryHash, BTreeMap<String, q::Value>>>
48-
= Mutex::new(LruCache::with_expiry_duration(*CACHE_EXPIRY));
56+
// News blocks go on the front, so the oldest block will be at the back.
57+
// This `VecDeque` works as a ring buffer with a capacity of `QUERY_CACHE_BLOCKS`.
58+
static ref QUERY_CACHE: RwLock<VecDeque<CacheByBlock>> = RwLock::new(VecDeque::new());
4959
}
5060

5161
struct HashableQuery<'a> {
@@ -178,25 +188,34 @@ pub fn execute_root_selection_set(
178188
ctx: &ExecutionContext<impl Resolver>,
179189
selection_set: &q::SelectionSet,
180190
root_type: &s::ObjectType,
191+
block_ptr: Option<EthereumBlockPointer>,
181192
) -> Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>> {
182193
// Cache the cache key to not have to calculate it twice - once for lookup
183194
// and once for insert.
184195
let mut key: Option<QueryHash> = None;
185196

186-
if CACHED_SUBGRAPH_IDS.contains(&ctx.query.schema.id) {
187-
// Calculate the hash outside of the lock
188-
key = Some(cache_key(ctx, selection_set));
189-
190-
// Check if the response is cached.
191-
let mut cache = QUERY_CACHE.lock().unwrap();
192-
193-
// Remove expired entries.
194-
cache.notify_iter();
197+
if *CACHE_ALL || CACHED_SUBGRAPH_IDS.contains(&ctx.query.schema.id) {
198+
if let Some(block_ptr) = block_ptr {
199+
// JSONB and metadata queries use `BLOCK_NUMBER_MAX`. Ignore this case for two reasons:
200+
// - Metadata queries are not cacheable.
201+
// - Caching `BLOCK_NUMBER_MAX` would make this cache think all other blocks are old.
202+
if block_ptr.number != BLOCK_NUMBER_MAX as u64 {
203+
// Calculate the hash outside of the lock
204+
let cache_key = cache_key(ctx, selection_set);
205+
206+
// Check if the response is cached.
207+
let cache = QUERY_CACHE.read().unwrap();
208+
209+
// Iterate from the most recent block looking for a block that matches.
210+
if let Some(cache_by_block) = cache.iter().find(|c| c.block == block_ptr) {
211+
if let Some(response) = cache_by_block.cache.get(&cache_key) {
212+
ctx.cached.store(true, std::sync::atomic::Ordering::SeqCst);
213+
return Ok(response.clone());
214+
}
215+
}
195216

196-
// Peek because we want even hot entries to invalidate after the expiry period.
197-
if let Some(response) = cache.peek(key.as_ref().unwrap()) {
198-
ctx.cached.store(true, std::sync::atomic::Ordering::SeqCst);
199-
return Ok(response.clone());
217+
key = Some(cache_key);
218+
}
200219
}
201220
}
202221

@@ -244,10 +263,36 @@ pub fn execute_root_selection_set(
244263
)?);
245264
}
246265

247-
if let Some(key) = key {
248-
// Insert into the cache.
249-
let mut cache = QUERY_CACHE.lock().unwrap();
250-
cache.insert(key, values.clone());
266+
// Check if this query should be cached.
267+
if let (Some(key), Some(block_ptr)) = (key, block_ptr) {
268+
let mut cache = QUERY_CACHE.write().unwrap();
269+
270+
// If there is already a cache by the block of this query, just add it there.
271+
if let Some(cache_by_block) = cache.iter_mut().find(|c| c.block == block_ptr) {
272+
cache_by_block.cache.insert(key, values.clone());
273+
} else if *QUERY_CACHE_BLOCKS > 0 {
274+
// We're creating a new `CacheByBlock` if:
275+
// - There are none yet, this is the first query being cached, or
276+
// - `block_ptr` is of higher or equal number than the most recent block in the cache.
277+
// Otherwise this is a historical query which will not be cached.
278+
let should_insert = match cache.iter().next() {
279+
None => true,
280+
Some(highest) if highest.block.number <= block_ptr.number => true,
281+
Some(_) => false,
282+
};
283+
284+
if should_insert {
285+
if cache.len() == *QUERY_CACHE_BLOCKS {
286+
// At capacity, so pop the oldest block.
287+
cache.pop_back();
288+
}
289+
290+
cache.push_front(CacheByBlock {
291+
block: block_ptr,
292+
cache: BTreeMap::from_iter(iter::once((key, values.clone()))),
293+
});
294+
}
295+
}
251296
}
252297

253298
Ok(values)

graphql/src/query/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use graph::prelude::{info, o, Logger, QueryExecutionError};
1+
use graph::prelude::{info, o, EthereumBlockPointer, Logger, QueryExecutionError};
22
use graphql_parser::query as q;
33
use std::collections::BTreeMap;
44
use std::sync::{atomic::AtomicBool, Arc};
@@ -36,6 +36,7 @@ where
3636
pub fn execute_query<R>(
3737
query: Arc<Query>,
3838
selection_set: Option<&q::SelectionSet>,
39+
block_ptr: Option<EthereumBlockPointer>,
3940
options: QueryExecutionOptions<R>,
4041
) -> Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>>
4142
where
@@ -72,7 +73,7 @@ where
7273

7374
// Execute top-level `query { ... }` and `{ ... }` expressions.
7475
let start = Instant::now();
75-
let result = execute_root_selection_set(&ctx, selection_set, query_type);
76+
let result = execute_root_selection_set(&ctx, selection_set, query_type, block_ptr);
7677
if *graph::log::LOG_GQL_TIMING {
7778
info!(
7879
query_logger,
@@ -81,6 +82,7 @@ where
8182
"variables" => &query.variables_text,
8283
"query_time_ms" => start.elapsed().as_millis(),
8384
"cached" => ctx.cached.load(std::sync::atomic::Ordering::SeqCst),
85+
"block" => block_ptr.map(|b| b.number).unwrap_or(0),
8486
);
8587
}
8688
result

graphql/src/runner.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,12 @@ where
115115
let mut values = BTreeMap::new();
116116
let mut errors = Vec::new();
117117
for (bc, selection_set) in query.block_constraint()? {
118-
let (resolver, _block_ptr) =
118+
let (resolver, block_ptr) =
119119
StoreResolver::at_block(&self.logger, self.store.clone(), bc, &query.schema.id)?;
120120
match execute_query(
121121
query.clone(),
122122
Some(&selection_set),
123+
Some(block_ptr),
123124
QueryExecutionOptions {
124125
logger: self.logger.clone(),
125126
resolver,

graphql/src/subscription/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ async fn execute_subscription_event(
210210
// once, from flooding the blocking thread pool and the DB connection pool.
211211
let _permit = SUBSCRIPTION_QUERY_SEMAPHORE.acquire();
212212
let result = graph::spawn_blocking_allow_panic(async move {
213-
execute_root_selection_set(&ctx, &ctx.query.selection_set, &subscription_type)
213+
execute_root_selection_set(&ctx, &ctx.query.selection_set, &subscription_type, None)
214214
})
215215
.await
216216
.map_err(|e| vec![QueryExecutionError::Panic(e.to_string())])

graphql/tests/introspection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -560,8 +560,8 @@ fn introspection_query(schema: Schema, query: &str) -> QueryResult {
560560
max_first: std::u32::MAX,
561561
};
562562

563-
let result =
564-
PreparedQuery::new(query, None, 100).and_then(|query| execute_query(query, None, options));
563+
let result = PreparedQuery::new(query, None, 100)
564+
.and_then(|query| execute_query(query, None, None, options));
565565
QueryResult::from(result)
566566
}
567567

server/index-node/src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ where
119119
max_first: std::u32::MAX,
120120
};
121121
let result = PreparedQuery::new(query, None, 100)
122-
.and_then(|query| execute_query(query, None, options));
122+
.and_then(|query| execute_query(query, None, None, options));
123123

124124
futures03::future::ok(QueryResult::from(result))
125125
})

store/test-store/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ fn execute_subgraph_query_internal(
397397
match execute_query(
398398
query.clone(),
399399
Some(&selection_set),
400+
None,
400401
QueryExecutionOptions {
401402
logger,
402403
resolver,

0 commit comments

Comments
 (0)