11using System ;
2+ using System . Linq ;
3+ using System . Reactive ;
24using System . Reactive . Linq ;
35using Akavache ;
6+ using GitHub . Caches ;
47
58namespace GitHub . Extensions
69{
@@ -14,6 +17,7 @@ public static class AkavacheExtensions
1417 /// the stale value will be produced first, followed by the fresh value
1518 /// when the fetch func completes.
1619 /// </summary>
20+ /// <param name="blobCache">The cache to retrieve the object from.</param>
1721 /// <param name="key">The key to look up the cache value with.</param>
1822 /// <param name="fetchFunc">The fetch function.</param>
1923 /// <param name="refreshInterval">
@@ -59,6 +63,7 @@ public static IObservable<T> GetAndRefreshObject<T>(
5963 /// the stale value will be produced first, followed by the fresh value
6064 /// when the fetch func completes.
6165 /// </summary>
66+ /// <param name="blobCache">The cache to retrieve the object from.</param>
6267 /// <param name="key">The key to look up the cache value with.</param>
6368 /// <param name="fetchFunc">The fetch function.</param>
6469 /// <param name="refreshInterval">
@@ -133,10 +138,105 @@ static IObservable<byte[]> GetAndFetchLatestBytes(
133138 . RefCount ( ) ;
134139 }
135140
141+ /// <summary>
142+ /// This method attempts to return a series of cached value(s) aggregated by
143+ /// a <paramref name="key"/>. In the case of a
144+ /// cache miss the fetchFunc will be used to provide a fresh value which
145+ /// is then returned. In the case of a cache hit where the cache item is
146+ /// considered stale (but not expired) as determined by <paramref name="refreshInterval"/>
147+ /// the stale values will be produced first, followed by the fresh values
148+ /// when the fetch func completes.
149+ /// </summary>
150+ /// <param name="blobCache">The cache to retrieve the object from.</param>
151+ /// <param name="key">The key to look up the cache value with.</param>
152+ /// <param name="fetchFunc">The fetch function.</param>
153+ /// <param name="removedItemsCallback">The callback that receives items that
154+ /// were a part of the cache but not of the list list of values.</param>
155+ /// <param name="refreshInterval">
156+ /// Cache objects with an age exceeding this value will be treated as stale
157+ /// and the fetch function will be invoked to refresh it.
158+ /// </param>
159+ /// <param name="maxCacheDuration">
160+ /// The maximum age of a cache object before the object is treated as
161+ /// expired and unusable. Cache objects older than this will be treated
162+ /// as a cache miss.
163+ /// </param>
164+ public static IObservable < T > GetAndFetchLatestFromIndex < T > (
165+ this IBlobCache blobCache ,
166+ string key ,
167+ Func < IObservable < T > > fetchFunc ,
168+ Action < T > removedItemsCallback ,
169+ TimeSpan refreshInterval ,
170+ TimeSpan maxCacheDuration )
171+ where T : CacheItem
172+ {
173+ return Observable . Defer ( ( ) =>
174+ {
175+ var absoluteExpiration = blobCache . Scheduler . Now + maxCacheDuration ;
176+
177+ try
178+ {
179+ return blobCache . GetAndFetchLatestFromIndex (
180+ key ,
181+ fetchFunc ,
182+ removedItemsCallback ,
183+ createdAt => IsExpired ( blobCache , createdAt , refreshInterval ) ,
184+ absoluteExpiration ) ;
185+ }
186+ catch ( Exception exc )
187+ {
188+ return Observable . Throw < T > ( exc ) ;
189+ }
190+ } ) ;
191+ }
192+
193+ static IObservable < T > GetAndFetchLatestFromIndex < T > ( this IBlobCache This ,
194+ string key ,
195+ Func < IObservable < T > > fetchFunc ,
196+ Action < T > removedItemsCallback ,
197+ Func < DateTimeOffset , bool > fetchPredicate = null ,
198+ DateTimeOffset ? absoluteExpiration = null ,
199+ bool shouldInvalidateOnError = false )
200+ where T : CacheItem
201+ {
202+ var fetch = Observable . Defer ( ( ) => This . GetOrCreateObject ( key , ( ) => CacheIndex . Create ( key ) )
203+ . Select ( x => Tuple . Create ( x , fetchPredicate == null || ! x . Keys . Any ( ) || fetchPredicate ( x . UpdatedAt ) ) )
204+ . Where ( predicateIsTrue => predicateIsTrue . Item2 )
205+ . Select ( x => x . Item1 )
206+ . SelectMany ( index => index . Clear ( This , key , absoluteExpiration ) )
207+ . SelectMany ( index =>
208+ {
209+ var fetchObs = fetchFunc ( ) . Catch < T , Exception > ( ex =>
210+ {
211+ var shouldInvalidate = shouldInvalidateOnError ?
212+ This . InvalidateObject < CacheIndex > ( key ) :
213+ Observable . Return ( Unit . Default ) ;
214+ return shouldInvalidate . SelectMany ( __ => Observable . Throw < T > ( ex ) ) ;
215+ } ) ;
216+
217+ return fetchObs
218+ . SelectMany ( x => x . Save < T > ( This , key , absoluteExpiration ) )
219+ . Do ( x => index . AddAndSave ( This , key , x , absoluteExpiration ) )
220+ . Finally ( ( ) =>
221+ {
222+ This . GetObjects < T > ( index . OldKeys . Except ( index . Keys ) )
223+ . Do ( dict => This . InvalidateObjects < T > ( dict . Keys ) )
224+ . SelectMany ( dict => dict . Values )
225+ . Do ( removedItemsCallback )
226+ . Subscribe ( ) ;
227+ } ) ;
228+ } ) ) ;
229+
230+ var cache = Observable . Defer ( ( ) => This . GetOrCreateObject ( key , ( ) => CacheIndex . Create ( key ) )
231+ . SelectMany ( index => This . GetObjects < T > ( index . Keys ) )
232+ . SelectMany ( dict => dict . Values ) ) ;
233+
234+ return cache . Merge ( fetch ) . Replay ( ) . RefCount ( ) ;
235+ }
236+
136237 static bool IsExpired ( IBlobCache blobCache , DateTimeOffset itemCreatedAt , TimeSpan cacheDuration )
137238 {
138- var now = blobCache . Scheduler . Now ;
139- var elapsed = now - itemCreatedAt ;
239+ var elapsed = blobCache . Scheduler . Now - itemCreatedAt . ToUniversalTime ( ) ;
140240
141241 return elapsed > cacheDuration ;
142242 }
0 commit comments