Skip to content

Commit 0bf8022

Browse files
authored
Fixes threading issues with ReadModelBase (#140)
- Adds a `ReaderLock` property to `ReadModelBase` that can be used when reading the state of the read model. - Adds a `Version` property to `ReadModelBase` that increments with the total number of messages passed to the read model. This can be used in tests to ensure the read model state. - Fixes an issue in `ReadModelBase` that could cause events to be read twice if `Start` is called with a checkpoint that is already at the end of the stream.
1 parent 4c0042b commit 0bf8022

File tree

10 files changed

+51
-54
lines changed

10 files changed

+51
-54
lines changed

src/ReactiveDomain.Foundation.Tests/when_using_read_model_base.cs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,6 @@ public class when_using_read_model_base :
1111
IHandle<when_using_read_model_base.ReadModelTestEvent>,
1212
IClassFixture<StreamStoreConnectionFixture> {
1313

14-
private static IListener GetListener() {
15-
return new QueuedStreamListener(
16-
nameof(when_using_read_model_base),
17-
_conn,
18-
Namer,
19-
Serializer);
20-
}
21-
2214
private static IStreamStoreConnection _conn;
2315
private static readonly IEventSerializer Serializer =
2416
new JsonMessageSerializer();
@@ -30,8 +22,7 @@ private static IListener GetListener() {
3022

3123

3224
public when_using_read_model_base(StreamStoreConnectionFixture fixture)
33-
: base(nameof(when_using_read_model_base), GetListener) {
34-
//_conn = new MockStreamStoreConnection("mockStore");
25+
: base(nameof(when_using_read_model_base), new ConfiguredConnection(fixture.Connection, Namer, Serializer)) {
3526
_conn = fixture.Connection;
3627
_conn.Connect();
3728

src/ReactiveDomain.Foundation.Tests/when_using_snapshot_read_model.cs

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,8 @@
99
namespace ReactiveDomain.Foundation.Tests {
1010
// ReSharper disable once InconsistentNaming
1111
public class when_using_snapshot_read_model : IClassFixture<StreamStoreConnectionFixture> {
12-
private IListener GetListener() {
13-
return new QueuedStreamListener(
14-
nameof(when_using_read_model_base),
15-
_conn,
16-
_namer,
17-
_serializer);
18-
}
1912

13+
private readonly IConfiguredConnection _configuredConnection;
2014
private readonly IStreamStoreConnection _conn;
2115
private readonly IEventSerializer _serializer =
2216
new JsonMessageSerializer();
@@ -39,6 +33,7 @@ private void AppendEvents(
3933
public when_using_snapshot_read_model(StreamStoreConnectionFixture fixture) {
4034
_conn = fixture.Connection;
4135
_conn.Connect();
36+
_configuredConnection = new ConfiguredConnection(_conn, _namer, _serializer);
4237

4338
_aggId = Guid.NewGuid();
4439
_stream = _namer.GenerateForAggregate(typeof(SnapReadModelTestAggregate), _aggId);
@@ -48,7 +43,7 @@ public when_using_snapshot_read_model(StreamStoreConnectionFixture fixture) {
4843
}
4944
[Fact]
5045
public void can_get_snapshot_from_read_model() {
51-
var rm = new TestSnapShotReadModel(_aggId, GetListener, null);
46+
var rm = new TestSnapShotReadModel(_aggId, _configuredConnection, null);
5247
AssertEx.IsOrBecomesTrue(() => rm.Count == 10);
5348
var snapshot = rm.GetState();
5449

@@ -68,7 +63,7 @@ public void can_apply_snapshot_to_read_model() {
6863
new List<Tuple<string, long>> { new Tuple<string, long>(_stream, 9) },
6964
new TestSnapShotReadModel.MyState { Count = 10, Sum = 20 });
7065

71-
var rm = new TestSnapShotReadModel(_aggId, GetListener, snapshot);
66+
var rm = new TestSnapShotReadModel(_aggId, _configuredConnection, snapshot);
7267
AssertEx.IsOrBecomesTrue(() => rm.Count == 10);
7368
AssertEx.IsOrBecomesTrue(() => rm.Sum == 20);
7469
AppendEvents(1, _conn, _stream, 5);
@@ -78,15 +73,15 @@ public void can_apply_snapshot_to_read_model() {
7873
[Fact]
7974
[SuppressMessage("ReSharper", "AccessToDisposedClosure")]
8075
public void can_snapshot_and_recover_read_model() {
81-
var rm = new TestSnapShotReadModel(_aggId, GetListener, null);
76+
var rm = new TestSnapShotReadModel(_aggId, _configuredConnection, null);
8277
AssertEx.IsOrBecomesTrue(() => rm.Count == 10);
8378
AssertEx.IsOrBecomesTrue(() => rm.Sum == 20);
8479
AppendEvents(1, _conn, _stream, 5);
8580
AssertEx.IsOrBecomesTrue(() => rm.Count == 11, 1000);
8681
AssertEx.IsOrBecomesTrue(() => rm.Sum == 25);
8782
var snap = rm.GetState();
8883
rm.Dispose();
89-
var rm2 = new TestSnapShotReadModel(_aggId, GetListener, snap);
84+
var rm2 = new TestSnapShotReadModel(_aggId, _configuredConnection, snap);
9085
AssertEx.IsOrBecomesTrue(() => rm2.Count == 11, 1000);
9186
AssertEx.IsOrBecomesTrue(() => rm2.Sum == 25);
9287
AppendEvents(1, _conn, _stream, 5);
@@ -95,15 +90,15 @@ public void can_snapshot_and_recover_read_model() {
9590
}
9691
[Fact]
9792
public void can_only_restore_once() {
98-
var rm = new TestSnapShotReadModel(_aggId, GetListener, null);
93+
var rm = new TestSnapShotReadModel(_aggId, _configuredConnection, null);
9994
// ReSharper disable once AccessToDisposedClosure
10095
AssertEx.IsOrBecomesTrue(() => rm.Count == 10);
10196
var state = rm.GetState();
10297
rm.Dispose();
10398
//create while forcing double restore
10499
Assert.Throws<InvalidOperationException>(() => new TestSnapShotReadModel(
105-
_aggId,
106-
GetListener,
100+
_aggId,
101+
_configuredConnection,
107102
state,
108103
forceDoubleRestoreError:true));
109104
}
@@ -114,7 +109,7 @@ public void can_restore_state_without_checkpoints() {
114109
null, //no streams provided
115110
new TestSnapShotReadModel.MyState { Count = 10, Sum = 20 });
116111

117-
var rm = new TestSnapShotReadModel(_aggId, GetListener, snapshot);
112+
var rm = new TestSnapShotReadModel(_aggId, _configuredConnection, snapshot);
118113
AssertEx.IsOrBecomesTrue(() => rm.Count == 10);
119114
AssertEx.IsOrBecomesTrue(() => rm.Sum == 20);
120115

@@ -135,7 +130,7 @@ public void can_restore_state_without_checkpoints() {
135130
//can still get complete snapshot
136131
var snap2 = rm.GetState();
137132
//works as expected
138-
var rm2 = new TestSnapShotReadModel(_aggId, GetListener, snap2);
133+
var rm2 = new TestSnapShotReadModel(_aggId, _configuredConnection, snap2);
139134
AssertEx.IsOrBecomesTrue(() => rm2.Count == 12, 1000);
140135
AssertEx.IsOrBecomesTrue(() => rm2.Sum == 30);
141136
AppendEvents(1, _conn, _stream, 5);
@@ -148,10 +143,10 @@ public sealed class TestSnapShotReadModel :
148143
IHandle<SnapReadModelTestEvent> {
149144
public TestSnapShotReadModel(
150145
Guid aggId,
151-
Func<IListener> getListener,
146+
IConfiguredConnection configuredConnection,
152147
ReadModelState snapshot,
153148
bool forceDoubleRestoreError = false) :
154-
base(nameof(TestSnapShotReadModel), getListener) {
149+
base(nameof(TestSnapShotReadModel), configuredConnection) {
155150
// ReSharper disable once RedundantTypeArgumentsOfMethod
156151
EventStream.Subscribe<SnapReadModelTestEvent>(this);
157152

src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,38 +24,50 @@ public abstract class ReadModelBase :
2424
public bool Idle => _queue.Idle;
2525

2626
/// <summary>
27-
/// Creates a read model using the provided Function to get a listener.
28-
/// This is deprecated and will be removed in a future version of ReactiveDomain.
27+
/// ReaderLock locks the event handler and can be used when reading the model
28+
/// to ensure model state is unchanged during read.
29+
/// The lock should *not* be used in Handle methods as they are inside the lock already by default.
2930
/// </summary>
30-
/// <param name="name">The name of the read model. Also used as the name of the listener.</param>
31-
/// <param name="getListener">A Func to get a new <see cref="IListener"/>.</param>
32-
protected ReadModelBase(string name, Func<IListener> getListener)
33-
{
34-
Ensure.NotNull(getListener, nameof(getListener));
35-
_getListener = getListener;
36-
_listeners = new List<IListener>();
37-
_bus = new InMemoryBus($"{nameof(ReadModelBase)}:{name} bus", false);
38-
_queue = new QueuedHandler(_bus, $"{nameof(ReadModelBase)}:{name} queue");
39-
_queue.Start();
40-
}
31+
protected readonly object ReaderLock = new object();
32+
33+
/// <summary>
34+
/// The version is equal to the number of messages passed to the read model.
35+
/// The version is incremented after all handlers have been processed.
36+
/// The number of handlers (including none) will not impact the version.
37+
/// This can be used to ensure read model state for tests. This is *not*
38+
/// the same as the version of any particular stream being read.
39+
/// </summary>
40+
public int Version { get; private set; }
4141

4242
/// <summary>
4343
/// Creates a read model using the provided stream store connection. Reads existing events using a
4444
/// reader, then transitions to a listener for live events.
4545
/// </summary>
46-
/// <param name="name">The name of the read model. Also used as the name of the listener and reader.</param>
46+
/// <param name="name">The name of the read model. Also used as the names of the listener and reader.</param>
4747
/// <param name="connection">A connection to a stream store.</param>
4848
protected ReadModelBase(string name, IConfiguredConnection connection)
4949
{
5050
Ensure.NotNull(connection, nameof(connection));
51-
_getListener = () => connection.GetListener(name);
5251
_getReader = () => connection.GetReader(name, Handle);
52+
_getListener = () => connection.GetListener(name);
5353
_listeners = new List<IListener>();
5454
_bus = new InMemoryBus($"{nameof(ReadModelBase)}:{name} bus", false);
55-
_queue = new QueuedHandler(_bus, $"{nameof(ReadModelBase)}:{name} queue");
55+
_queue = new QueuedHandler(new AdHocHandler<IMessage>(DequeueMessage), $"{nameof(ReadModelBase)}:{name} queue");
5656
_queue.Start();
5757
}
5858

59+
/// <summary>
60+
/// Every message handled by the read model will pass through here.
61+
/// </summary>
62+
private void DequeueMessage(IMessage message)
63+
{
64+
lock (ReaderLock)
65+
{
66+
_bus.Handle(message);
67+
Version++;
68+
}
69+
}
70+
5971
private IListener AddNewListener()
6072
{
6173
var l = _getListener();
@@ -98,7 +110,7 @@ public void Start(string stream, long? checkpoint = null, bool blockUntilLive =
98110
using (var reader = _getReader())
99111
{
100112
reader.Read(stream, () => Idle, checkpoint);
101-
checkpoint = reader.Position;
113+
checkpoint = reader.Position ?? checkpoint;
102114
}
103115
}
104116
AddNewListener().Start(stream, checkpoint, blockUntilLive, cancelWaitToken);

src/ReactiveDomain.Foundation/StreamStore/SnapshotReadModel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ public abstract class SnapshotReadModel : ReadModelBase {
99

1010
protected SnapshotReadModel(
1111
string name,
12-
Func<IListener> getListener)
13-
: base(name, getListener) {
12+
IConfiguredConnection connection)
13+
: base(name, connection) {
1414
}
1515

1616
protected virtual void Restore(

src/ReactiveDomain.IdentityStorage/ReadModels/SubjectsRm.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class SubjectsRm :
1212
IHandle<SubjectMsgs.SubjectCreated>
1313
{
1414
public SubjectsRm(IConfiguredConnection conn)
15-
: base(nameof(SubjectsRm), () => conn.GetListener(nameof(SubjectsRm)))
15+
: base(nameof(SubjectsRm), conn)
1616
{
1717
//set handlers
1818
EventStream.Subscribe<SubjectMsgs.SubjectCreated>(this);

src/ReactiveDomain.IdentityStorage/ReadModels/UsersRm.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class UsersRm :
1919
private readonly SourceCache<UserDTO, Guid> _allUsers = new SourceCache<UserDTO, Guid>(x => x.UserId);
2020

2121
private readonly List<Guid> _userIds = new List<Guid>();
22-
public UsersRm(IConfiguredConnection conn) : base(nameof(UsersRm), () => conn.GetListener(nameof(UsersRm)))
22+
public UsersRm(IConfiguredConnection conn) : base(nameof(UsersRm), conn)
2323
{
2424
long? position;
2525
EventStream.Subscribe<UserMsgs.UserEvent>(this);

src/ReactiveDomain.IdentityStorage/Services/ClientStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class ClientStore :
2020
IHandle<ClientMsgs.ClientSecretRemoved>
2121
{
2222

23-
public ClientStore(IConfiguredConnection conn) : base(nameof(ClientStore), () => conn.GetListener(nameof(ClientStore)))
23+
public ClientStore(IConfiguredConnection conn) : base(nameof(ClientStore), conn)
2424
{
2525
long checkpoint;
2626

src/ReactiveDomain.PolicyStorage/ReadModels/ApplicationRm.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class ApplicationRm :
2929

3030

3131
public ApplicationRm(IConfiguredConnection conn)
32-
: base(nameof(ApplicationRm), () => conn.GetListener(nameof(ApplicationRm)))
32+
: base(nameof(ApplicationRm), conn)
3333
{
3434
//set handlers
3535
EventStream.Subscribe<ApplicationMsgs.ApplicationCreated>(this);

src/ReactiveDomain.PolicyStorage/ReadModels/FilteredPoliciesRM.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using DynamicData;
22
using ReactiveDomain.Foundation;
3-
using ReactiveDomain.Messaging;
43
using ReactiveDomain.Messaging.Bus;
54
using ReactiveDomain.Policy.Messages;
65
using System;
@@ -28,7 +27,7 @@ public class FilteredPoliciesRM :
2827
private readonly Dictionary<Guid, RoleDTO> _roles = new Dictionary<Guid, RoleDTO>();
2928

3029
public FilteredPoliciesRM(IConfiguredConnection conn, List<string> policyFilter = null)
31-
: base(nameof(FilteredPoliciesRM), () => conn.GetListener(nameof(FilteredPoliciesRM)))
30+
: base(nameof(FilteredPoliciesRM), conn)
3231
{
3332
if (policyFilter != null)
3433
{

src/ReactiveDomain.PolicyStorage/ReadModels/PolicyUserRm.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class PolicyUserRm :
2222
/// Create a read model for getting information about Policy Users.
2323
/// </summary>
2424
public PolicyUserRm(IConfiguredConnection conn)
25-
: base(nameof(PolicyUserRm), () => conn.GetListener(nameof(PolicyUserRm)))
25+
: base(nameof(PolicyUserRm), conn)
2626
{
2727
//set handlers
2828
EventStream.Subscribe<PolicyUserMsgs.PolicyUserAdded>(this);

0 commit comments

Comments
 (0)