Skip to content

Commit b788fdb

Browse files
committed
RUN-650 Add more query cache metrics
1 parent 54071ce commit b788fdb

File tree

5 files changed

+321
-125
lines changed

5 files changed

+321
-125
lines changed

rs/execution_environment/src/metrics.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@ pub(crate) struct QueryHandlerMetrics {
1515
pub query_initial_call: ScopedMetrics,
1616
pub query_retry_call: ScopedMetrics,
1717
pub query_spawned_calls: ScopedMetrics,
18-
pub query_cache_hits: IntCounter,
19-
pub query_cache_misses: IntCounter,
20-
pub query_cache_evicted_entries: IntCounter,
21-
pub query_cache_invalidated_entries: IntCounter,
22-
pub query_cache_count_bytes: Histogram,
2318
pub query_critical_error: IntCounter,
2419
}
2520

@@ -124,27 +119,6 @@ impl QueryHandlerMetrics {
124119
metrics_registry,
125120
),
126121
},
127-
query_cache_hits: metrics_registry.int_counter(
128-
"execution_query_cache_hits",
129-
"The number of replica side query cache hits",
130-
),
131-
query_cache_misses: metrics_registry.int_counter(
132-
"execution_query_cache_misses",
133-
"The number of replica side query cache misses",
134-
),
135-
query_cache_evicted_entries: metrics_registry.int_counter(
136-
"execution_query_cache_evicted_entries",
137-
"The number of evicted entries in the replica side query cache",
138-
),
139-
query_cache_invalidated_entries: metrics_registry.int_counter(
140-
"execution_query_cache_invalidated_entries",
141-
"The number of invalidated entries in the replica side query cache",
142-
),
143-
query_cache_count_bytes: memory_histogram(
144-
"execution_query_cache_count_bytes",
145-
"The replica side query cache size in bytes",
146-
metrics_registry,
147-
),
148122
query_critical_error: metrics_registry.error_counter(QUERY_HANDLER_CRITICAL_ERROR),
149123
}
150124
}

