Skip to content

Commit f431e2b

Browse files
authored
Add support for Updating aggregates in addition to GetById in repositories (#81)
* Changes to CorrelatedStreamStoreRepository - Add a ctor overload to inject both an IRepository and a cache factory. - Fix a bug in the Dispose(bool) method that could cause an exception to be thrown. - Update repositories and caches to include update semantics
1 parent 9d38c74 commit f431e2b

File tree

9 files changed

+170
-113
lines changed

9 files changed

+170
-113
lines changed

src/ReactiveDomain.Core/IEventSource.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,16 @@ public interface IEventSource
2727
/// <exception cref="System.ArgumentNullException">Thrown when <paramref name="events"/> is <c>null</c>.</exception>
2828
/// <exception cref="System.InvalidOperationException">Thrown when this instance has already recorded events.</exception>
2929
void RestoreFromEvents(IEnumerable<object> events);
30-
30+
31+
/// <summary>
32+
/// Updates this instance with historical events.
33+
/// </summary>
34+
/// <param name="events">The events to apply.</param>
35+
/// <param name="expectedVersion">The expected version prior to applying events</param>
36+
/// <exception cref="System.ArgumentNullException">Thrown when <paramref name="events"/> is <c>null</c>.</exception>
37+
/// <exception cref="System.InvalidOperationException">Thrown when this instance does not have historical events or expected version mismatch</exception>
38+
void UpdateWithEvents(IEnumerable<object> events, long expectedVersion);
39+
3140
/// <summary>
3241
/// Takes the recorded history of events from this instance (CQS violation, beware).
3342
/// </summary>

src/ReactiveDomain.Foundation/Domain/EventDrivenStateMachine.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,20 @@ public void RestoreFromEvents(IEnumerable<object> events) {
4444
_router.Route(@event);
4545
}
4646
}
47+
public void UpdateWithEvents(IEnumerable<object> events, long expectedVersion) {
48+
if (events == null)
49+
throw new ArgumentNullException(nameof(events));
50+
if (_version < 0)
51+
throw new InvalidOperationException("Updating with events is not possible when an instance has no historical events.");
52+
if ( _version != expectedVersion) {
53+
throw new InvalidOperationException("Expected version mismatch when updating ");
54+
}
55+
56+
foreach (var @event in events) {
57+
_version++;
58+
_router.Route(@event);
59+
}
60+
}
4761

4862
/// <summary>
4963
/// Returns all events from the EventRecorder since state was loaded or the last time TakeEvents was called

src/ReactiveDomain.Foundation/IRepository.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@ namespace ReactiveDomain.Foundation
33
{
44
public interface IRepository
55
{
6-
bool TryGetById<TAggregate>(Guid id, out TAggregate aggregate) where TAggregate : class, IEventSource;
7-
bool TryGetById<TAggregate>(Guid id, int version, out TAggregate aggregate) where TAggregate : class, IEventSource;
8-
TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IEventSource;
9-
TAggregate GetById<TAggregate>(Guid id, int version) where TAggregate : class, IEventSource;
10-
void Save(IEventSource aggregate);
11-
void UpdateToCurrent(IEventSource aggregate);
6+
bool TryGetById<TAggregate>(Guid id, out TAggregate aggregate, int version = int.MaxValue) where TAggregate : class, IEventSource;
7+
TAggregate GetById<TAggregate>(Guid id, int version = int.MaxValue) where TAggregate : class, IEventSource;
8+
void Update<TAggregate>(ref TAggregate aggregate, int version = int.MaxValue) where TAggregate : class, IEventSource;
9+
void Save(IEventSource aggregate);
1210
}
1311
}

src/ReactiveDomain.Foundation/StreamStore/CachingRepository.cs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System;
2-
using System.Collections.Generic;
32

