|
4 | 4 | using System.Reactive.Linq; |
5 | 5 | using Akavache; |
6 | 6 | using GitHub.Caches; |
| 7 | +using System.Threading.Tasks; |
7 | 8 |
|
8 | 9 | namespace GitHub.Extensions |
9 | 10 | { |
@@ -165,7 +166,7 @@ public static IObservable<T> GetAndFetchLatestFromIndex<T>( |
165 | 166 | this IBlobCache blobCache, |
166 | 167 | string key, |
167 | 168 | Func<IObservable<T>> fetchFunc, |
168 | | - Action<T> removedItemsCallback, |
| 169 | + Action<string> removedItemsCallback, |
169 | 170 | TimeSpan refreshInterval, |
170 | 171 | TimeSpan maxCacheDuration) |
171 | 172 | where T : CacheItem |
@@ -193,45 +194,54 @@ public static IObservable<T> GetAndFetchLatestFromIndex<T>( |
193 | 194 | static IObservable<T> GetAndFetchLatestFromIndex<T>(this IBlobCache This, |
194 | 195 | string key, |
195 | 196 | Func<IObservable<T>> fetchFunc, |
196 | | - Action<T> removedItemsCallback, |
| 197 | + Action<string> removedItemsCallback, |
197 | 198 | Func<DateTimeOffset, bool> fetchPredicate = null, |
198 | 199 | DateTimeOffset? absoluteExpiration = null, |
199 | 200 | bool shouldInvalidateOnError = false) |
200 | 201 | where T : CacheItem |
201 | 202 | { |
202 | | - var fetch = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key)) |
| 203 | + var idx = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))).Replay().RefCount(); |
| 204 | + |
| 205 | + |
| 206 | + var fetch = idx |
203 | 207 | .Select(x => Tuple.Create(x, fetchPredicate == null || !x.Keys.Any() || fetchPredicate(x.UpdatedAt))) |
204 | 208 | .Where(predicateIsTrue => predicateIsTrue.Item2) |
205 | 209 | .Select(x => x.Item1) |
206 | | - .SelectMany(index => index.Clear(This, key, absoluteExpiration)) |
| 210 | + .Select(index => index.Clear()) |
207 | 211 | .SelectMany(index => |
208 | 212 | { |
209 | | - var fetchObs = fetchFunc().Catch<T, Exception>(ex => |
210 | | - { |
211 | | - var shouldInvalidate = shouldInvalidateOnError ? |
212 | | - This.InvalidateObject<CacheIndex>(key) : |
213 | | - Observable.Return(Unit.Default); |
214 | | - return shouldInvalidate.SelectMany(__ => Observable.Throw<T>(ex)); |
215 | | - }); |
| 213 | + var fetchObs = fetchFunc() |
| 214 | + .Catch<T, Exception>(ex => |
| 215 | + { |
| 216 | + var shouldInvalidate = shouldInvalidateOnError ? |
| 217 | + This.InvalidateObject<CacheIndex>(key) : |
| 218 | + Observable.Return(Unit.Default); |
| 219 | + return shouldInvalidate.SelectMany(__ => Observable.Throw<T>(ex)); |
| 220 | + }); |
216 | 221 |
|
217 | 222 | return fetchObs |
218 | 223 | .SelectMany(x => x.Save<T>(This, key, absoluteExpiration)) |
219 | | - .Do(x => index.AddAndSave(This, key, x, absoluteExpiration)) |
220 | | - .Finally(() => |
221 | | - { |
222 | | - This.GetObjects<T>(index.OldKeys.Except(index.Keys)) |
223 | | - .Do(dict => This.InvalidateObjects<T>(dict.Keys)) |
224 | | - .SelectMany(dict => dict.Values) |
225 | | - .Do(removedItemsCallback) |
226 | | - .Subscribe(); |
227 | | - }); |
228 | | - })); |
| 224 | + .Do(x => index.Add(key, x)); |
| 225 | + }); |
229 | 226 |
|
230 | | - var cache = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key)) |
231 | | - .SelectMany(index => This.GetObjects<T>(index.Keys)) |
232 | | - .SelectMany(dict => dict.Values)); |
| 227 | + var cache = idx |
| 228 | + .SelectMany(index => This.GetObjects<T>(index.Keys.ToList())) |
| 229 | + .SelectMany(dict => dict.Values); |
| 230 | + |
| 231 | + return cache.Merge(fetch) |
| 232 | + .Finally(async () => |
| 233 | + { |
| 234 | + var index = await idx; |
| 235 | + await index.Save(This); |
233 | 236 |
|
234 | | - return cache.Merge(fetch).Replay().RefCount(); |
| 237 | + var list = index.OldKeys.Except(index.Keys); |
| 238 | + if (!list.Any()) |
| 239 | + return; |
| 240 | + foreach (var d in list) |
| 241 | + removedItemsCallback(d); |
| 242 | + await This.InvalidateObjects<T>(list); |
| 243 | + }) |
| 244 | + .Replay().RefCount(); |
235 | 245 | } |
236 | 246 |
|
237 | 247 | /// <summary> |
|
0 commit comments