rs/execution_environment/src/query_handler.rs

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use ic_logger::ReplicaLogger;
2424
use ic_metrics::MetricsRegistry;
2525
use ic_registry_subnet_type::SubnetType;
2626
use ic_replicated_state::ReplicatedState;
27-
use ic_types::CountBytes;
2827
use ic_types::{
2928
ingress::WasmResult,
3029
messages::{
@@ -129,7 +128,7 @@ impl InternalHttpQueryHandler {
129128
metrics: QueryHandlerMetrics::new(metrics_registry),
130129
max_instructions_per_query,
131130
cycles_account_manager,
132-
query_cache: query_cache::QueryCache::new(query_cache_capacity),
131+
query_cache: query_cache::QueryCache::new(metrics_registry, query_cache_capacity),
133132
}
134133
}
135134
}
@@ -153,20 +152,8 @@ impl QueryHandler for InternalHttpQueryHandler {
153152
let key = query_cache::EntryKey::from(&query);
154153
let env = query_cache::EntryEnv::try_from((&key, state.as_ref()))?;
155154

156-
let mut cache = self.query_cache.lock().unwrap();
157-
if let Some(value) = cache.get(&key) {
158-
if value.is_valid(&env) {
159-
let res = value.result();
160-
// The cache entry is valid, return it.
161-
self.metrics.query_cache_hits.inc();
162-
let count_bytes = cache.count_bytes() as f64;
163-
self.metrics.query_cache_count_bytes.observe(count_bytes);
164-
return res;
165-
} else {
166-
// The cache entry is no longer valid, remove it.
167-
cache.pop(&key);
168-
self.metrics.query_cache_invalidated_entries.inc();
169-
}
155+
if let Some(result) = self.query_cache.get_valid_result(&key, &env) {
156+
return result;
170157
}
171158
(Some(key), Some(env))
172159
} else {
@@ -205,17 +192,8 @@ impl QueryHandler for InternalHttpQueryHandler {
205192
// Add the query execution result to the query cache (if the query caching is enabled).
206193
if self.config.query_caching == FlagStatus::Enabled {
207194
if let (Some(key), Some(env)) = (cache_entry_key, cache_entry_env) {
208-
let mut cache = self.query_cache.lock().unwrap();
209-
let evicted_entries =
210-
cache.push(key, query_cache::EntryValue::new(env, result.clone()));
211-
if !evicted_entries.is_empty() {
212-
self.metrics
213-
.query_cache_evicted_entries
214-
.inc_by(evicted_entries.len() as u64);
215-
}
216-
self.metrics.query_cache_misses.inc();
217-
let count_bytes = cache.count_bytes() as f64;
218-
self.metrics.query_cache_count_bytes.observe(count_bytes);
195+
self.query_cache
196+
.push(key, query_cache::EntryValue::new(env, result.clone()));
219197
}
220198
}
221199
result

rs/execution_environment/src/query_handler/query_cache.rs

Lines changed: 160 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,84 @@
11
use ic_base_types::{CanisterId, NumBytes};
22
use ic_error_types::UserError;
3+
use ic_metrics::MetricsRegistry;
34
use ic_replicated_state::ReplicatedState;
45
use ic_types::{ingress::WasmResult, messages::UserQuery, CountBytes, Cycles, Time, UserId};
56
use ic_utils_lru_cache::LruCache;
6-
use std::{
7-
mem::size_of_val,
8-
sync::{LockResult, Mutex, MutexGuard},
9-
};
7+
use prometheus::{Histogram, IntCounter, IntGauge};
8+
use std::{mem::size_of_val, sync::Mutex};
109

10+
use crate::metrics::duration_histogram;
11+
12+
////////////////////////////////////////////////////////////////////////
13+
/// Query Cache metrics.
14+
pub(crate) struct QueryCacheMetrics {
15+
pub hits: IntCounter,
16+
pub misses: IntCounter,
17+
pub evicted_entries: IntCounter,
18+
pub evicted_entries_duration: Histogram,
19+
pub invalidated_entries: IntCounter,
20+
pub invalidated_entries_by_time: IntCounter,
21+
pub invalidated_entries_by_canister_version: IntCounter,
22+
pub invalidated_entries_by_canister_balance: IntCounter,
23+
pub invalidated_entries_duration: Histogram,
24+
pub count_bytes: IntGauge,
25+
pub len: IntGauge,
26+
}
27+
28+
impl QueryCacheMetrics {
29+
fn new(metrics_registry: &MetricsRegistry) -> Self {
30+
Self {
31+
hits: metrics_registry.int_counter(
32+
"execution_query_cache_hits_total",
33+
"The total number of replica side query cache hits",
34+
),
35+
misses: metrics_registry.int_counter(
36+
"execution_query_cache_misses_total",
37+
"The total number of replica side query cache misses",
38+
),
39+
evicted_entries: metrics_registry.int_counter(
40+
"execution_query_cache_evicted_entries_total",
41+
"The total number of evicted entries in the replica side query cache",
42+
),
43+
evicted_entries_duration: duration_histogram(
44+
"execution_query_cache_evicted_entries_duration_seconds",
45+
"The duration of evicted cache entries in seconds",
46+
metrics_registry,
47+
),
48+
invalidated_entries: metrics_registry.int_counter(
49+
"execution_query_cache_invalidated_entries_total",
50+
"The total number of invalidated entries in the replica side query cache",
51+
),
52+
invalidated_entries_by_time: metrics_registry.int_counter(
53+
"execution_query_cache_invalidated_entries_by_time_total",
54+
"The total number of invalidated entries due to the changed time",
55+
),
56+
invalidated_entries_by_canister_version: metrics_registry.int_counter(
57+
"execution_query_cache_invalidated_entries_by_canister_version_total",
58+
"The total number of invalidated entries due to the changed canister version",
59+
),
60+
invalidated_entries_by_canister_balance: metrics_registry.int_counter(
61+
"execution_query_cache_invalidated_entries_by_canister_balance_total",
62+
"The total number of invalidated entries due to the changed canister balance",
63+
),
64+
invalidated_entries_duration: duration_histogram(
65+
"execution_query_cache_invalidated_entries_duration_seconds",
66+
"The duration of invalidated cache entries in seconds",
67+
metrics_registry,
68+
),
69+
count_bytes: metrics_registry.int_gauge(
70+
"execution_query_cache_count_bytes",
71+
"The current replica side query cache size in bytes",
72+
),
73+
len: metrics_registry.int_gauge(
74+
"execution_query_cache_len",
75+
"The current replica side query cache len in elements",
76+
),
77+
}
78+
}
79+
}
80+
81+
////////////////////////////////////////////////////////////////////////
1182
/// Query Cache entry key.
1283
///
1384
/// The key is to distinguish query cache entries, i.e. entries with different
@@ -41,10 +112,12 @@ impl From<&UserQuery> for EntryKey {
41112
}
42113
}
43114

115+
////////////////////////////////////////////////////////////////////////
44116
/// Query Cache entry environment metadata.
45117
///
46118
/// The structure captures the environment metadata. The cache entry is valid
47119
/// only when its environment metadata matches the current state environment.
120+
#[derive(PartialEq)]
48121
pub(crate) struct EntryEnv {
49122
/// The Consensus-determined time when the cache entry was created.
50123
pub batch_time: Time,
@@ -73,6 +146,7 @@ impl TryFrom<(&EntryKey, &ReplicatedState)> for EntryEnv {
73146
}
74147
}
75148

149+
////////////////////////////////////////////////////////////////////////
76150
/// Query Cache entry value.
77151
pub(crate) struct EntryValue {
78152
env: EntryEnv,
@@ -90,22 +164,39 @@ impl EntryValue {
90164
Self { env, result }
91165
}
92166

93-
pub(crate) fn is_valid(&self, env: &EntryEnv) -> bool {
167+
fn is_valid(&self, env: &EntryEnv) -> bool {
168+
self.env == *env
169+
}
170+
171+
fn is_valid_time(&self, env: &EntryEnv) -> bool {
94172
self.env.batch_time == env.batch_time
95-
&& self.env.canister_version == env.canister_version
96-
&& self.env.canister_balance == env.canister_balance
97173
}
98174

99-
pub(crate) fn result(&self) -> Result<WasmResult, UserError> {
175+
fn is_valid_canister_version(&self, env: &EntryEnv) -> bool {
176+
self.env.canister_version == env.canister_version
177+
}
178+
179+
fn is_valid_canister_balance(&self, env: &EntryEnv) -> bool {
180+
self.env.canister_balance == env.canister_balance
181+
}
182+
183+
fn result(&self) -> Result<WasmResult, UserError> {
100184
self.result.clone()
101185
}
186+
187+
fn elapsed_seconds(&self, now: Time) -> f64 {
188+
(now - self.env.batch_time).as_secs_f64()
189+
}
102190
}
103191

104-
/// Replica Query Cache Implementation.
192+
////////////////////////////////////////////////////////////////////////
193+
/// Replica Side Query Cache.
105194
pub(crate) struct QueryCache {
106195
// We can't use `RwLock`, as the `LruCache::get()` requires mutable reference
107196
// to update the LRU.
108197
cache: Mutex<LruCache<EntryKey, EntryValue>>,
198+
// Query cache metrics (public for tests)
199+
pub(crate) metrics: QueryCacheMetrics,
109200
}
110201

111202
impl CountBytes for QueryCache {
@@ -114,22 +205,72 @@ impl CountBytes for QueryCache {
114205
}
115206
}
116207

117-
impl Default for QueryCache {
118-
fn default() -> Self {
208+
impl QueryCache {
209+
pub(crate) fn new(metrics_registry: &MetricsRegistry, capacity: NumBytes) -> Self {
119210
QueryCache {
120-
cache: Mutex::new(LruCache::unbounded()),
211+
cache: Mutex::new(LruCache::new(capacity)),
212+
metrics: QueryCacheMetrics::new(metrics_registry),
121213
}
122214
}
123-
}
124215

125-
impl QueryCache {
126-
pub(crate) fn new(capacity: NumBytes) -> Self {
127-
QueryCache {
128-
cache: Mutex::new(LruCache::new(capacity)),
216+
pub(crate) fn get_valid_result(
217+
&self,
218+
key: &EntryKey,
219+
env: &EntryEnv,
220+
) -> Option<Result<WasmResult, UserError>> {
221+
let mut cache = self.cache.lock().unwrap();
222+
let now = env.batch_time;
223+
224+
if let Some(value) = cache.get(key) {
225+
if value.is_valid(env) {
226+
let res = value.result();
227+
// Update the metrics.
228+
self.metrics.hits.inc();
229+
let count_bytes = cache.count_bytes() as i64;
230+
self.metrics.count_bytes.set(count_bytes);
231+
// The cache entry is valid, return it.
232+
return Some(res);
233+
} else {
234+
// Update the metrics.
235+
self.metrics.invalidated_entries.inc();
236+
self.metrics
237+
.invalidated_entries_duration
238+
.observe(value.elapsed_seconds(now));
239+
// For the sake of correctness, we need a fall-through logic here.
240+
if !value.is_valid_time(env) {
241+
self.metrics.invalidated_entries_by_time.inc();
242+
}
243+
if !value.is_valid_canister_version(env) {
244+
self.metrics.invalidated_entries_by_canister_version.inc();
245+
}
246+
if !value.is_valid_canister_balance(env) {
247+
self.metrics.invalidated_entries_by_canister_balance.inc();
248+
}
249+
// The cache entry is no longer valid, remove it.
250+
cache.pop(key);
251+
}
129252
}
253+
None
130254
}
131255

132-
pub(crate) fn lock(&self) -> LockResult<MutexGuard<LruCache<EntryKey, EntryValue>>> {
133-
self.cache.lock()
256+
pub(crate) fn push(&self, key: EntryKey, value: EntryValue) -> Vec<(EntryKey, EntryValue)> {
257+
let now = value.env.batch_time;
258+
let mut cache = self.cache.lock().unwrap();
259+
let evicted_entries = cache.push(key, value);
260+
261+
// Update the metrics.
262+
self.metrics
263+
.evicted_entries
264+
.inc_by(evicted_entries.len() as u64);
265+
for (_evited_key, evicted_value) in &evicted_entries {
266+
let d = evicted_value.elapsed_seconds(now);
267+
self.metrics.evicted_entries_duration.observe(d);
268+
}
269+
self.metrics.misses.inc();
270+
let count_bytes = cache.count_bytes() as i64;
271+
self.metrics.count_bytes.set(count_bytes);
272+
self.metrics.len.set(cache.len() as i64);
273+
274+
evicted_entries
134275
}
135276
}

0 commit comments

Comments
 (0)