@@ -10,6 +10,7 @@ use std::time::{Duration, Instant};
10
10
use lazy_static:: lazy_static;
11
11
12
12
use crate :: components:: store:: PoolWaitStats ;
13
+ use crate :: prelude:: { info, warn, Logger } ;
13
14
use crate :: util:: stats:: { MovingStats , BIN_SIZE , WINDOW_SIZE } ;
14
15
15
16
lazy_static ! {
@@ -123,21 +124,65 @@ impl QueryEffortInner {
123
124
// The size of any individual step when we adjust the kill_rate
124
125
const KILL_RATE_STEP : f64 = 0.1 ;
125
126
127
+ /// What to log about the state we are currently in
128
+ enum KillStateLogEvent {
129
+ /// Overload is starting right now
130
+ Start ,
131
+ /// Overload has been going on for the duration
132
+ Ongoing ( Duration ) ,
133
+ /// Overload was resolved after duration time
134
+ Resolved ( Duration ) ,
135
+ /// Don't log anything right now
136
+ Skip ,
137
+ }
138
+
126
139
struct KillState {
140
+ // A value between 0 and 1, where 0 means 'respond to all queries'
141
+ // and 1 means 'do not respond to any queries'
127
142
kill_rate : f64 ,
143
+ // We adjust the `kill_rate` at most every `KILL_RATE_UPDATE_INTERVAL`
128
144
last_update : Instant ,
145
+ // When the current overload situation started
146
+ overload_start : Option < Instant > ,
147
+ // Throttle logging while we are overloaded to no more often than
148
+ // once every 30s
149
+ last_overload_log : Instant ,
129
150
}
130
151
131
152
impl KillState {
132
153
fn new ( ) -> Self {
154
+ let long_ago = Duration :: from_secs ( 86400 ) ;
133
155
Self {
134
156
kill_rate : 0.0 ,
135
- last_update : Instant :: now ( ) - Duration :: from_secs ( 86400 ) ,
157
+ last_update : Instant :: now ( ) - long_ago,
158
+ overload_start : None ,
159
+ last_overload_log : Instant :: now ( ) - long_ago,
160
+ }
161
+ }
162
+
163
+ fn log_event ( & mut self , now : Instant , overloaded : bool ) -> KillStateLogEvent {
164
+ use KillStateLogEvent :: * ;
165
+
166
+ if let Some ( overload_start) = self . overload_start {
167
+ if !overloaded {
168
+ self . overload_start = None ;
169
+ Resolved ( overload_start. elapsed ( ) )
170
+ } else if now. saturating_duration_since ( self . last_overload_log )
171
+ > Duration :: from_secs ( 30 )
172
+ {
173
+ self . last_overload_log = now;
174
+ Ongoing ( overload_start. elapsed ( ) )
175
+ } else {
176
+ Skip
177
+ }
178
+ } else {
179
+ Start
136
180
}
137
181
}
138
182
}
139
183
140
184
pub struct LoadManager {
185
+ logger : Logger ,
141
186
effort : QueryEffort ,
142
187
store_wait_stats : PoolWaitStats ,
143
188
/// List of query shapes that have been statically blocked through
@@ -152,8 +197,17 @@ pub struct LoadManager {
152
197
}
153
198
154
199
impl LoadManager {
155
- pub fn new ( store_wait_stats : PoolWaitStats , blocked_queries : HashSet < u64 > ) -> Self {
200
+ pub fn new (
201
+ logger : Logger ,
202
+ store_wait_stats : PoolWaitStats ,
203
+ blocked_queries : HashSet < u64 > ,
204
+ ) -> Self {
205
+ info ! (
206
+ logger,
207
+ "Creating LoadManager with disabled={}" , * LOAD_MANAGEMENT_DISABLED ,
208
+ ) ;
156
209
Self {
210
+ logger,
157
211
effort : QueryEffort :: default ( ) ,
158
212
store_wait_stats,
159
213
blocked_queries,
@@ -226,7 +280,7 @@ impl LoadManager {
226
280
return true ;
227
281
}
228
282
229
- let overloaded = self . overloaded ( ) ;
283
+ let ( overloaded, wait_ms ) = self . overloaded ( ) ;
230
284
let ( kill_rate, last_update) = self . kill_state ( ) ;
231
285
if !overloaded && kill_rate == 0.0 {
232
286
return false ;
@@ -256,32 +310,71 @@ impl LoadManager {
256
310
257
311
// Kill random queries in case we have no queries, or not enough queries
258
312
// that cause at least 20% of the effort
259
- let kill_rate = self . update_kill_rate ( kill_rate, last_update, overloaded) ;
313
+ let kill_rate = self . update_kill_rate ( kill_rate, last_update, overloaded, wait_ms ) ;
260
314
thread_rng ( ) . gen_bool ( kill_rate * query_effort / total_effort)
261
315
}
262
316
263
- fn overloaded ( & self ) -> bool {
317
+ fn overloaded ( & self ) -> ( bool , Duration ) {
264
318
let stats = self . store_wait_stats . read ( ) . unwrap ( ) ;
265
- stats. average_gt ( * LOAD_THRESHOLD )
319
+ let average = stats. average ( ) ;
320
+ let overloaded = average
321
+ . map ( |average| average > * LOAD_THRESHOLD )
322
+ . unwrap_or ( false ) ;
323
+ ( overloaded, average. unwrap_or ( * ZERO_DURATION ) )
266
324
}
267
325
268
326
fn kill_state ( & self ) -> ( f64 , Instant ) {
269
327
let state = self . kill_state . read ( ) . unwrap ( ) ;
270
328
( state. kill_rate , state. last_update )
271
329
}
272
330
273
- fn update_kill_rate ( & self , mut kill_rate : f64 , last_update : Instant , overloaded : bool ) -> f64 {
331
+ fn update_kill_rate (
332
+ & self ,
333
+ mut kill_rate : f64 ,
334
+ last_update : Instant ,
335
+ overloaded : bool ,
336
+ wait_ms : Duration ,
337
+ ) -> f64 {
338
+ assert ! ( overloaded || kill_rate > 0.0 ) ;
339
+
274
340
let now = Instant :: now ( ) ;
275
341
if now. saturating_duration_since ( last_update) > * KILL_RATE_UPDATE_INTERVAL {
342
+ // Update the kill_rate
276
343
if overloaded {
277
344
kill_rate = kill_rate + KILL_RATE_STEP * ( 1.0 - kill_rate) ;
278
345
} else {
279
346
kill_rate = ( kill_rate - KILL_RATE_STEP ) . max ( 0.0 ) ;
280
347
}
281
- // FIXME: Log the new kill_rate
282
- let mut state = self . kill_state . write ( ) . unwrap ( ) ;
283
- state. kill_rate = kill_rate;
284
- state. last_update = now;
348
+ let event = {
349
+ let mut state = self . kill_state . write ( ) . unwrap ( ) ;
350
+ state. kill_rate = kill_rate;
351
+ state. last_update = now;
352
+ state. log_event ( now, overloaded)
353
+ } ;
354
+ // Log information about what's happening after we've released the
355
+ // lock on self.kill_state
356
+ use KillStateLogEvent :: * ;
357
+ match event {
358
+ Resolved ( duration) => {
359
+ info ! ( self . logger, "Query overload resolved" ;
360
+ "duration_ms" => duration. as_millis( ) ,
361
+ "wait_ms" => wait_ms. as_millis( ) ,
362
+ "event" => "resolved" ) ;
363
+ }
364
+ Ongoing ( duration) => {
365
+ info ! ( self . logger, "Query overload still happening" ;
366
+ "duration_ms" => duration. as_millis( ) ,
367
+ "wait_ms" => wait_ms. as_millis( ) ,
368
+ "kill_rate" => kill_rate,
369
+ "event" => "ongoing" ) ;
370
+ }
371
+ Start => {
372
+ warn ! ( self . logger, "Query overload" ;
373
+ "wait_ms" => wait_ms. as_millis( ) ,
374
+ "event" => "start" ) ;
375
+ }
376
+ Skip => { /* do nothing */ }
377
+ }
285
378
}
286
379
kill_rate
287
380
}
0 commit comments