43
namespace ReactiveDomain.Foundation.StreamStore {
54
/// <summary>
@@ -21,24 +20,15 @@ public CachingRepository(IRepository baseRepository, Func<IRepository,IAggregate
2120
if (cacheFactory == null) {
2221
cacheFactory = repo => new ReadThroughAggregateCache(repo);
2322
}
24-
2523
_cache = cacheFactory(baseRepository);
2624
}
27-
28-
29-
public bool TryGetById<TAggregate>(Guid id, out TAggregate aggregate) where TAggregate : class, IEventSource
30-
{
31-
return _cache.GetById(id, out aggregate);
32-
}
33-
public TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IEventSource
34-
{
35-
if (!_cache.GetById(id, out TAggregate aggregate)) {
36-
throw new AggregateNotFoundException(id,typeof(TAggregate));
37-
}
38-
return aggregate;
25+
public bool TryGetById<TAggregate>(Guid id, out TAggregate aggregate) where TAggregate : class, IEventSource {
26+
return _cache.TryGetById(id, out aggregate);
27+
}
28+
public TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IEventSource {
29+
return _cache.GetById<TAggregate>(id);
3930
}
40-
public void Save(IEventSource aggregate)
41-
{
31+
public void Save(IEventSource aggregate) {
4232
_cache.Save(aggregate);
4333
}
4434
public bool ClearCache(Guid id) {

src/ReactiveDomain.Foundation/StreamStore/CorrelatedStreamStoreRepository.cs

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,39 @@
33
using ReactiveDomain.Messaging;
44

55
// ReSharper disable once CheckNamespace
6-
namespace ReactiveDomain.Foundation{
7-
public class CorrelatedStreamStoreRepository:ICorrelatedRepository, IDisposable
6+
namespace ReactiveDomain.Foundation {
7+
public class CorrelatedStreamStoreRepository : ICorrelatedRepository, IDisposable
88
{
99
private readonly IRepository _repository;
10-
private readonly IAggregateCache _cache = null;
11-
public CorrelatedStreamStoreRepository(IRepository repository) {
10+
private readonly IAggregateCache _cache;
11+
public CorrelatedStreamStoreRepository(
12+
IRepository repository,
13+
Func<IRepository, IAggregateCache> cacheFactory = null) {
1214
_repository = repository;
15+
if (cacheFactory != null) {
16+
_cache = cacheFactory(_repository);
17+
}
1318
}
19+
1420
public CorrelatedStreamStoreRepository(
1521
IStreamNameBuilder streamNameBuilder,
1622
IStreamStoreConnection streamStoreConnection,
1723
IEventSerializer eventSerializer,
18-
Func<IRepository,IAggregateCache> cacheFactory = null) {
19-
_repository = new StreamStoreRepository(streamNameBuilder,streamStoreConnection,eventSerializer);
24+
Func<IRepository, IAggregateCache> cacheFactory = null) {
25+
_repository = new StreamStoreRepository(streamNameBuilder, streamStoreConnection, eventSerializer);
2026
if (cacheFactory != null) {
2127
_cache = cacheFactory(_repository);
2228
}
2329
}
24-
30+
2531
public bool TryGetById<TAggregate>(Guid id, out TAggregate aggregate, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource {
2632
return TryGetById(id, int.MaxValue, out aggregate, source);
2733
}
2834

2935
public TAggregate GetById<TAggregate>(Guid id, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource {
30-
return GetById<TAggregate>(id, int.MaxValue, source);
36+
return GetById<TAggregate>(id, int.MaxValue, source);
3137
}
32-
38+
3339
public bool TryGetById<TAggregate>(Guid id, int version, out TAggregate aggregate, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource {
3440
try {
3541
aggregate = GetById<TAggregate>(id, version, source);
@@ -40,33 +46,38 @@ public bool TryGetById<TAggregate>(Guid id, int version, out TAggregate aggregat
4046
return false;
4147
}
4248
}
43-
44-
public TAggregate GetById<TAggregate>(Guid id, int version, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource
45-
{
46-
TAggregate agg = null;
47-
_cache?.GetById(id, out agg);
4849

49-
if(agg == null || agg.Version > version) {
50+
public TAggregate GetById<TAggregate>(Guid id, int version, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource {
51+
TAggregate agg = _cache?.GetById<TAggregate>(id, version);
52+
53+
if (agg == null || agg.Version > version) {
5054
agg = _repository.GetById<TAggregate>(id, version);
55+
if (agg != null) {
56+
_cache?.Save(agg);
57+
}
58+
}
59+
60+
if (agg != null) {
61+
((ICorrelatedEventSource) agg).Source = source;
5162
}
52-
53-
((ICorrelatedEventSource)agg).Source = source;
5463
return agg;
5564
}
5665

5766
public void Save(IEventSource aggregate) {
5867
if (_cache != null) {
59-
_cache.Save( aggregate);
68+
_cache.Save(aggregate);
6069
}
6170
else {
6271
_repository.Save(aggregate);
6372
}
6473
}
74+
6575
protected virtual void Dispose(bool disposing) {
6676
if (disposing) {
67-
_cache.Dispose();
77+
_cache?.Dispose();
6878
}
6979
}
80+
7081
public void Dispose() {
7182
Dispose(true);
7283
GC.SuppressFinalize(this);

src/ReactiveDomain.Foundation/StreamStore/IAggregateCache.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ namespace ReactiveDomain.Foundation.StreamStore {
55
/// While it might seem more natural to save and restore the event set behind the aggregate,
66
/// this cache stores only the collapsed state in the aggregate
77
/// </summary>
8-
public interface IAggregateCache: IDisposable
8+
public interface IAggregateCache:IRepository, IDisposable
99
{
10-
bool GetById<TAggregate>(Guid id, out TAggregate aggregate) where TAggregate : class, IEventSource;
11-
bool Save(IEventSource aggregate);
1210
bool Remove(Guid id);
1311
void Clear();
1412
}

src/ReactiveDomain.Foundation/StreamStore/ReadThroughAggregateCache.cs

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System;
2+
using System.Collections;
23
using System.Collections.Generic;
34

4-
namespace ReactiveDomain.Foundation.StreamStore {
5+
namespace ReactiveDomain.Foundation.StreamStore
6+
{
57
/// <summary>
68
/// This implementation assumes that the cached aggregate may have changes since it was last seen
79
///
@@ -14,47 +16,81 @@ namespace ReactiveDomain.Foundation.StreamStore {
1416
///
1517
/// Save failures will clear the aggregate from the cache and return false
1618
/// </summary>
17-
public class ReadThroughAggregateCache :IAggregateCache, IDisposable {
19+
public class ReadThroughAggregateCache : IAggregateCache, IDisposable
20+
{
1821

1922
private readonly IRepository _baseRepository;
2023
private readonly Dictionary<Guid, IEventSource> _knownAggregates = new Dictionary<Guid, IEventSource>();
2124
public ReadThroughAggregateCache(IRepository baseRepository) {
2225
_baseRepository = baseRepository;
2326
}
24-
25-
26-
public bool GetById<TAggregate>(Guid id, out TAggregate aggregate) where TAggregate : class, IEventSource {
27+
public bool TryGetById<TAggregate>(Guid id, out TAggregate aggregate, int version = int.MaxValue) where TAggregate : class, IEventSource {
2728
try {
28-
aggregate = GetById<TAggregate>(id);
29+
aggregate = GetById<TAggregate>(id, version);
2930
return true;
3031
}
3132
catch (Exception) {
3233
aggregate = null;
34+
Remove(id);
3335
return false;
3436
}
3537
}
36-
private TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IEventSource {
37-
if (_knownAggregates.TryGetValue(id, out var aggregate)) {
38-
try {
39-
_baseRepository.UpdateToCurrent(aggregate);
40-
}
41-
catch(InvalidOperationException ex) {
42-
_knownAggregates.Remove(aggregate.Id);
43-
throw new Exception("Persisted version changed with recorded events in aggregate.", ex);
44-
}
45-
catch (AggregateVersionException ex) {
46-
_knownAggregates.Remove(aggregate.Id);
47-
throw new Exception("Persisted version mismatch.", ex);
48-
}
49-
return aggregate as TAggregate;
50-
}
5138

52-
aggregate = _baseRepository.GetById<TAggregate>(id);
39+
public TAggregate GetById<TAggregate>(Guid id, int version = int.MaxValue) where TAggregate : class, IEventSource {
40+
if (_knownAggregates.TryGetValue(id, out var cached)) {
41+
var agg = (TAggregate) cached;
42+
Update(ref agg, version);
43+
return (TAggregate)cached;
44+
}
45+
46+
var aggregate = _baseRepository.GetById<TAggregate>(id, version);
5347
_knownAggregates.Add(id, aggregate);
48+
return aggregate;
49+
}
50+
51+
public void Update<TAggregate>(ref TAggregate aggregate, int version = int.MaxValue) where TAggregate : class, IEventSource {
52+
if (aggregate == null) throw new ArgumentNullException(nameof(aggregate));
53+
if (aggregate.ExpectedVersion == version) return;
54+
55+
_knownAggregates.TryGetValue(aggregate.Id, out var cached);
56+
57+
if (cached == null) {
58+
_baseRepository.Update(ref aggregate, version);
59+
UpdateCache(aggregate);
60+
return;
61+
}
62+
63+
if (cached.ExpectedVersion == version) {
64+
aggregate = (TAggregate)cached;
65+
return;
66+
}
5467

55-
return (TAggregate)aggregate;
68+
if (cached.ExpectedVersion > version) {
69+
//cache is ahead of requested version, we need to get it fresh from repo
70+
_baseRepository.Update(ref aggregate, version);
71+
//don't regress the cache, just return
72+
return;
73+
}
74+
//cache is ahead of item, but behind requested version
75+
aggregate = (TAggregate)cached;
76+
_baseRepository.Update(ref aggregate, version);
77+
UpdateCache(aggregate);
5678
}
57-
public bool Save(IEventSource aggregate) {
79+
80+
81+
82+
private void UpdateCache(IEventSource aggregate) {
83+
if (_knownAggregates.TryGetValue(aggregate.Id, out var cached)) {
84+
if (cached.ExpectedVersion < aggregate.ExpectedVersion) {
85+
_knownAggregates[aggregate.Id] = aggregate;
86+
}
87+
}
88+
else {
89+
_knownAggregates.Add(aggregate.Id, aggregate);
90+
}
91+
}
92+
93+
public void Save(IEventSource aggregate) {
5894
try {
5995
_baseRepository.Save(aggregate);
6096
if (!_knownAggregates.ContainsKey(aggregate.Id)) {
@@ -63,15 +99,12 @@ public bool Save(IEventSource aggregate) {
6399
else {
64100
_knownAggregates[aggregate.Id] = aggregate;
65101
}
66-
67-
return true;
68102
}
69103
catch {
70104
_knownAggregates.Remove(aggregate.Id);
71-
return false;
72105
}
73106

74-
107+
75108
}
76109
public bool Remove(Guid id) {
77110
return _knownAggregates.Remove(id);

0 commit comments

Comments
 (0)