Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit daf0d82

Browse files
committed
Fix ALL THE RACES!!!!11
1 parent 6c97825 commit daf0d82

File tree

10 files changed

+428
-229
lines changed

10 files changed

+428
-229
lines changed

src/GitHub.App/Caches/CacheIndex.cs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ public CacheIndex()
2121
OldKeys = new List<string>();
2222
}
2323

24+
public CacheIndex Add(string indexKey, CacheItem item)
25+
{
26+
var k = string.Format(CultureInfo.InvariantCulture, "{0}|{1}", IndexKey, item.Key);
27+
if (!Keys.Contains(k))
28+
Keys.Add(k);
29+
UpdatedAt = DateTimeOffset.UtcNow;
30+
return this;
31+
}
32+
2433
public IObservable<CacheIndex> AddAndSave(IBlobCache cache, string indexKey, CacheItem item,
2534
DateTimeOffset? absoluteExpiration = null)
2635
{
@@ -29,7 +38,7 @@ public IObservable<CacheIndex> AddAndSave(IBlobCache cache, string indexKey, Cac
2938
Keys.Add(k);
3039
UpdatedAt = DateTimeOffset.UtcNow;
3140
return cache.InsertObject(IndexKey, this, absoluteExpiration)
32-
.Select(x => this);
41+
.Select(x => this);
3342
}
3443

3544
public static IObservable<CacheIndex> AddAndSaveToIndex(IBlobCache cache, string indexKey, CacheItem item,
@@ -47,15 +56,19 @@ public static IObservable<CacheIndex> AddAndSaveToIndex(IBlobCache cache, string
4756
.Select(x => index));
4857
}
4958

50-
public IObservable<CacheIndex> Clear(IBlobCache cache, string indexKey, DateTimeOffset? absoluteExpiration = null)
59+
public CacheIndex Clear()
5160
{
5261
OldKeys = Keys.ToList();
5362
Keys.Clear();
5463
UpdatedAt = DateTimeOffset.UtcNow;
55-
return cache
56-
.InvalidateObject<CacheIndex>(indexKey)
57-
.SelectMany(_ => cache.InsertObject(indexKey, this, absoluteExpiration))
58-
.Select(_ => this);
64+
return this;
65+
}
66+
67+
public IObservable<CacheIndex> Save(IBlobCache cache,
68+
DateTimeOffset? absoluteExpiration = null)
69+
{
70+
return cache.InsertObject(IndexKey, this, absoluteExpiration)
71+
.Select(x => this);
5972
}
6073

6174
[AllowNull]

src/GitHub.App/Extensions/AkavacheExtensions.cs

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Reactive.Linq;
55
using Akavache;
66
using GitHub.Caches;
7+
using System.Threading.Tasks;
78

89
namespace GitHub.Extensions
910
{
@@ -165,7 +166,7 @@ public static IObservable<T> GetAndFetchLatestFromIndex<T>(
165166
this IBlobCache blobCache,
166167
string key,
167168
Func<IObservable<T>> fetchFunc,
168-
Action<T> removedItemsCallback,
169+
Action<string> removedItemsCallback,
169170
TimeSpan refreshInterval,
170171
TimeSpan maxCacheDuration)
171172
where T : CacheItem
@@ -193,45 +194,54 @@ public static IObservable<T> GetAndFetchLatestFromIndex<T>(
193194
static IObservable<T> GetAndFetchLatestFromIndex<T>(this IBlobCache This,
194195
string key,
195196
Func<IObservable<T>> fetchFunc,
196-
Action<T> removedItemsCallback,
197+
Action<string> removedItemsCallback,
197198
Func<DateTimeOffset, bool> fetchPredicate = null,
198199
DateTimeOffset? absoluteExpiration = null,
199200
bool shouldInvalidateOnError = false)
200201
where T : CacheItem
201202
{
202-
var fetch = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))
203+
var idx = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))).Replay().RefCount();
204+
205+
206+
var fetch = idx
203207
.Select(x => Tuple.Create(x, fetchPredicate == null || !x.Keys.Any() || fetchPredicate(x.UpdatedAt)))
204208
.Where(predicateIsTrue => predicateIsTrue.Item2)
205209
.Select(x => x.Item1)
206-
.SelectMany(index => index.Clear(This, key, absoluteExpiration))
210+
.Select(index => index.Clear())
207211
.SelectMany(index =>
208212
{
209-
var fetchObs = fetchFunc().Catch<T, Exception>(ex =>
210-
{
211-
var shouldInvalidate = shouldInvalidateOnError ?
212-
This.InvalidateObject<CacheIndex>(key) :
213-
Observable.Return(Unit.Default);
214-
return shouldInvalidate.SelectMany(__ => Observable.Throw<T>(ex));
215-
});
213+
var fetchObs = fetchFunc()
214+
.Catch<T, Exception>(ex =>
215+
{
216+
var shouldInvalidate = shouldInvalidateOnError ?
217+
This.InvalidateObject<CacheIndex>(key) :
218+
Observable.Return(Unit.Default);
219+
return shouldInvalidate.SelectMany(__ => Observable.Throw<T>(ex));
220+
});
216221

217222
return fetchObs
218223
.SelectMany(x => x.Save<T>(This, key, absoluteExpiration))
219-
.Do(x => index.AddAndSave(This, key, x, absoluteExpiration))
220-
.Finally(() =>
221-
{
222-
This.GetObjects<T>(index.OldKeys.Except(index.Keys))
223-
.Do(dict => This.InvalidateObjects<T>(dict.Keys))
224-
.SelectMany(dict => dict.Values)
225-
.Do(removedItemsCallback)
226-
.Subscribe();
227-
});
228-
}));
224+
.Do(x => index.Add(key, x));
225+
});
229226

