Skip to content

Commit 8f6ecca

Browse files
committed
graphql: Wrap responses in an Arc
1 parent ecc6016 commit 8f6ecca

File tree

26 files changed

+266
-179
lines changed

26 files changed

+266
-179
lines changed

graph/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ num-bigint = { version = "^0.2.6", features = ["serde"] }
3333
num-traits = "0.2"
3434
rand = "0.6.1"
3535
semver = "0.10.0"
36-
serde = "1.0"
36+
serde = { version = "1.0", features = ["rc"] }
3737
serde_derive = "1.0"
3838
serde_json = { version = "1.0", features = ["arbitrary_precision"] }
3939
serde_yaml = "0.8"

graph/src/components/graphql.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use graphql_parser::query as q;
1212
use std::sync::Arc;
1313

1414
/// Future for query results.
15-
pub type QueryResultFuture = Box<dyn Future<Item = QueryResult, Error = QueryError> + Send>;
15+
pub type QueryResultFuture = Box<dyn Future<Item = Arc<QueryResult>, Error = QueryError> + Send>;
1616

1717
/// Future for subscription results.
1818
pub type SubscriptionResultFuture =
@@ -42,6 +42,8 @@ pub trait GraphQlRunner: Send + Sync + 'static {
4242
.await
4343
.map_err(move |e| format_err!("Failed to query metadata: {}", e))
4444
.and_then(move |result| {
45+
// Metadata queries are not cached.
46+
let result = Arc::try_unwrap(result).unwrap();
4547
if result.errors.is_some() {
4648
Err(format_err!("Failed to query metadata: {:?}", result.errors))
4749
} else {

graph/src/data/query/error.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -263,19 +263,23 @@ impl From<StoreError> for QueryExecutionError {
263263
#[derive(Debug)]
264264
pub enum QueryError {
265265
EncodingError(FromUtf8Error),
266-
ParseError(q::ParseError),
266+
ParseError(failure::Error),
267267
ExecutionError(QueryExecutionError),
268268
}
269269

270-
impl From<FromUtf8Error> for QueryError {
271-
fn from(e: FromUtf8Error) -> Self {
272-
QueryError::EncodingError(e)
270+
impl Clone for QueryError {
271+
fn clone(&self) -> Self {
272+
match self {
273+
QueryError::EncodingError(e) => QueryError::EncodingError(e.clone()),
274+
QueryError::ParseError(e) => QueryError::ParseError(failure::err_msg(e.to_string())),
275+
QueryError::ExecutionError(e) => QueryError::ExecutionError(e.clone()),
276+
}
273277
}
274278
}
275279

276-
impl From<q::ParseError> for QueryError {
277-
fn from(e: q::ParseError) -> Self {
278-
QueryError::ParseError(e)
280+
impl From<FromUtf8Error> for QueryError {
281+
fn from(e: FromUtf8Error) -> Self {
282+
QueryError::EncodingError(e)
279283
}
280284
}
281285

graph/src/data/query/result.rs

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ where
1313
}
1414

1515
/// The result of running a query, if successful.
16-
#[derive(Debug, Serialize)]
16+
#[derive(Debug, Clone, Serialize)]
1717
pub struct QueryResult {
1818
#[serde(
1919
skip_serializing_if = "Option::is_none",
@@ -30,17 +30,54 @@ pub struct QueryResult {
3030
}
3131

3232
impl QueryResult {
33+
/// A result with an empty object as the data.
34+
pub fn empty() -> Self {
35+
QueryResult {
36+
data: Some(q::Value::Object(BTreeMap::new())),
37+
errors: None,
38+
extensions: None,
39+
}
40+
}
41+
3342
pub fn new(data: Option<q::Value>) -> Self {
3443
QueryResult {
3544
data,
3645
errors: None,
3746
extensions: None,
3847
}
3948
}
49+
4050
pub fn with_extensions(mut self, extensions: BTreeMap<q::Name, q::Value>) -> Self {
4151
self.extensions = Some(q::Value::Object(extensions));
4252
self
4353
}
54+
55+
pub fn has_errors(&self) -> bool {
56+
return self.errors.is_some();
57+
}
58+
59+
pub fn append(&mut self, mut other: QueryResult) {
60+
match (&mut self.data, &mut other.data) {
61+
(Some(q::Value::Object(ours)), Some(q::Value::Object(other))) => ours.append(other),
62+
63+
// Subgraph queries always return objects.
64+
(Some(_), Some(_)) => unreachable!(),
65+
66+
// Only one side has data, use that.
67+
_ => self.data = self.data.take().or(other.data),
68+
}
69+
70+
match (&mut self.errors, &mut other.errors) {
71+
(Some(ours), Some(other)) => ours.append(other),
72+
73+
// Only one side has errors, use that.
74+
_ => self.errors = self.errors.take().or(other.errors),
75+
}
76+
77+
// Currently we don't used extensions, the desired behaviour for merging them is tbd.
78+
assert!(self.extensions.is_none());
79+
assert!(other.extensions.is_none());
80+
}
4481
}
4582

4683
impl From<QueryExecutionError> for QueryResult {
@@ -61,20 +98,17 @@ impl From<Vec<QueryExecutionError>> for QueryResult {
6198
}
6299
}
63100

64-
impl From<Result<q::Value, Vec<QueryExecutionError>>> for QueryResult {
65-
fn from(result: Result<q::Value, Vec<QueryExecutionError>>) -> Self {
66-
match result {
67-
Ok(v) => QueryResult::new(Some(v)),
68-
Err(errors) => QueryResult::from(errors),
69-
}
101+
impl From<BTreeMap<String, q::Value>> for QueryResult {
102+
fn from(val: BTreeMap<String, q::Value>) -> Self {
103+
QueryResult::new(Some(q::Value::Object(val)))
70104
}
71105
}
72106

73-
impl From<Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>>> for QueryResult {
74-
fn from(result: Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>>) -> Self {
107+
impl<V: Into<QueryResult>, E: Into<QueryResult>> From<Result<V, E>> for QueryResult {
108+
fn from(result: Result<V, E>) -> Self {
75109
match result {
76-
Ok(v) => QueryResult::new(Some(q::Value::Object(v))),
77-
Err(errors) => QueryResult::from(errors),
110+
Ok(v) => v.into(),
111+
Err(e) => e.into(),
78112
}
79113
}
80114
}

graph/src/data/subscription/result.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use crate::prelude::QueryResult;
22
use std::marker::Unpin;
3+
use std::sync::Arc;
34

45
/// A stream of query results for a subscription.
5-
pub type QueryResultStream = Box<dyn futures03::stream::Stream<Item = QueryResult> + Send + Unpin>;
6+
pub type QueryResultStream =
7+
Box<dyn futures03::stream::Stream<Item = Arc<QueryResult>> + Send + Unpin>;
68

79
/// The result of running a subscription, if successful.
810
pub type SubscriptionResult = QueryResultStream;

graphql/src/execution/cache.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ pub struct QueryCache<R> {
110110
cache: Arc<Mutex<HashMap<Hash, Arc<CacheEntryInner<R>>>>>,
111111
}
112112

113-
impl<R: Clone> QueryCache<R> {
113+
impl<R: CheapClone> QueryCache<R> {
114114
pub fn new() -> Self {
115115
Self {
116116
cache: Arc::new(Mutex::new(HashMap::new())),
@@ -131,7 +131,7 @@ impl<R: Clone> QueryCache<R> {
131131
// Another thread is doing the work, release the lock and wait for it.
132132
let entry = entry.get().cheap_clone();
133133
drop(cache);
134-
return entry.wait().clone();
134+
return entry.wait().cheap_clone();
135135
}
136136
Entry::Vacant(entry) => {
137137
let uncached = CacheEntryInner::new(hash);
@@ -153,7 +153,7 @@ impl<R: Clone> QueryCache<R> {
153153

154154
// Actually compute the value and then share it with waiters.
155155
let value = f();
156-
work.set(value.clone());
156+
work.set(value.cheap_clone());
157157
value
158158
}
159159
}

graphql/src/execution/execution.rs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ use crate::values::coercion;
2525

2626
type QueryHash = <SetHasher as StableHasher>::Out;
2727

28-
type QueryResponse = Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>>;
28+
pub(crate) type QueryResponse = Result<BTreeMap<String, q::Value>, Vec<QueryExecutionError>>;
2929

3030
#[derive(Debug)]
3131
struct CacheByBlock {
3232
block: EthereumBlockPointer,
3333
max_weight: usize,
3434
weight: usize,
35-
cache: HashMap<QueryHash, QueryResponse>,
35+
cache: HashMap<QueryHash, Arc<QueryResult>>,
3636
}
3737

3838
impl CacheByBlock {
@@ -46,9 +46,10 @@ impl CacheByBlock {
4646
}
4747

4848
/// Returns `true` if the insert was successful or `false` if the cache was full.
49-
fn insert(&mut self, key: QueryHash, value: QueryResponse) -> bool {
50-
// Unwrap: We never try to insert errors into this cache.
51-
let weight = value.as_ref().unwrap().weight();
49+
fn insert(&mut self, key: QueryHash, value: Arc<QueryResult>) -> bool {
50+
// We never try to insert errors into this cache, and always resolve some value.
51+
assert!(value.errors.is_none());
52+
let weight = value.data.as_ref().unwrap().weight();
5253
let fits_in_cache = self.weight + weight <= self.max_weight;
5354
if fits_in_cache {
5455
self.weight += weight;
@@ -95,7 +96,7 @@ lazy_static! {
9596

9697
// This `VecDeque` works as a ring buffer with a capacity of `QUERY_CACHE_BLOCKS`.
9798
static ref QUERY_CACHE: RwLock<VecDeque<CacheByBlock>> = RwLock::new(VecDeque::new());
98-
static ref QUERY_HERD_CACHE: QueryCache<QueryResponse> = QueryCache::new();
99+
static ref QUERY_HERD_CACHE: QueryCache<Arc<QueryResult>> = QueryCache::new();
99100
}
100101

101102
struct HashableQuery<'a> {
@@ -181,7 +182,7 @@ where
181182
pub query: Arc<crate::execution::Query>,
182183

183184
/// The resolver to use.
184-
pub resolver: Arc<R>,
185+
pub resolver: R,
185186

186187
/// Time at which the query times out.
187188
pub deadline: Option<Instant>,
@@ -230,7 +231,7 @@ where
230231

231232
ExecutionContext {
232233
logger: self.logger.cheap_clone(),
233-
resolver: Arc::new(introspection_resolver),
234+
resolver: introspection_resolver,
234235
query: self.query.as_introspection_query(),
235236
deadline: self.deadline,
236237
max_first: std::u32::MAX,
@@ -295,17 +296,17 @@ pub fn execute_root_selection_set_uncached(
295296
}
296297

297298
/// Executes the root selection set of a query.
298-
pub fn execute_root_selection_set(
299-
ctx: &ExecutionContext<impl Resolver>,
299+
pub fn execute_root_selection_set<R: Resolver>(
300+
ctx: &ExecutionContext<R>,
300301
selection_set: &q::SelectionSet,
301302
root_type: &s::ObjectType,
302303
block_ptr: Option<EthereumBlockPointer>,
303-
) -> QueryResponse {
304+
) -> Arc<QueryResult> {
304305
// Cache the cache key to not have to calculate it twice - once for lookup
305306
// and once for insert.
306307
let mut key: Option<QueryHash> = None;
307308

308-
if *CACHE_ALL || CACHED_SUBGRAPH_IDS.contains(&ctx.query.schema.id) {
309+
if R::CACHEABLE && (*CACHE_ALL || CACHED_SUBGRAPH_IDS.contains(&ctx.query.schema.id)) {
309310
if let Some(block_ptr) = block_ptr {
310311
// JSONB and metadata queries use `BLOCK_NUMBER_MAX`. Ignore this case for two reasons:
311312
// - Metadata queries are not cacheable.
@@ -331,22 +332,30 @@ pub fn execute_root_selection_set(
331332

332333
let result = if let Some(key) = key {
333334
QUERY_HERD_CACHE.cached_query(key, || {
334-
execute_root_selection_set_uncached(ctx, selection_set, root_type)
335+
Arc::new(QueryResult::from(execute_root_selection_set_uncached(
336+
ctx,
337+
selection_set,
338+
root_type,
339+
)))
335340
})
336341
} else {
337-
execute_root_selection_set_uncached(ctx, selection_set, root_type)
342+
Arc::new(QueryResult::from(execute_root_selection_set_uncached(
343+
ctx,
344+
selection_set,
345+
root_type,
346+
)))
338347
};
339348

340349
// Check if this query should be cached.
341350
// Share errors from the herd cache, but don't store them in generational cache.
342351
// In particular, there is a problem where asking for a block pointer beyond the chain
343352
// head can cause the legitimate cache to be thrown out.
344-
if let (Ok(_), Some(key), Some(block_ptr)) = (&result, key, block_ptr) {
353+
if let (false, Some(key), Some(block_ptr)) = (result.has_errors(), key, block_ptr) {
345354
let mut cache = QUERY_CACHE.write().unwrap();
346355

347356
// If there is already a cache by the block of this query, just add it there.
348357
if let Some(cache_by_block) = cache.iter_mut().find(|c| c.block == block_ptr) {
349-
let cache_insert = cache_by_block.insert(key, result.clone());
358+
let cache_insert = cache_by_block.insert(key, result.cheap_clone());
350359
ctx.cache_insert.store(cache_insert, Ordering::SeqCst);
351360
} else if *QUERY_CACHE_BLOCKS > 0 {
352361
// We're creating a new `CacheByBlock` if:
@@ -368,7 +377,7 @@ pub fn execute_root_selection_set(
368377
// Create a new cache by block, insert this entry, and add it to the QUERY_CACHE.
369378
let max_weight = *QUERY_CACHE_MAX_MEM / *QUERY_CACHE_BLOCKS;
370379
let mut cache_by_block = CacheByBlock::new(block_ptr, max_weight);
371-
let cache_insert = cache_by_block.insert(key, result.clone());
380+
let cache_insert = cache_by_block.insert(key, result.cheap_clone());
372381
ctx.cache_insert.store(cache_insert, Ordering::SeqCst);
373382
cache.push_front(cache_by_block);
374383
}

graphql/src/execution/query.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,8 @@ impl Query {
122122
Ok(Arc::new(query))
123123
}
124124

125-
/// Return the block constraint for the toplevel query field(s) Since,
126-
/// syntactically, each toplevel field can have its own block constraint,
127-
/// we check that they are all identical and report an error otherwise
125+
/// Return the block constraint for the toplevel query field(s), merging the
126+
/// selection sets of fields that have the same block constraint.
128127
pub fn block_constraint(
129128
&self,
130129
) -> Result<HashMap<BlockConstraint, q::SelectionSet>, Vec<QueryExecutionError>> {

graphql/src/execution/resolver.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ impl<'a> ObjectOrInterface<'a> {
6161
}
6262

6363
/// A GraphQL resolver that can resolve entities, enum values, scalar types and interfaces/unions.
64-
pub trait Resolver: Clone + Send + Sync {
64+
pub trait Resolver: Sized + Send + Sync {
65+
const CACHEABLE: bool;
66+
6567
/// Prepare for executing a query by prefetching as much data as possible
6668
fn prefetch(
6769
&self,

graphql/src/introspection/resolver.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,10 @@ impl<'a> IntrospectionResolver<'a> {
348348

349349
/// A GraphQL resolver that can resolve entities, enum values, scalar types and interfaces/unions.
350350
impl<'a> Resolver for IntrospectionResolver<'a> {
351+
// `IntrospectionResolver` is not used as a "top level" resolver,
352+
// see `fn as_introspection_context`, so this value is irrelevant.
353+
const CACHEABLE: bool = false;
354+
351355
fn prefetch(
352356
&self,
353357
_: &ExecutionContext<Self>,

0 commit comments

Comments
 (0)