Skip to content

Commit 1274150

Browse files
authored
Expose delete functionality from ES connection to aggregates (#98)
1 parent d2b0517 commit 1274150

File tree

11 files changed

+136
-12
lines changed

11 files changed

+136
-12
lines changed

src/ReactiveDomain.Foundation.Tests/when_using_caching_repository.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,19 @@ public void can_remove_by_id_from_the_cache()
105105

106106
}
107107

108+
[Fact]
109+
public void can_delete_aggregate()
110+
{
111+
var newAccountId = Guid.NewGuid();
112+
ICorrelatedMessage source = MessageBuilder.New(() => new when_using_correlated_repository.CreateAccount(newAccountId));
113+
var newAccount = new when_using_correlated_repository.Account(newAccountId, source);
114+
_cachingRepo.Save(newAccount);
115+
116+
var retrievedAccount = _cachingRepo.GetById<when_using_correlated_repository.Account>(newAccountId);
117+
_cachingRepo.Delete(retrievedAccount);
118+
119+
Assert.Throws<AggregateNotFoundException>(() => _cachingRepo.GetById<when_using_correlated_repository.Account>(newAccountId));
120+
}
108121

109122
public class Account : EventDrivenStateMachine
110123
{

src/ReactiveDomain.Foundation.Tests/when_using_correlated_repository.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,21 @@ public void can_save_updated_correlated_aggregates()
165165
Assert.NotNull(retrievedAccount2);
166166
Assert.Equal(_accountId, retrievedAccount2.Id);
167167
Assert.Equal(101, retrievedAccount.Balance);
168+
}
169+
170+
[Fact]
171+
public void can_delete_aggregate()
172+
{
173+
var newAccountId = Guid.NewGuid();
174+
ICorrelatedMessage source = MessageBuilder.New(() => new CreateAccount(newAccountId));
175+
var newAccount = new Account(newAccountId, source);
176+
_correlatedRepo.Save(newAccount);
168177

169-
}
170-
178+
var retrievedAccount = _correlatedRepo.GetById<Account>(newAccountId, source);
179+
_correlatedRepo.Delete(retrievedAccount);
180+
181+
Assert.Throws<AggregateNotFoundException>(() => _correlatedRepo.GetById<Account>(newAccountId, source));
182+
}
171183

172184
public class Account : AggregateRoot
173185
{

src/ReactiveDomain.Foundation/ICorrelatedRepository.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,7 @@ public interface ICorrelatedRepository
99
TAggregate GetById<TAggregate>(Guid id, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource;
1010
TAggregate GetById<TAggregate>(Guid id, int version, ICorrelatedMessage source) where TAggregate : AggregateRoot, IEventSource;
1111
void Save(IEventSource aggregate);
12+
void Delete(IEventSource aggregate);
13+
void HardDelete(IEventSource aggregate);
1214
}
1315
}

src/ReactiveDomain.Foundation/IRepository.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,7 @@ public interface IRepository
77
TAggregate GetById<TAggregate>(Guid id, int version = int.MaxValue) where TAggregate : class, IEventSource;
88
void Update<TAggregate>(ref TAggregate aggregate, int version = int.MaxValue) where TAggregate : class, IEventSource;
99
void Save(IEventSource aggregate);
10+
void Delete(IEventSource aggregate);
11+
void HardDelete(IEventSource aggregate);
1012
}
1113
}

src/ReactiveDomain.Foundation/StreamStore/CachingRepository.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ public TAggregate GetById<TAggregate>(Guid id) where TAggregate : class, IEventS
3131
public void Save(IEventSource aggregate) {
3232
_cache.Save(aggregate);
3333
}
34+
public void Delete(IEventSource aggregate) {
35+
_cache.Delete(aggregate);
36+
}
37+
public void HardDelete(IEventSource aggregate) {
38+
_cache.HardDelete(aggregate);
39+
}
3440
public bool ClearCache(Guid id) {
3541
return _cache.Remove(id);
3642
}

