1
1
use ic_base_types:: { CanisterId , NumBytes } ;
2
2
use ic_error_types:: UserError ;
3
+ use ic_metrics:: MetricsRegistry ;
3
4
use ic_replicated_state:: ReplicatedState ;
4
5
use ic_types:: { ingress:: WasmResult , messages:: UserQuery , CountBytes , Cycles , Time , UserId } ;
5
6
use ic_utils_lru_cache:: LruCache ;
6
- use std:: {
7
- mem:: size_of_val,
8
- sync:: { LockResult , Mutex , MutexGuard } ,
9
- } ;
7
+ use prometheus:: { Histogram , IntCounter , IntGauge } ;
8
+ use std:: { mem:: size_of_val, sync:: Mutex } ;
10
9
10
+ use crate :: metrics:: duration_histogram;
11
+
12
+ ////////////////////////////////////////////////////////////////////////
13
+ /// Query Cache metrics.
14
+ pub ( crate ) struct QueryCacheMetrics {
15
+ pub hits : IntCounter ,
16
+ pub misses : IntCounter ,
17
+ pub evicted_entries : IntCounter ,
18
+ pub evicted_entries_duration : Histogram ,
19
+ pub invalidated_entries : IntCounter ,
20
+ pub invalidated_entries_by_time : IntCounter ,
21
+ pub invalidated_entries_by_canister_version : IntCounter ,
22
+ pub invalidated_entries_by_canister_balance : IntCounter ,
23
+ pub invalidated_entries_duration : Histogram ,
24
+ pub count_bytes : IntGauge ,
25
+ pub len : IntGauge ,
26
+ }
27
+
28
+ impl QueryCacheMetrics {
29
+ fn new ( metrics_registry : & MetricsRegistry ) -> Self {
30
+ Self {
31
+ hits : metrics_registry. int_counter (
32
+ "execution_query_cache_hits_total" ,
33
+ "The total number of replica side query cache hits" ,
34
+ ) ,
35
+ misses : metrics_registry. int_counter (
36
+ "execution_query_cache_misses_total" ,
37
+ "The total number of replica side query cache misses" ,
38
+ ) ,
39
+ evicted_entries : metrics_registry. int_counter (
40
+ "execution_query_cache_evicted_entries_total" ,
41
+ "The total number of evicted entries in the replica side query cache" ,
42
+ ) ,
43
+ evicted_entries_duration : duration_histogram (
44
+ "execution_query_cache_evicted_entries_duration_seconds" ,
45
+ "The duration of evicted cache entries in seconds" ,
46
+ metrics_registry,
47
+ ) ,
48
+ invalidated_entries : metrics_registry. int_counter (
49
+ "execution_query_cache_invalidated_entries_total" ,
50
+ "The total number of invalidated entries in the replica side query cache" ,
51
+ ) ,
52
+ invalidated_entries_by_time : metrics_registry. int_counter (
53
+ "execution_query_cache_invalidated_entries_by_time_total" ,
54
+ "The total number of invalidated entries due to the changed time" ,
55
+ ) ,
56
+ invalidated_entries_by_canister_version : metrics_registry. int_counter (
57
+ "execution_query_cache_invalidated_entries_by_canister_version_total" ,
58
+ "The total number of invalidated entries due to the changed canister version" ,
59
+ ) ,
60
+ invalidated_entries_by_canister_balance : metrics_registry. int_counter (
61
+ "execution_query_cache_invalidated_entries_by_canister_balance_total" ,
62
+ "The total number of invalidated entries due to the changed canister balance" ,
63
+ ) ,
64
+ invalidated_entries_duration : duration_histogram (
65
+ "execution_query_cache_invalidated_entries_duration_seconds" ,
66
+ "The duration of invalidated cache entries in seconds" ,
67
+ metrics_registry,
68
+ ) ,
69
+ count_bytes : metrics_registry. int_gauge (
70
+ "execution_query_cache_count_bytes" ,
71
+ "The current replica side query cache size in bytes" ,
72
+ ) ,
73
+ len : metrics_registry. int_gauge (
74
+ "execution_query_cache_len" ,
75
+ "The current replica side query cache len in elements" ,
76
+ ) ,
77
+ }
78
+ }
79
+ }
80
+
81
+ ////////////////////////////////////////////////////////////////////////
11
82
/// Query Cache entry key.
12
83
///
13
84
/// The key is to distinguish query cache entries, i.e. entries with different
@@ -41,10 +112,12 @@ impl From<&UserQuery> for EntryKey {
41
112
}
42
113
}
43
114
115
+ ////////////////////////////////////////////////////////////////////////
44
116
/// Query Cache entry environment metadata.
45
117
///
46
118
/// The structure captures the environment metadata. The cache entry is valid
47
119
/// only when its environment metadata matches the current state environment.
120
+ #[ derive( PartialEq ) ]
48
121
pub ( crate ) struct EntryEnv {
49
122
/// The Consensus-determined time when the cache entry was created.
50
123
pub batch_time : Time ,
@@ -73,6 +146,7 @@ impl TryFrom<(&EntryKey, &ReplicatedState)> for EntryEnv {
73
146
}
74
147
}
75
148
149
+ ////////////////////////////////////////////////////////////////////////
76
150
/// Query Cache entry value.
77
151
pub ( crate ) struct EntryValue {
78
152
env : EntryEnv ,
@@ -90,22 +164,39 @@ impl EntryValue {
90
164
Self { env, result }
91
165
}
92
166
93
- pub ( crate ) fn is_valid ( & self , env : & EntryEnv ) -> bool {
167
+ fn is_valid ( & self , env : & EntryEnv ) -> bool {
168
+ self . env == * env
169
+ }
170
+
171
+ fn is_valid_time ( & self , env : & EntryEnv ) -> bool {
94
172
self . env . batch_time == env. batch_time
95
- && self . env . canister_version == env. canister_version
96
- && self . env . canister_balance == env. canister_balance
97
173
}
98
174
99
- pub ( crate ) fn result ( & self ) -> Result < WasmResult , UserError > {
175
+ fn is_valid_canister_version ( & self , env : & EntryEnv ) -> bool {
176
+ self . env . canister_version == env. canister_version
177
+ }
178
+
179
+ fn is_valid_canister_balance ( & self , env : & EntryEnv ) -> bool {
180
+ self . env . canister_balance == env. canister_balance
181
+ }
182
+
183
+ fn result ( & self ) -> Result < WasmResult , UserError > {
100
184
self . result . clone ( )
101
185
}
186
+
187
+ fn elapsed_seconds ( & self , now : Time ) -> f64 {
188
+ ( now - self . env . batch_time ) . as_secs_f64 ( )
189
+ }
102
190
}
103
191
104
- /// Replica Query Cache Implementation.
192
+ ////////////////////////////////////////////////////////////////////////
193
+ /// Replica Side Query Cache.
105
194
pub ( crate ) struct QueryCache {
106
195
// We can't use `RwLock`, as the `LruCache::get()` requires mutable reference
107
196
// to update the LRU.
108
197
cache : Mutex < LruCache < EntryKey , EntryValue > > ,
198
+ // Query cache metrics (public for tests)
199
+ pub ( crate ) metrics : QueryCacheMetrics ,
109
200
}
110
201
111
202
impl CountBytes for QueryCache {
@@ -114,22 +205,72 @@ impl CountBytes for QueryCache {
114
205
}
115
206
}
116
207
117
- impl Default for QueryCache {
118
- fn default ( ) -> Self {
208
+ impl QueryCache {
209
+ pub ( crate ) fn new ( metrics_registry : & MetricsRegistry , capacity : NumBytes ) -> Self {
119
210
QueryCache {
120
- cache : Mutex :: new ( LruCache :: unbounded ( ) ) ,
211
+ cache : Mutex :: new ( LruCache :: new ( capacity) ) ,
212
+ metrics : QueryCacheMetrics :: new ( metrics_registry) ,
121
213
}
122
214
}
123
- }
124
215
125
- impl QueryCache {
126
- pub ( crate ) fn new ( capacity : NumBytes ) -> Self {
127
- QueryCache {
128
- cache : Mutex :: new ( LruCache :: new ( capacity) ) ,
216
+ pub ( crate ) fn get_valid_result (
217
+ & self ,
218
+ key : & EntryKey ,
219
+ env : & EntryEnv ,
220
+ ) -> Option < Result < WasmResult , UserError > > {
221
+ let mut cache = self . cache . lock ( ) . unwrap ( ) ;
222
+ let now = env. batch_time ;
223
+
224
+ if let Some ( value) = cache. get ( key) {
225
+ if value. is_valid ( env) {
226
+ let res = value. result ( ) ;
227
+ // Update the metrics.
228
+ self . metrics . hits . inc ( ) ;
229
+ let count_bytes = cache. count_bytes ( ) as i64 ;
230
+ self . metrics . count_bytes . set ( count_bytes) ;
231
+ // The cache entry is valid, return it.
232
+ return Some ( res) ;
233
+ } else {
234
+ // Update the metrics.
235
+ self . metrics . invalidated_entries . inc ( ) ;
236
+ self . metrics
237
+ . invalidated_entries_duration
238
+ . observe ( value. elapsed_seconds ( now) ) ;
239
+ // For the sake of correctness, we need a fall-through logic here.
240
+ if !value. is_valid_time ( env) {
241
+ self . metrics . invalidated_entries_by_time . inc ( ) ;
242
+ }
243
+ if !value. is_valid_canister_version ( env) {
244
+ self . metrics . invalidated_entries_by_canister_version . inc ( ) ;
245
+ }
246
+ if !value. is_valid_canister_balance ( env) {
247
+ self . metrics . invalidated_entries_by_canister_balance . inc ( ) ;
248
+ }
249
+ // The cache entry is no longer valid, remove it.
250
+ cache. pop ( key) ;
251
+ }
129
252
}
253
+ None
130
254
}
131
255
132
- pub ( crate ) fn lock ( & self ) -> LockResult < MutexGuard < LruCache < EntryKey , EntryValue > > > {
133
- self . cache . lock ( )
256
+ pub ( crate ) fn push ( & self , key : EntryKey , value : EntryValue ) -> Vec < ( EntryKey , EntryValue ) > {
257
+ let now = value. env . batch_time ;
258
+ let mut cache = self . cache . lock ( ) . unwrap ( ) ;
259
+ let evicted_entries = cache. push ( key, value) ;
260
+
261
+ // Update the metrics.
262
+ self . metrics
263
+ . evicted_entries
264
+ . inc_by ( evicted_entries. len ( ) as u64 ) ;
265
+ for ( _evited_key, evicted_value) in & evicted_entries {
266
+ let d = evicted_value. elapsed_seconds ( now) ;
267
+ self . metrics . evicted_entries_duration . observe ( d) ;
268
+ }
269
+ self . metrics . misses . inc ( ) ;
270
+ let count_bytes = cache. count_bytes ( ) as i64 ;
271
+ self . metrics . count_bytes . set ( count_bytes) ;
272
+ self . metrics . len . set ( cache. len ( ) as i64 ) ;
273
+
274
+ evicted_entries
134
275
}
135
276
}
0 commit comments