Commit f336570

graph: Decline to run queries when load exceeds a threshold
1 parent 642ac19

2 files changed: +210 −4 lines changed


docs/environment-variables.md

Lines changed: 8 additions & 0 deletions
@@ -117,3 +117,11 @@ those.
   `GRAPH_LOAD_WINDOW_SIZE` seconds exceed a threshold. Measurements within
   each window are binned into bins of `GRAPH_LOAD_BIN_SIZE` seconds. The
   variables default to 300s and 1s
+- `GRAPH_LOAD_THRESHOLD`: If wait times for getting database connections go
+  above this threshold, throttle queries until the wait times fall below
+  the threshold. The value is in milliseconds and defaults to 0, which
+  turns off throttling and any associated statistics collection.
+- `GRAPH_LOAD_JAIL_THRESHOLD`: When the system is overloaded, any query
+  that causes more than this fraction of the effort will be rejected for as
+  long as the process is running (i.e., even after the overload situation
+  is resolved). Defaults to 0.1.
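
To make the semantics of the two new variables concrete, here is a minimal standalone Rust sketch (not part of the commit) of the decisions they control. The constants, function names, and the 500 ms threshold are invented for illustration; the real logic lives in `graph/src/data/graphql/effort.rs` below.

// Standalone illustration of the two settings, assuming
// GRAPH_LOAD_THRESHOLD=500 (ms) and GRAPH_LOAD_JAIL_THRESHOLD=0.1.
use std::time::Duration;

// Hypothetical values for illustration only.
const LOAD_THRESHOLD: Duration = Duration::from_millis(500);
const JAIL_THRESHOLD: f64 = 0.1;

/// The system counts as overloaded when the average wait for a database
/// connection exceeds GRAPH_LOAD_THRESHOLD.
fn overloaded(avg_connection_wait: Duration) -> bool {
    avg_connection_wait > LOAD_THRESHOLD
}

/// While overloaded, a query that accounts for more than
/// GRAPH_LOAD_JAIL_THRESHOLD of the total effort is rejected for the
/// rest of the process lifetime.
fn should_jail(query_effort_ms: f64, total_effort_ms: f64) -> bool {
    total_effort_ms > 0.0 && query_effort_ms / total_effort_ms > JAIL_THRESHOLD
}

fn main() {
    assert!(overloaded(Duration::from_millis(750)));
    assert!(!overloaded(Duration::from_millis(100)));
    // A query causing 20% of the effort during overload gets jailed.
    assert!(should_jail(200.0, 1000.0));
    assert!(!should_jail(50.0, 1000.0));
}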

graph/src/data/graphql/effort.rs

Lines changed: 202 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,54 @@
 //! Utilities to keep moving statistics about queries
 
+use rand::{prelude::Rng, thread_rng};
 use std::collections::HashMap;
+use std::env;
+use std::str::FromStr;
 use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
 
+use lazy_static::lazy_static;
+
 use crate::components::store::PoolWaitStats;
 use crate::util::stats::{MovingStats, BIN_SIZE, WINDOW_SIZE};
 
-pub struct QueryEffort {
+lazy_static! {
+    static ref LOAD_THRESHOLD: Duration = {
+        let threshold = env::var("GRAPH_LOAD_THRESHOLD")
+            .ok()
+            .map(|s| {
+                u64::from_str(&s).unwrap_or_else(|_| {
+                    panic!("GRAPH_LOAD_THRESHOLD must be a number, but is `{}`", s)
+                })
+            })
+            .unwrap_or(0);
+        Duration::from_millis(threshold)
+    };
+
+    static ref JAIL_THRESHOLD: f64 = {
+        env::var("GRAPH_LOAD_JAIL_THRESHOLD")
+            .ok()
+            .map(|s| {
+                f64::from_str(&s).unwrap_or_else(|_| {
+                    panic!("GRAPH_LOAD_JAIL_THRESHOLD must be a number, but is `{}`", s)
+                })
+            })
+            .unwrap_or(0.1)
+    };
+
+    static ref WAIT_STAT_CHECK_INTERVAL: Duration = Duration::from_secs(1);
+
+    static ref ZERO_DURATION: Duration = Duration::from_millis(0);
+
+    // Load management can be disabled by setting the threshold to 0. This
+    // makes sure in particular that we never take any of the locks
+    // associated with it
+    static ref LOAD_MANAGEMENT_DISABLED: bool = *LOAD_THRESHOLD == *ZERO_DURATION;
+
+    static ref KILL_RATE_UPDATE_INTERVAL: Duration = Duration::from_millis(1000);
+}
+
+struct QueryEffort {
     inner: Arc<RwLock<QueryEffortInner>>,
 }
 
@@ -39,6 +80,22 @@ impl QueryEffort {
         let mut inner = self.inner.write().unwrap();
         inner.add(shape_hash, duration);
     }
+
+    /// Return what we know right now about the effort for the query
+    /// `shape_hash`, and about the total effort. If we have no measurements
+    /// at all, return `ZERO_DURATION` as the total effort. If we have no
+    /// data for the particular query, return `None` as the effort
+    /// for the query
+    pub fn current_effort(&self, shape_hash: u64) -> (Option<Duration>, Duration) {
+        let inner = self.inner.read().unwrap();
+        let total_effort = inner.total.average().unwrap_or(*ZERO_DURATION);
+        let query_effort = inner
+            .effort
+            .get(&shape_hash)
+            .map(|stats| stats.average())
+            .flatten();
+        (query_effort, total_effort)
+    }
 }
 
 impl QueryEffortInner {
@@ -63,10 +120,35 @@ impl QueryEffortInner {
     }
 }
 
+// The size of any individual step when we adjust the kill_rate
+const KILL_RATE_STEP: f64 = 0.1;
+
+struct KillState {
+    kill_rate: f64,
+    last_update: Instant,
+}
+
+impl KillState {
+    fn new() -> Self {
+        Self {
+            kill_rate: 0.0,
+            last_update: Instant::now() - Duration::from_secs(86400),
+        }
+    }
+}
+
 pub struct LoadManager {
     effort: QueryEffort,
     store_wait_stats: PoolWaitStats,
+    /// List of query shapes that have been statically blocked through
+    /// configuration
     blocked_queries: Vec<u64>,
+    /// List of query shapes that have caused more than `JAIL_THRESHOLD`
+    /// proportion of the work while the system was overloaded. Currently,
+    /// there is no way for a query to get out of jail other than
+    /// restarting the process
+    jailed_queries: RwLock<Vec<u64>>,
+    kill_state: RwLock<KillState>,
 }
 
 impl LoadManager {
@@ -75,16 +157,132 @@ impl LoadManager {
             effort: QueryEffort::default(),
             store_wait_stats,
             blocked_queries,
+            jailed_queries: RwLock::new(Vec::new()),
+            kill_state: RwLock::new(KillState::new()),
         }
     }
 
     pub fn add_query(&self, shape_hash: u64, duration: Duration) {
-        self.effort.add(shape_hash, duration);
+        if !*LOAD_MANAGEMENT_DISABLED {
+            self.effort.add(shape_hash, duration);
+        }
     }
 
     /// Return `true` if we should decline running the query with this
-    /// `ShapeHash`
+    /// `ShapeHash`. This is the heart of reacting to overload situations.
+    ///
+    /// The decision to decline a query is geared towards mitigating two
+    /// different ways in which the system comes under high load:
+    /// 1) A relatively small number of queries causes a large fraction
+    ///    of the overall work that goes into responding to queries. That
+    ///    is usually inadvertent, and the result of a dApp using a new query,
+    ///    or the data for a subgraph changing in a way that makes a query
+    ///    that was previously fast take a long time
+    /// 2) A large number of queries that by themselves are reasonably fast
+    ///    cause so much work that the system gets bogged down. When none
+    ///    of them by themselves is expensive, it becomes impossible to
+    ///    name a culprit for an overload, and we therefore shed
+    ///    increasing amounts of traffic by declining to run queries
+    ///    in proportion to the work they cause
+    ///
+    /// Note that any mitigation for (2) is prone to flip-flopping in and
+    /// out of overload situations, as we will oscillate between being
+    /// overloaded and not being overloaded, though we'd expect the amount
+    /// of traffic we shed to settle on something that stays close to the
+    /// point where we alternate between the two states.
+    ///
+    /// We detect whether we are in an overloaded situation by looking at
+    /// the average wait time for connection checkouts. If that exceeds
+    /// `GRAPH_LOAD_THRESHOLD`, we consider ourselves to be in an overload
+    /// situation.
+    ///
+    /// There are several criteria that will lead to us declining to run
+    /// a query with a certain `ShapeHash`:
+    /// 1) If the query is one of the configured `blocked_queries`, we will
+    ///    always decline
+    /// 2) If a query, during an overload situation, causes more than
+    ///    `JAIL_THRESHOLD` fraction of the total query effort, we will
+    ///    refuse to run this query again for the lifetime of the process
+    /// 3) During an overload situation, we step a `kill_rate` from 0 to 1,
+    ///    roughly in steps of `KILL_RATE_STEP`, though with an eye towards
+    ///    not hitting a `kill_rate` of 1 too soon. We will decline to run
+    ///    queries randomly with a probability of
+    ///        kill_rate * query_effort / total_effort
+    ///
+    /// If `GRAPH_LOAD_THRESHOLD` is set to 0, we bypass all this logic, and
+    /// only ever decline to run statically configured queries (1). In that
+    /// case, we also do not take any locks when asked to update statistics,
+    /// or to check whether we are overloaded; these operations amount to
+    /// noops.
     pub fn decline(&self, shape_hash: u64) -> bool {
-        self.blocked_queries.contains(&shape_hash)
+        if self.blocked_queries.contains(&shape_hash) {
+            return true;
+        }
+        if *LOAD_MANAGEMENT_DISABLED {
+            return false;
+        }
+
+        if self.jailed_queries.read().unwrap().contains(&shape_hash) {
+            return true;
+        }
+
+        let overloaded = self.overloaded();
+        let (kill_rate, last_update) = self.kill_state();
+        if !overloaded && kill_rate == 0.0 {
+            return false;
+        }
+
+        let (query_effort, total_effort) = self.effort.current_effort(shape_hash);
+        // When `total_effort` is `ZERO_DURATION`, we haven't done any work. All are
+        // welcome
+        if total_effort == *ZERO_DURATION {
+            return false;
+        }
+
+        // If `query_effort` is `None`, we haven't seen the query. Since we
+        // are in an overload situation, we are very suspicious of new things
+        // and assume the worst. This ensures that even if we only ever see
+        // new queries, we drop `kill_rate` amount of traffic
+        let known_query = query_effort.is_some();
+        let query_effort = query_effort.unwrap_or_else(|| total_effort).as_millis() as f64;
+        let total_effort = total_effort.as_millis() as f64;
+
+        if known_query && query_effort / total_effort > *JAIL_THRESHOLD {
+            // Any single query that causes at least JAIL_THRESHOLD of the
+            // effort in an overload situation gets killed
+            self.jailed_queries.write().unwrap().push(shape_hash);
+            return true;
+        }
+
+        // Kill random queries in case we have no queries, or not enough queries
+        // that cause at least 20% of the effort
+        let kill_rate = self.update_kill_rate(kill_rate, last_update, overloaded);
+        thread_rng().gen_bool(kill_rate * query_effort / total_effort)
+    }
+
+    fn overloaded(&self) -> bool {
+        let stats = self.store_wait_stats.read().unwrap();
+        stats.average_gt(*LOAD_THRESHOLD)
+    }
+
+    fn kill_state(&self) -> (f64, Instant) {
+        let state = self.kill_state.read().unwrap();
+        (state.kill_rate, state.last_update)
+    }
+
+    fn update_kill_rate(&self, mut kill_rate: f64, last_update: Instant, overloaded: bool) -> f64 {
+        let now = Instant::now();
+        if now.saturating_duration_since(last_update) > *KILL_RATE_UPDATE_INTERVAL {
+            if overloaded {
+                kill_rate = kill_rate + KILL_RATE_STEP * (1.0 - kill_rate);
+            } else {
+                kill_rate = (kill_rate - KILL_RATE_STEP).max(0.0);
+            }
+            // FIXME: Log the new kill_rate
+            let mut state = self.kill_state.write().unwrap();
+            state.kill_rate = kill_rate;
+            state.last_update = now;
+        }
+        kill_rate
     }
 }
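
For reference, the following standalone sketch (not part of the commit) isolates the `kill_rate` stepping from `update_kill_rate` and the decline probability from `decline`. The function names and example numbers are invented for illustration; only the two formulas are taken from the diff above.

// Standalone illustration of the kill_rate dynamics and decline probability.
const KILL_RATE_STEP: f64 = 0.1;

/// One adjustment step: move 10% of the remaining distance towards 1 while
/// overloaded, otherwise back off linearly towards 0.
fn step_kill_rate(kill_rate: f64, overloaded: bool) -> f64 {
    if overloaded {
        kill_rate + KILL_RATE_STEP * (1.0 - kill_rate)
    } else {
        (kill_rate - KILL_RATE_STEP).max(0.0)
    }
}

/// Probability with which a query is declined: proportional to the share of
/// the total effort that its shape is responsible for.
fn decline_probability(kill_rate: f64, query_effort_ms: f64, total_effort_ms: f64) -> f64 {
    kill_rate * query_effort_ms / total_effort_ms
}

fn main() {
    // During a sustained overload the kill_rate approaches 1 without ever
    // reaching it: 0.1, 0.19, 0.271, 0.3439, ...
    let mut kill_rate = 0.0;
    for _ in 0..10 {
        kill_rate = step_kill_rate(kill_rate, true);
    }
    println!("kill_rate after 10 overloaded intervals: {:.3}", kill_rate);

    // A query shape causing 25% of the effort is then declined with
    // probability kill_rate * 0.25.
    println!(
        "decline probability: {:.3}",
        decline_probability(kill_rate, 250.0, 1000.0)
    );
}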