src/ReactiveDomain.Foundation/StreamStore/CorrelatedStreamStoreRepository.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,33 @@ public void Save(IEventSource aggregate) {
7272
}
7373
}
7474

75+
/// <summary>
76+
/// Soft delete the aggregate. Its stream can be re-created by appending new events.
77+
/// </summary>
78+
/// <param name="aggregate">The aggregate to be deleted.</param>
79+
public void Delete(IEventSource aggregate) {
80+
if (_cache != null) {
81+
_cache.Delete(aggregate);
82+
}
83+
else {
84+
_repository.Delete(aggregate);
85+
}
86+
}
87+
88+
/// <summary>
89+
/// Hard delete the aggregate. This permanently deletes the aggregate's stream.
90+
/// </summary>
91+
/// <param name="aggregate">The aggregate to be deleted.</param>
92+
public void HardDelete(IEventSource aggregate)
93+
{
94+
if (_cache != null) {
95+
_cache.HardDelete(aggregate);
96+
}
97+
else {
98+
_repository.HardDelete(aggregate);
99+
}
100+
}
101+
75102
protected virtual void Dispose(bool disposing) {
76103
if (disposing) {
77104
_cache?.Dispose();

src/ReactiveDomain.Foundation/StreamStore/ReadThroughAggregateCache.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,25 @@ public void Save(IEventSource aggregate) {
106106

107107

108108
}
109+
110+
/// <summary>
111+
/// Soft delete the aggregate. Its stream can be re-created by appending new events.
112+
/// </summary>
113+
/// <param name="aggregate">The aggregate to be deleted.</param>
114+
public void Delete(IEventSource aggregate) {
115+
_baseRepository.Delete(aggregate);
116+
_knownAggregates.Remove(aggregate.Id);
117+
}
118+
119+
/// <summary>
120+
/// Hard delete the aggregate. This permanently deletes the aggregate's stream.
121+
/// </summary>
122+
/// <param name="aggregate">The aggregate to be deleted.</param>
123+
public void HardDelete(IEventSource aggregate) {
124+
_baseRepository.HardDelete(aggregate);
125+
_knownAggregates.Remove(aggregate.Id);
126+
}
127+
109128
public bool Remove(Guid id) {
110129
return _knownAggregates.Remove(id);
111130
}

src/ReactiveDomain.Foundation/StreamStore/StreamStoreRepository.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,25 @@ public void Save(IEventSource aggregate) {
136136
_streamStoreConnection.AppendToStream(streamName, expectedVersion, null, eventsToSave);
137137
}
138138

139+
/// <summary>
140+
/// Soft delete the aggregate. Its stream can be re-created by appending new events.
141+
/// </summary>
142+
/// <param name="aggregate">The aggregate to be deleted.</param>
143+
public void Delete(IEventSource aggregate) {
144+
var streamName = _streamNameBuilder.GenerateForAggregate(aggregate.GetType(), aggregate.Id);
145+
var expectedVersion = aggregate.ExpectedVersion;
146+
_streamStoreConnection.DeleteStream(streamName, expectedVersion);
147+
}
139148

149+
/// <summary>
150+
/// Hard delete the aggregate. This permanently deletes the aggregate's stream.
151+
/// </summary>
152+
/// <param name="aggregate">The aggregate to be deleted.</param>
153+
public void HardDelete(IEventSource aggregate)
154+
{
155+
var streamName = _streamNameBuilder.GenerateForAggregate(aggregate.GetType(), aggregate.Id);
156+
var expectedVersion = aggregate.ExpectedVersion;
157+
_streamStoreConnection.HardDeleteStream(streamName, expectedVersion);
158+
}
140159
}
141160
}