230-
var cache = Observable.Defer(() => This.GetOrCreateObject(key, () => CacheIndex.Create(key))
231-
.SelectMany(index => This.GetObjects<T>(index.Keys))
232-
.SelectMany(dict => dict.Values));
227+
var cache = idx
228+
.SelectMany(index => This.GetObjects<T>(index.Keys.ToList()))
229+
.SelectMany(dict => dict.Values);
230+
231+
return cache.Merge(fetch)
232+
.Finally(async () =>
233+
{
234+
var index = await idx;
235+
await index.Save(This);
233236

234-
return cache.Merge(fetch).Replay().RefCount();
237+
var list = index.OldKeys.Except(index.Keys);
238+
if (!list.Any())
239+
return;
240+
foreach (var d in list)
241+
removedItemsCallback(d);
242+
await This.InvalidateObjects<T>(list);
243+
})
244+
.Replay().RefCount();
235245
}
236246

237247
static bool IsExpired(IBlobCache blobCache, DateTimeOffset itemCreatedAt, TimeSpan cacheDuration)

src/GitHub.App/Services/ModelService.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
using NLog;
1717
using NullGuard;
1818
using Octokit;
19+
using ReactiveUI;
1920

2021
namespace GitHub.Services
2122
{
@@ -136,7 +137,7 @@ public IObservable<AccountCacheItem> GetUserFromCache()
136137
}
137138

138139
public ITrackingCollection<IPullRequestModel> GetPullRequests(ISimpleRepositoryModel repo,
139-
[AllowNull]ITrackingCollection<IPullRequestModel> collection = null)
140+
ITrackingCollection<IPullRequestModel> collection)
140141
{
141142
// Since the api to list pull requests returns all the data for each pr, cache each pr in its own entry
142143
// and also cache an index that contains all the keys for each pr. This way we can fetch prs in bulk
@@ -147,9 +148,6 @@ public ITrackingCollection<IPullRequestModel> GetPullRequests(ISimpleRepositoryM
147148
var keyobs = GetUserFromCache()
148149
.Select(user => string.Format(CultureInfo.InvariantCulture, "{0}|{1}|pr", user.Login, repo.Name));
149150

150-
if (collection == null)
151-
collection = new TrackingCollection<IPullRequestModel>();
152-
153151
var source = Observable.Defer(() => keyobs
154152
.SelectMany(key =>
155153
hostCache.GetAndFetchLatestFromIndex(key, () =>
@@ -275,6 +273,11 @@ IPullRequestModel Create(PullRequestCacheItem prCacheItem)
275273
};
276274
}
277275

276+
static IPullRequestModel Create(string key)
277+
{
278+
return new PullRequestModel(Int32.Parse(key.Split('|').Last(), CultureInfo.InvariantCulture));
279+
}
280+
278281
public IObservable<Unit> InsertUser(AccountCacheItem user)
279282
{
280283
return hostCache.InsertObject("user", user);

src/GitHub.App/ViewModels/PullRequestListViewModel.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public PullRequestListViewModel(IRepositoryHost repositoryHost, ISimpleRepositor
7979
PullRequests = new TrackingCollection<IPullRequestModel>();
8080
pullRequests.Comparer = OrderedComparer<IPullRequestModel>.OrderByDescending(x => x.UpdatedAt).Compare;
8181
pullRequests.Filter = (pr, i, l) => pr.IsOpen;
82+
pullRequests.NewerComparer = OrderedComparer<IPullRequestModel>.OrderByDescending(x => x.UpdatedAt).Compare;
8283
}
8384

8485
public override void Initialize([AllowNull] ViewWithData data)

src/GitHub.Exports.Reactive/Collections/ITrackingCollection.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Collections.Specialized;
4+
using System.Reactive;
45

56
namespace GitHub.Collections
67
{
@@ -32,17 +33,29 @@ public interface ITrackingCollection<T> : IDisposable, IList<T> where T : ICopya
3233
/// </summary>
3334
/// <param name="theComparer">The comparer method for sorting, or null if not sorting</param>
3435
Func<T, T, int> Comparer { get; set; }
36+
3537
/// <summary>
3638
/// Set a new filter. This will cause the collection to be filtered
3739
/// </summary>
3840
/// <param name="theFilter">The new filter, or null to not have any filtering</param>
3941
Func<T, int, IList<T>, bool> Filter { get; set; }
42+
43+
/// <summary>
44+
/// Set a comparer that determines whether the item being processed is newer than the same
45+
/// item seen before. This is to prevent stale items from overriding newer items when data
46+
/// is coming simultaneously from cache and from live data. Use a timestamp-like comparison
47+
/// for best results
48+
/// </summary>
49+
/// <param name="theComparer">The comparer method for sorting, or null if not sorting</param>
50+
Func<T, T, int> NewerComparer { get; set; }
51+
4052
void AddItem(T item);
4153
void RemoveItem(T item);
4254
/// <summary>
4355
/// How long to delay between processing incoming items
4456
/// </summary>
4557
TimeSpan ProcessingDelay { get; set; }
4658
event NotifyCollectionChangedEventHandler CollectionChanged;
59+
IObservable<Unit> OriginalCompleted { get; }
4760
}
4861
}

0 commit comments

Comments
 (0)