11use anyhow:: Result ;
22use serde:: { Deserialize , Serialize } ;
3-
4- use base64:: prelude:: * ;
53use std:: {
64 collections:: HashMap ,
5+ future:: Future ,
76 sync:: { Arc , Mutex } ,
87} ;
98
10- use crate :: base:: { schema, value} ;
11-
12- #[ derive( Debug , Clone , PartialEq , Eq , Hash ) ]
13- pub struct CacheKey ( Vec < u8 > ) ;
14-
15- impl Serialize for CacheKey {
16- fn serialize < S > ( & self , serializer : S ) -> Result < S :: Ok , S :: Error >
17- where
18- S : serde:: Serializer ,
19- {
20- serializer. serialize_str ( & BASE64_STANDARD . encode ( & self . 0 ) )
21- }
22- }
9+ use crate :: {
10+ base:: { schema, value} ,
11+ service:: error:: { SharedError , SharedResultExt } ,
12+ utils:: fingerprint:: Fingerprint ,
13+ } ;
2314
24- impl < ' de > Deserialize < ' de > for CacheKey {
25- fn deserialize < D > ( deserializer : D ) -> Result < Self , D :: Error >
26- where
27- D : serde:: Deserializer < ' de > ,
28- {
29- let s = String :: deserialize ( deserializer) ?;
30- let bytes = BASE64_STANDARD
31- . decode ( s)
32- . map_err ( serde:: de:: Error :: custom) ?;
33- Ok ( CacheKey ( bytes) )
34- }
15+ #[ derive( Debug , Clone , Serialize , Deserialize ) ]
16+ pub struct CacheEntry {
17+ time_sec : i64 ,
18+ value : serde_json:: Value ,
3519}
36-
3720#[ derive( Debug , Clone , Serialize , Deserialize ) ]
3821pub struct MemoizationInfo {
39- pub cache : HashMap < CacheKey , serde_json :: Value > ,
22+ pub cache : HashMap < Fingerprint , CacheEntry > ,
4023}
4124
4225impl Default for MemoizationInfo {
@@ -47,66 +30,132 @@ impl Default for MemoizationInfo {
4730 }
4831}
4932
50- enum EvaluationCacheEntry {
33+ struct EvaluationCacheEntry {
34+ time : chrono:: DateTime < chrono:: Utc > ,
35+ data : EvaluationCacheData ,
36+ }
37+
38+ enum EvaluationCacheData {
5139 /// Existing entry in previous runs, but not in current run yet.
5240 Previous ( serde_json:: Value ) ,
5341 /// Value appeared in current run.
54- Current ( Arc < async_lock:: OnceCell < value:: Value > > ) ,
42+ Current ( Arc < async_lock:: OnceCell < Result < value:: Value , SharedError > > > ) ,
5543}
5644
57- #[ derive( Default ) ]
5845pub struct EvaluationCache {
59- cache : Mutex < HashMap < CacheKey , EvaluationCacheEntry > > ,
46+ current_time : chrono:: DateTime < chrono:: Utc > ,
47+ cache : Mutex < HashMap < Fingerprint , EvaluationCacheEntry > > ,
6048}
6149
6250impl EvaluationCache {
63- pub fn from_stored ( cache : HashMap < CacheKey , serde_json:: Value > ) -> Self {
51+ pub fn new (
52+ current_time : chrono:: DateTime < chrono:: Utc > ,
53+ existing_cache : Option < HashMap < Fingerprint , CacheEntry > > ,
54+ ) -> Self {
6455 Self {
56+ current_time,
6557 cache : Mutex :: new (
66- cache
58+ existing_cache
6759 . into_iter ( )
68- . map ( |( k, v) | ( k, EvaluationCacheEntry :: Previous ( v) ) )
60+ . map ( |e| e. into_iter ( ) )
61+ . flatten ( )
62+ . map ( |( k, e) | {
63+ (
64+ k,
65+ EvaluationCacheEntry {
66+ time : chrono:: DateTime :: from_timestamp ( e. time_sec , 0 )
67+ . unwrap_or ( chrono:: DateTime :: < chrono:: Utc > :: MIN_UTC ) ,
68+ data : EvaluationCacheData :: Previous ( e. value ) ,
69+ } ,
70+ )
71+ } )
6972 . collect ( ) ,
7073 ) ,
7174 }
7275 }
7376
74- pub fn into_stored ( self ) -> Result < HashMap < CacheKey , serde_json :: Value > > {
77+ pub fn into_stored ( self ) -> Result < HashMap < Fingerprint , CacheEntry > > {
7578 Ok ( self
7679 . cache
7780 . into_inner ( ) ?
7881 . into_iter ( )
79- . filter_map ( |( k, v) | match v {
80- EvaluationCacheEntry :: Previous ( _) => None ,
81- EvaluationCacheEntry :: Current ( entry) => {
82- entry. get ( ) . map ( |v| Ok ( ( k, serde_json:: to_value ( v) ?) ) )
83- }
82+ . filter_map ( |( k, e) | match e. data {
83+ EvaluationCacheData :: Previous ( _) => None ,
84+ EvaluationCacheData :: Current ( entry) => match entry. get ( ) {
85+ Some ( Ok ( v) ) => Some ( serde_json:: to_value ( v) . map ( |value| {
86+ (
87+ k,
88+ CacheEntry {
89+ time_sec : e. time . timestamp ( ) ,
90+ value,
91+ } ,
92+ )
93+ } ) ) ,
94+ _ => None ,
95+ } ,
8496 } )
85- . collect :: < Result < _ > > ( ) ?)
97+ . collect :: < Result < _ , _ > > ( ) ?)
8698 }
8799
88100 pub fn get (
89101 & self ,
90- key : CacheKey ,
102+ key : Fingerprint ,
91103 typ : & schema:: ValueType ,
92- ) -> Result < Arc < async_lock:: OnceCell < value:: Value > > > {
104+ ttl : Option < chrono:: Duration > ,
105+ ) -> Result < Arc < async_lock:: OnceCell < Result < value:: Value , SharedError > > > > {
93106 let mut cache = self . cache . lock ( ) . unwrap ( ) ;
94- let result = match cache. entry ( key) {
95- std:: collections:: hash_map:: Entry :: Occupied ( mut entry) => match & mut entry. get_mut ( ) {
96- EvaluationCacheEntry :: Previous ( value) => {
97- let value = value:: Value :: from_json ( std:: mem:: take ( value) , typ) ?;
98- let cell = Arc :: new ( async_lock:: OnceCell :: from ( value) ) ;
99- entry. insert ( EvaluationCacheEntry :: Current ( cell. clone ( ) ) ) ;
107+ let result = {
108+ match cache. entry ( key) {
109+ std:: collections:: hash_map:: Entry :: Occupied ( mut entry)
110+ if !ttl
111+ . map ( |ttl| entry. get ( ) . time + ttl < self . current_time )
112+ . unwrap_or ( false ) =>
113+ {
114+ let entry_mut = & mut entry. get_mut ( ) ;
115+ match & mut entry_mut. data {
116+ EvaluationCacheData :: Previous ( value) => {
117+ let value = value:: Value :: from_json ( std:: mem:: take ( value) , typ) ?;
118+ let cell = Arc :: new ( async_lock:: OnceCell :: from ( Ok ( value) ) ) ;
119+ let time = entry_mut. time ;
120+ entry. insert ( EvaluationCacheEntry {
121+ time,
122+ data : EvaluationCacheData :: Current ( cell. clone ( ) ) ,
123+ } ) ;
124+ cell
125+ }
126+ EvaluationCacheData :: Current ( cell) => cell. clone ( ) ,
127+ }
128+ }
129+ entry => {
130+ let cell = Arc :: new ( async_lock:: OnceCell :: new ( ) ) ;
131+ entry. insert_entry ( EvaluationCacheEntry {
132+ time : self . current_time ,
133+ data : EvaluationCacheData :: Current ( cell. clone ( ) ) ,
134+ } ) ;
100135 cell
101136 }
102- EvaluationCacheEntry :: Current ( cell) => cell. clone ( ) ,
103- } ,
104- std:: collections:: hash_map:: Entry :: Vacant ( entry) => {
105- let cell = Arc :: new ( async_lock:: OnceCell :: new ( ) ) ;
106- entry. insert ( EvaluationCacheEntry :: Current ( cell. clone ( ) ) ) ;
107- cell
108137 }
109138 } ;
110139 Ok ( result)
111140 }
141+
142+ pub async fn evaluate < Fut > (
143+ & self ,
144+ key : Fingerprint ,
145+ typ : & schema:: ValueType ,
146+ ttl : Option < chrono:: Duration > ,
147+ compute : impl FnOnce ( ) -> Fut ,
148+ ) -> Result < value:: Value >
149+ where
150+ Fut : Future < Output = Result < value:: Value > > ,
151+ {
152+ let cell = self . get ( key, typ, ttl) ?;
153+ let result = cell
154+ . get_or_init ( || {
155+ let fut = compute ( ) ;
156+ async move { fut. await . map_err ( SharedError :: new) }
157+ } )
158+ . await ;
159+ Ok ( result. clone ( ) . std_result ( ) ?)
160+ }
112161}
0 commit comments