src/ReactiveDomain.Persistence/EventStore/EventStoreConnectionWrapper.cs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void Connect()
4141
EsConnection.ConnectAsync().Wait();
4242
}
4343
catch (AggregateException aggregate) {
44-
if (aggregate.InnerException is ES.Exceptions.CannotEstablishConnectionException) {
44+
if (aggregate.InnerException is CannotEstablishConnectionException) {
4545
throw new CannotEstablishConnectionException(aggregate.InnerException.Message, aggregate.InnerException);
4646
}
4747
throw;
@@ -64,10 +64,10 @@ public WriteResult AppendToStream(
6464
{
6565
if (events.Length < WriteBatchSize)
6666
{
67-
return EsConnection.AppendToStreamAsync(stream, (int)expectedVersion, events.ToESEventData(), credentials.ToESCredentials()).Result.ToWriteResult();
67+
return EsConnection.AppendToStreamAsync(stream, expectedVersion, events.ToESEventData(), credentials.ToESCredentials()).Result.ToWriteResult();
6868
}
6969

70-
var transaction = EsConnection.StartTransactionAsync(stream, (int)expectedVersion).Result;
70+
var transaction = EsConnection.StartTransactionAsync(stream, expectedVersion).Result;
7171
var position = 0;
7272
while (position < events.Length)
7373
{
@@ -95,8 +95,7 @@ public StreamEventsSlice ReadStreamForward(
9595
long count,
9696
UserCredentials credentials = null)
9797
{
98-
//todo: why does this need an int with v 4.0 of eventstore?
99-
var slice = EsConnection.ReadStreamEventsForwardAsync(stream, (int)start, (int)count, true, credentials.ToESCredentials()).Result;
98+
var slice = EsConnection.ReadStreamEventsForwardAsync(stream, start, (int)count, true, credentials.ToESCredentials()).Result;
10099
switch (slice.Status)
101100
{
102101
case ES.SliceReadStatus.Success:
@@ -116,8 +115,7 @@ public StreamEventsSlice ReadStreamBackward(
116115
long count,
117116
UserCredentials credentials = null)
118117
{
119-
//todo: why does this need an int with v 4.0 of eventstore?
120-
var slice = EsConnection.ReadStreamEventsBackwardAsync(stream, (int)start, (int)count, true, credentials.ToESCredentials()).Result;
118+
var slice = EsConnection.ReadStreamEventsBackwardAsync(stream, start, (int)count, true, credentials.ToESCredentials()).Result;
121119
switch (slice.Status)
122120
{
123121
case ES.SliceReadStatus.Success:
@@ -227,8 +225,12 @@ public IDisposable SubscribeToAllFrom(
227225
}
228226

229227

230-
public void DeleteStream(string stream, int expectedVersion, UserCredentials credentials = null)
228+
public void DeleteStream(string stream, long expectedVersion, UserCredentials credentials = null)
231229
=> EsConnection.DeleteStreamAsync(stream, expectedVersion, credentials.ToESCredentials()).Wait();
230+
231+
public void HardDeleteStream(string stream, long expectedVersion, UserCredentials credentials = null)
232+
=> EsConnection.DeleteStreamAsync(stream, expectedVersion, true, credentials.ToESCredentials()).Wait();
233+
232234
public void Dispose()
233235
{
234236
if (!_disposed)

src/ReactiveDomain.Persistence/IStreamStoreConnection.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ IDisposable SubscribeToStreamFrom(
133133
/// <param name="eventAppeared">A Task invoked and awaited when a new event is received over the subscription.</param>
134134
/// <param name="subscriptionDropped">An action invoked if the subscription is dropped.</param>
135135
/// <param name="userCredentials">User credentials to use for the operation.</param>
136+
/// <param name="resolveLinkTos">If true, resolve link events.</param>
136137
/// <returns>An IDisposable that can be used to close the subscription.</returns>
137138
IDisposable SubscribeToAll(
138139
Action<RecordedEvent> eventAppeared,
@@ -149,6 +150,8 @@ IDisposable SubscribeToAllFrom(
149150
UserCredentials userCredentials = null,
150151
bool resolveLinkTos = true);
151152

152-
void DeleteStream(string stream, int expectedVersion, UserCredentials credentials = null);
153+
void DeleteStream(string stream, long expectedVersion, UserCredentials credentials = null);
154+
155+
void HardDeleteStream(string stream, long expectedVersion, UserCredentials credentials = null);
153156
}
154157
}

0 commit comments

Comments
 (0)