Skip to content

Commit c5670dd

Browse files
authored
IAggregatedCache and ReadThroughAggregateCache to support CorrelatedStreamStoreRepository with tests (#79)
1 parent 1a8e19e commit c5670dd

File tree

5 files changed

+401
-52
lines changed

5 files changed

+401
-52
lines changed
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
using System;
2+
using System.Linq;
3+
using ReactiveDomain.Messaging;
4+
using ReactiveDomain.Testing.EventStore;
5+
using ReactiveDomain.Util;
6+
using Xunit;
7+
8+
namespace ReactiveDomain.Foundation.Tests
9+
{
10+
// ReSharper disable once InconsistentNaming
11+
public class when_using_correlated_repository
12+
{
13+
private readonly CorrelatedStreamStoreRepository _correlatedRepo;
14+
private readonly Guid _accountId = Guid.NewGuid();
15+
16+
public when_using_correlated_repository()
17+
{
18+
var mockStore = new MockStreamStoreConnection("testRepo");
19+
mockStore.Connect();
20+
var repo = new StreamStoreRepository(new PrefixedCamelCaseStreamNameBuilder(), mockStore, new JsonMessageSerializer());
21+
_correlatedRepo = new CorrelatedStreamStoreRepository(repo);
22+
ICorrelatedMessage source = MessageBuilder.New(() => new CreateAccount(_accountId));
23+
var account = new Account(_accountId, source);
24+
account.Credit(7);
25+
account.Credit(13);
26+
account.Credit(31);
27+
repo.Save(account);
28+
}
29+
30+
[Fact]
31+
public void can_get_by_id()
32+
{
33+
ICorrelatedMessage source = MessageBuilder.New(() => new CreditAccount(_accountId, 50));
34+
var retrievedAccount = _correlatedRepo.GetById<Account>(_accountId, source);
35+
Assert.NotNull(retrievedAccount);
36+
Assert.Equal(51, retrievedAccount.Balance);
37+
Assert.Equal(_accountId, retrievedAccount.Id);
38+
39+
}
40+
41+
[Fact]
42+
public void can_get_by_id_at_version()
43+
{
44+
ICorrelatedMessage source = MessageBuilder.New(() => new CreditAccount(_accountId, 50));
45+
var retrievedAccount = _correlatedRepo.GetById<Account>(_accountId, 1, source);
46+
Assert.NotNull(retrievedAccount);
47+
Assert.Equal(0, retrievedAccount.Balance);
48+
Assert.Equal(_accountId, retrievedAccount.Id);
49+
50+
retrievedAccount = _correlatedRepo.GetById<Account>(_accountId, 2, source);
51+
Assert.NotNull(retrievedAccount);
52+
Assert.Equal(7, retrievedAccount.Balance);
53+
Assert.Equal(_accountId, retrievedAccount.Id);
54+
55+
}
56+
[Fact]
57+
public void can_try_get_by_id()
58+
{
59+
ICorrelatedMessage source = MessageBuilder.New(() => new CreditAccount(_accountId, 50));
60+
Assert.True(_correlatedRepo.TryGetById<Account>(_accountId, out var retrievedAccount, source));
61+
Assert.NotNull(retrievedAccount);
62+
Assert.Equal(51, retrievedAccount.Balance);
63+
Assert.Equal(_accountId, retrievedAccount.Id);
64+
65+
}
66+
67+
[Fact]
68+
public void can_try_get_by_id_at_version()
69+
{
70+
ICorrelatedMessage source = MessageBuilder.New(() => new CreditAccount(_accountId, 50));
71+
Assert.True(_correlatedRepo.TryGetById<Account>(_accountId, 1, out var retrievedAccount, source));
72+
Assert.NotNull(retrievedAccount);
73+
Assert.Equal(0, retrievedAccount.Balance);
74+
Assert.Equal(_accountId, retrievedAccount.Id);
75+
76+
Assert.True(_correlatedRepo.TryGetById<Account>(_accountId, 3, out retrievedAccount, source));
77+
Assert.NotNull(retrievedAccount);
78+
Assert.Equal(20, retrievedAccount.Balance);
79+
Assert.Equal(_accountId, retrievedAccount.Id);
80+
81+
}
82+
[Fact]
83+
public void try_get_does_not_throw()
84+
{
85+
var badId = Guid.NewGuid();
86+
ICorrelatedMessage source = MessageBuilder.New(() => new CreditAccount(_accountId, 50));
87+
Assert.False(_correlatedRepo.TryGetById<Account>(badId, out var retrievedAccount, source));
88+
Assert.Null(retrievedAccount);
89+
90+
}
91+
[Fact]
92+
public void invalid_get_rethrows()
93+
{
94+
var badId = Guid.NewGuid();
95+
ICorrelatedMessage source = MessageBuilder.New(() => new CreditAccount(_accountId, 50));
96+
Assert.Throws<AggregateNotFoundException>(() => _correlatedRepo.GetById<Account>(badId, source));
97+
}
98+
99+
[Fact]
100+
public void new_correlated_aggregates_inject_source_information()
101+
{
102+
var newAccountId = Guid.NewGuid();
103+
ICorrelatedMessage source = MessageBuilder.New(() => new CreateAccount(newAccountId));
104+
var newAccount = new Account(newAccountId, source);
105+
newAccount.Credit(7);
106+
newAccount.Credit(13);
107+
newAccount.Credit(31);
108+
109+
var eventSource = (IEventSource)newAccount;
110+
var correlatedEvents = eventSource.TakeEvents().Select(evt => evt as ICorrelatedMessage).ToArray();
111+
foreach (var evt in correlatedEvents) {
112+
Assert.Equal(source.MsgId, evt.CausationId);
113+
Assert.Equal(source.CorrelationId, evt.CorrelationId);
114+
}
115+
}
116+
[Fact]
117+
public void updated_correlated_aggregates_inject_source_information()
118+
{
119+
120+
ICorrelatedMessage source = MessageBuilder.New(() => new CreditAccount(_accountId, 50));
121+
var retrievedAccount = _correlatedRepo.GetById<Account>(_accountId, source);
122+
retrievedAccount.Credit(7);
123+
retrievedAccount.Credit(13);
124+
retrievedAccount.Credit(31);
125+
126+
var eventSource = (IEventSource)retrievedAccount;
127+
var correlatedEvents = eventSource.TakeEvents().Select(evt => evt as ICorrelatedMessage).ToArray();
128+
foreach (var evt in correlatedEvents) {
129+
Assert.Equal(source.MsgId, evt.CausationId);
130+
Assert.Equal(source.CorrelationId, evt.CorrelationId);
131+
}
132+
}
133+
134+
135+
[Fact]
136+
public void can_save_new_correlated_aggregates()
137+
{
138+
var newAccountId = Guid.NewGuid();
139+
ICorrelatedMessage source = MessageBuilder.New(() => new CreateAccount(newAccountId));
140+
var newAccount = new Account(newAccountId, source);
141+
newAccount.Credit(7);
142+
newAccount.Credit(13);
143+
newAccount.Credit(31);
144+
_correlatedRepo.Save(newAccount);
145+
146+
var retrievedAccount = _correlatedRepo.GetById<Account>(newAccountId, source);
147+
Assert.NotNull(retrievedAccount);
148+
Assert.Equal(51, retrievedAccount.Balance);
149+
Assert.Equal(newAccountId, retrievedAccount.Id);
150+
}
151+
152+
[Fact]
153+
public void can_save_updated_correlated_aggregates()
154+
{
155+
ICorrelatedMessage source = MessageBuilder.New(() => new CreditAccount(_accountId, 50));
156+
157+
var retrievedAccount = _correlatedRepo.GetById<Account>(_accountId, source);
158+
Assert.NotNull(retrievedAccount);
159+
Assert.Equal(_accountId, retrievedAccount.Id);
160+
161+
retrievedAccount.Credit(50);
162+
_correlatedRepo.Save(retrievedAccount);
163+
164+
var retrievedAccount2 = _correlatedRepo.GetById<Account>(_accountId, source);
165+
Assert.NotNull(retrievedAccount2);
166+
Assert.Equal(_accountId, retrievedAccount2.Id);
167+
Assert.Equal(101, retrievedAccount.Balance);
168+
169+
}
170+
171+
172+
public class Account : AggregateRoot
173+
{
174+
//n.b. for infrastructure testing only not for prod or business unit tests
175+
public long Balance { get; private set; }
176+
//reflection constructor
177+
// ReSharper disable once UnusedMember.Local
178+
private Account()
179+
{
180+
Register<AccountCreated>(evt => Id = evt.AccountId);
181+
Register<AccountCredited>(evt => { Balance += evt.Amount; });
182+
}
183+
public Account(Guid id, ICorrelatedMessage source) : this()
184+
{
185+
((ICorrelatedEventSource)this).Source = source;
186+
Ensure.NotEmptyGuid(id, "id");
187+
Raise(new AccountCreated(id));
188+
}
189+
190+
public void Credit(uint amount)
191+
{
192+
Raise(new AccountCredited(Id, amount));
193+
}
194+
}
195+
public class CreateAccount : Command
196+
{
197+
public readonly Guid AccountId;
198+
public CreateAccount(Guid accountId)
199+
{
200+
AccountId = accountId;
201+
}
202+
}
203+
public class AccountCreated : Event
204+
{
205+
public readonly Guid AccountId;
206+
public AccountCreated(Guid accountId)
207+
{
208+
AccountId = accountId;
209+
}
210+
}
211+
public class CreditAccount : Command
212+
{
213+
public readonly Guid AccountId;
214+
public readonly uint Amount;
215+
public CreditAccount(Guid accountId, uint amount)
216+
{
217+
AccountId = accountId;
218+
Amount = amount;
219+
}
220+
}
221+
//use of base class Event is optional
222+
public class AccountCredited : ICorrelatedMessage
223+
{
224+
public Guid MsgId { get; }
225+
public Guid CorrelationId { get; set; }
226+
public Guid CausationId { get; set; }
227+
public readonly Guid AccountId;
228+
public readonly uint Amount;
229+
230+
public AccountCredited(Guid accountId, uint amount)
231+
{
232+
MsgId = Guid.NewGuid();
233+
AccountId = accountId;
234+
Amount = amount;
235+
}
236+
}
237+
}
238+
}

src/ReactiveDomain.Foundation/StreamStore/CachingRepository.cs

Lines changed: 23 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -14,66 +14,43 @@ namespace ReactiveDomain.Foundation.StreamStore {
1414
///
1515
/// Save failures will clear the aggregate from the cache and rethrow
1616
/// </summary>
17-
public class CachingRepository : IDisposable {
17+
public class CachingRepository : IDisposable
18+
{
19+
private readonly IAggregateCache _cache;
20+
public CachingRepository(IRepository baseRepository, Func<IRepository,IAggregateCache> cacheFactory = null) {
21+
if (cacheFactory == null) {
22+
cacheFactory = repo => new ReadThroughAggregateCache(repo);
23+
}
1824

19-
private readonly IRepository _baseRepository;
20-
private readonly Dictionary<Guid, IEventSource> _knownAggregates = new Dictionary<Guid, IEventSource>();
21-
public CachingRepository(IRepository baseRepository) {
22-
_baseRepository = baseRepository;
25+
_cache = cacheFactory(baseRepository);
2326
}
2427

2528

26-
public bool TryGetById<TAggregate>(Guid id, out TAggregate aggregate) where TAggregate : class, IEventSource {
27-
try {
28-
aggregate = GetById<TAggregate>(id);
29-
return true;
30-
}
31-
catch (Exception) {
32-
aggregate = null;
33-
return false;
34-
}
35-
}
36-
public TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IEventSource {
37-
if (_knownAggregates.TryGetValue(id, out var aggregate)) {
38-
try {
39-
_baseRepository.UpdateToCurrent(aggregate);
40-
}
41-
catch(InvalidOperationException ex) {
42-
throw new Exception("Persisted version changed with recorded events in aggregate.", ex);
43-
}
44-
catch (AggregateVersionException ex) {
45-
throw new Exception("Persisted version mismatch.", ex);
46-
}
47-
return aggregate as TAggregate;
29+
public bool TryGetById<TAggregate>(Guid id, out TAggregate aggregate) where TAggregate : class, IEventSource
30+
{
31+
return _cache.GetById(id, out aggregate);
32+
}
33+
public TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IEventSource
34+
{
35+
if (!_cache.GetById(id, out TAggregate aggregate)) {
36+
throw new AggregateNotFoundException(id,typeof(TAggregate));
4837
}
49-
50-
aggregate = _baseRepository.GetById<TAggregate>(id);
51-
_knownAggregates.Add(id, aggregate);
52-
53-
return (TAggregate)aggregate;
38+
return aggregate;
5439
}
55-
public void Save(IEventSource aggregate) {
56-
try {
57-
_baseRepository.Save(aggregate);
58-
if (!_knownAggregates.ContainsKey(aggregate.Id)) {
59-
_knownAggregates.Add(aggregate.Id, aggregate);
60-
}
61-
}
62-
catch {
63-
_knownAggregates.Remove(aggregate.Id);
64-
throw;
65-
}
40+
public void Save(IEventSource aggregate)
41+
{
42+
_cache.Save(aggregate);
6643
}
6744
public bool ClearCache(Guid id) {
68-
return _knownAggregates.Remove(id);
45+
return _cache.Remove(id);
6946
}
7047
public void ClearCache() {
71-
_knownAggregates.Clear();
48+
_cache.Clear();
7249
}
7350

7451
protected virtual void Dispose(bool disposing) {
7552
if (disposing) {
76-
_knownAggregates.Clear();
53+
_cache.Dispose();
7754
}
7855
}
7956
public void Dispose() {

src/ReactiveDomain.Foundation/StreamStore/CorrelatedStreamStoreRepository.cs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
11
using System;
2+
using ReactiveDomain.Foundation.StreamStore;
23
using ReactiveDomain.Messaging;
34

45
// ReSharper disable once CheckNamespace
56
namespace ReactiveDomain.Foundation{
6-
public class CorrelatedStreamStoreRepository:ICorrelatedRepository
7+
public class CorrelatedStreamStoreRepository:ICorrelatedRepository, IDisposable
78
{
89
private readonly IRepository _repository;
10+
private readonly IAggregateCache _cache = null;
911
public CorrelatedStreamStoreRepository(IRepository repository) {
1012
_repository = repository;
1113
}
1214
public CorrelatedStreamStoreRepository(
1315
IStreamNameBuilder streamNameBuilder,
1416
IStreamStoreConnection streamStoreConnection,
15-
IEventSerializer eventSerializer) {
17+
IEventSerializer eventSerializer,
18+
Func<IRepository,IAggregateCache> cacheFactory = null) {
1619
_repository = new StreamStoreRepository(streamNameBuilder,streamStoreConnection,eventSerializer);
20+
if (cacheFactory != null) {
21+
_cache = cacheFactory(_repository);
22+
}
1723
}
1824

1925
public bool TryGetById<TAggregate>(Guid id, out TAggregate aggregate, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource {
@@ -35,15 +41,35 @@ public bool TryGetById<TAggregate>(Guid id, int version, out TAggregate aggregat
3541
}
3642
}
3743

38-
public TAggregate GetById<TAggregate>(Guid id, int version, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource {
39-
var agg = _repository.GetById<TAggregate>(id, version);
44+
public TAggregate GetById<TAggregate>(Guid id, int version, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource
45+
{
46+
TAggregate agg = null;
47+
_cache?.GetById(id, out agg);
48+
49+
if(agg == null || agg.Version > version) {
50+
agg = _repository.GetById<TAggregate>(id, version);
51+
}
52+
4053
((ICorrelatedEventSource)agg).Source = source;
4154
return agg;
4255
}
43-
4456

4557
public void Save(IEventSource aggregate) {
46-
_repository.Save(aggregate);
58+
if (_cache != null) {
59+
_cache.Save( aggregate);
60+
}
61+
else {
62+
_repository.Save(aggregate);
63+
}
64+
}
65+
protected virtual void Dispose(bool disposing) {
66+
if (disposing) {
67+
_cache.Dispose();
68+
}
69+
}
70+
public void Dispose() {
71+
Dispose(true);
72+
GC.SuppressFinalize(this);
4773
}
4874
}
4975
}

0 commit comments

Comments
 (0)