@@ -16,7 +16,10 @@ use common::{
1616 CoDelQueueSender ,
1717 } ,
1818 errors:: recapture_stacktrace_noreport,
19- runtime:: Runtime ,
19+ runtime:: {
20+ Runtime ,
21+ RuntimeInstant ,
22+ } ,
2023} ;
2124use futures:: {
2225 future:: BoxFuture ,
@@ -28,6 +31,7 @@ use parking_lot::Mutex;
2831use crate :: metrics:: {
2932 async_lru_compute_timer,
3033 async_lru_get_timer,
34+ async_lru_log_eviction,
3135 log_async_lru_cache_hit,
3236 log_async_lru_cache_miss,
3337 log_async_lru_cache_waiting,
@@ -78,19 +82,20 @@ impl<RT: Runtime, Key, Value> Clone for AsyncLru<RT, Key, Value> {
7882 }
7983 }
8084}
81- enum CacheResult < Value > {
85+ enum CacheResult < Value , RT : Runtime > {
8286 Ready {
8387 value : Arc < Value > ,
8488 // Memoize the size to guard against implementations of `SizedValue`
8589 // that (unexpectedly) change while the value is in the cache.
8690 size : u64 ,
91+ added : RT :: Instant ,
8792 } ,
8893 Waiting {
8994 receiver : BroadcastReceiver < Result < Arc < Value > , Arc < anyhow:: Error > > > ,
9095 } ,
9196}
9297
93- impl < Value : SizedValue > SizedValue for CacheResult < Value > {
98+ impl < Value : SizedValue , RT : Runtime > SizedValue for CacheResult < Value , RT > {
9499 fn size ( & self ) -> u64 {
95100 match self {
96101 CacheResult :: Ready { size, .. } => * size,
@@ -100,7 +105,7 @@ impl<Value: SizedValue> SizedValue for CacheResult<Value> {
100105}
101106
102107struct Inner < RT : Runtime , Key , Value > {
103- cache : LruCache < Key , CacheResult < Value > > ,
108+ cache : LruCache < Key , CacheResult < Value , RT > > ,
104109 current_size : u64 ,
105110 max_size : u64 ,
106111 label : & ' static str ,
@@ -109,7 +114,7 @@ struct Inner<RT: Runtime, Key, Value> {
109114
110115impl < RT : Runtime , Key , Value > Inner < RT , Key , Value > {
111116 fn new (
112- cache : LruCache < Key , CacheResult < Value > > ,
117+ cache : LruCache < Key , CacheResult < Value , RT > > ,
113118 max_size : u64 ,
114119 label : & ' static str ,
115120 tx : CoDelQueueSender < RT , BuildValueRequest < Key , Value > > ,
@@ -194,7 +199,7 @@ impl<
194199
195200 fn _new (
196201 rt : RT ,
197- cache : LruCache < Key , CacheResult < Value > > ,
202+ cache : LruCache < Key , CacheResult < Value , RT > > ,
198203 max_size : u64 ,
199204 concurrency : usize ,
200205 label : & ' static str ,
@@ -204,7 +209,7 @@ impl<
204209 let inner = Inner :: new ( cache, max_size, label, tx) ;
205210 let handle = rt. spawn (
206211 label,
207- Self :: value_generating_worker_thread ( rx, inner. clone ( ) , concurrency) ,
212+ Self :: value_generating_worker_thread ( rt . clone ( ) , rx, inner. clone ( ) , concurrency) ,
208213 ) ;
209214 Self {
210215 inner,
@@ -226,6 +231,7 @@ impl<
226231 }
227232
228233 fn update_value (
234+ rt : RT ,
229235 inner : Arc < Mutex < Inner < RT , Key , Value > > > ,
230236 key : Key ,
231237 value : anyhow:: Result < Value > ,
@@ -237,6 +243,7 @@ impl<
237243 let new_value = CacheResult :: Ready {
238244 size : result. size ( ) ,
239245 value : result. clone ( ) ,
246+ added : rt. monotonic_now ( ) ,
240247 } ;
241248 inner. current_size += new_value. size ( ) ;
242249 // Ideally we'd not change the LRU order by putting here...
@@ -269,15 +276,21 @@ impl<
269276 . expect ( "Over max size, but no more entries" ) ;
270277 // This isn't catastrophic necessarily, but it may lead to
271278 // under / over counting of the cache's size.
272- if let CacheResult :: Ready { ref value, size } = evicted
273- && size != value. size ( )
279+ if let CacheResult :: Ready {
280+ ref value,
281+ size,
282+ ref added,
283+ } = evicted
274284 {
275- tracing:: warn!(
276- "Value changed size from {} to {} while in the {} cache!" ,
277- size,
278- value. size( ) ,
279- inner. label
280- )
285+ if size != value. size ( ) {
286+ tracing:: warn!(
287+ "Value changed size from {} to {} while in the {} cache!" ,
288+ size,
289+ value. size( ) ,
290+ inner. label
291+ )
292+ }
293+ async_lru_log_eviction ( inner. label , added. elapsed ( ) ) ;
281294 }
282295 inner. current_size -= evicted. size ( ) ;
283296 }
@@ -380,12 +393,14 @@ impl<
380393 }
381394
382395 async fn value_generating_worker_thread (
396+ rt : RT ,
383397 rx : CoDelQueueReceiver < RT , BuildValueRequest < Key , Value > > ,
384398 inner : Arc < Mutex < Inner < RT , Key , Value > > > ,
385399 concurrency : usize ,
386400 ) {
387401 rx. for_each_concurrent ( concurrency, |( ( key, generator, tx) , expired) | {
388402 let inner = inner. clone ( ) ;
403+ let rt = rt. clone ( ) ;
389404 async move {
390405 if let Some ( expired) = expired {
391406 Self :: drop_waiting ( inner, & key) ;
@@ -395,7 +410,7 @@ impl<
395410
396411 let value = generator. await ;
397412
398- let to_broadcast = Self :: update_value ( inner, key, value) . map_err ( Arc :: new) ;
413+ let to_broadcast = Self :: update_value ( rt , inner, key, value) . map_err ( Arc :: new) ;
399414 let _ = tx. broadcast ( to_broadcast) . await ;
400415 }
401416 } )
0 commit comments