Skip to content

Commit d1d1fc1

Browse files
committed
AssertEx to verify an RM's Version
1 parent 64163ce commit d1d1fc1

File tree

3 files changed

+101
-49
lines changed

3 files changed

+101
-49
lines changed

src/ReactiveDomain.Foundation.Tests/when_using_read_model_base.cs

Lines changed: 59 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
using ReactiveDomain.Testing;
55
using Xunit;
66

7-
namespace ReactiveDomain.Foundation.Tests {
7+
namespace ReactiveDomain.Foundation.Tests
8+
{
89
// ReSharper disable once InconsistentNaming
910
public class when_using_read_model_base :
1011
ReadModelBase,
1112
IHandle<when_using_read_model_base.ReadModelTestEvent>,
12-
IClassFixture<StreamStoreConnectionFixture> {
13+
IClassFixture<StreamStoreConnectionFixture>
14+
{
1315

1416
private static IStreamStoreConnection _conn;
1517
private static readonly IEventSerializer Serializer =
@@ -22,7 +24,8 @@ public class when_using_read_model_base :
2224

2325

2426
public when_using_read_model_base(StreamStoreConnectionFixture fixture)
25-
: base(nameof(when_using_read_model_base), new ConfiguredConnection(fixture.Connection, Namer, Serializer)) {
27+
: base(nameof(when_using_read_model_base), new ConfiguredConnection(fixture.Connection, Namer, Serializer))
28+
{
2629
_conn = fixture.Connection;
2730
_conn.Connect();
2831

@@ -43,47 +46,53 @@ private void AppendEvents(
4346
int numEventsToBeSent,
4447
IStreamStoreConnection conn,
4548
string streamName,
46-
int value) {
47-
for (int evtNumber = 0; evtNumber < numEventsToBeSent; evtNumber++) {
49+
int value)
50+
{
51+
for (int evtNumber = 0; evtNumber < numEventsToBeSent; evtNumber++)
52+
{
4853
var evt = new ReadModelTestEvent(evtNumber, value);
4954
conn.AppendToStream(streamName, ExpectedVersion.Any, null, Serializer.Serialize(evt));
5055
}
5156
}
5257
[Fact]
53-
public void can_start_streams_by_aggregate() {
58+
public void can_start_streams_by_aggregate()
59+
{
5460
var aggId = Guid.NewGuid();
5561
var s1 = Namer.GenerateForAggregate(typeof(TestAggregate), aggId);
5662
AppendEvents(1, _conn, s1, 7);
5763
Start<TestAggregate>(aggId);
58-
AssertEx.IsOrBecomesTrue(() => Count == 1, 1000, msg: $"Expected 1 got {Count}");
64+
AssertEx.IsModelVersion(this, 2, 1000, msg: $"Expected 2 got {Version}"); // 1 message + CatchupSubscriptionBecameLive
5965
AssertEx.IsOrBecomesTrue(() => Sum == 7);
6066
}
6167
[Fact]
62-
public void can_start_streams_by_aggregate_category() {
63-
68+
public void can_start_streams_by_aggregate_category()
69+
{
70+
6471
var s1 = Namer.GenerateForAggregate(typeof(ReadModelTestCategoryAggregate), Guid.NewGuid());
6572
AppendEvents(1, _conn, s1, 7);
6673
var s2 = Namer.GenerateForAggregate(typeof(ReadModelTestCategoryAggregate), Guid.NewGuid());
6774
AppendEvents(1, _conn, s2, 5);
68-
Start<ReadModelTestCategoryAggregate>(null,true);
75+
Start<ReadModelTestCategoryAggregate>(null, true);
6976

70-
AssertEx.IsOrBecomesTrue(() => Count == 2, 1000, msg: $"Expected 2 got {Count}");
77+
AssertEx.IsModelVersion(this, 3, 1000, msg: $"Expected 3 got {Version}");
7178
AssertEx.IsOrBecomesTrue(() => Sum == 12);
7279
}
7380
[Fact]
74-
public void can_read_one_stream() {
81+
public void can_read_one_stream()
82+
{
7583
Start(_stream1);
76-
AssertEx.IsOrBecomesTrue(() => Count == 10, 1000, msg: $"Expected 10 got {Count}");
84+
AssertEx.IsModelVersion(this, 11, 1000, msg: $"Expected 11 got {Version}");
7785
AssertEx.IsOrBecomesTrue(() => Sum == 20);
7886
//confirm checkpoints
7987
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
8088
Assert.Equal(9, GetCheckpoint()[0].Item2);
8189
}
8290
[Fact]
83-
public void can_read_two_streams() {
91+
public void can_read_two_streams()
92+
{
8493
Start(_stream1);
8594
Start(_stream2);
86-
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
95+
AssertEx.IsModelVersion(this, 22, 1000, msg: $"Expected 22 got {Version}");
8796
AssertEx.IsOrBecomesTrue(() => Sum == 50);
8897
//confirm checkpoints
8998
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
@@ -92,45 +101,49 @@ public void can_read_two_streams() {
92101
Assert.Equal(9, GetCheckpoint()[1].Item2);
93102
}
94103
[Fact]
95-
public void can_wait_for_one_stream_to_go_live() {
104+
public void can_wait_for_one_stream_to_go_live()
105+
{
96106
Start(_stream1, null, true);
97-
AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}");
107+
AssertEx.IsModelVersion(this, 11, 100, msg: $"Expected 11 got {Version}");
98108
AssertEx.IsOrBecomesTrue(() => Sum == 20, 100);
99109
}
100110
[Fact]
101-
public void can_wait_for_two_streams_to_go_live() {
111+
public void can_wait_for_two_streams_to_go_live()
112+
{
102113
Start(_stream1, null, true);
103-
AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}");
114+
AssertEx.IsModelVersion(this, 11, 100, msg: $"Expected 11 got {Version}");
104115
AssertEx.IsOrBecomesTrue(() => Sum == 20, 100);
105116

106117
Start(_stream2, null, true);
107-
AssertEx.IsOrBecomesTrue(() => Count == 20, 100, msg: $"Expected 20 got {Count}");
118+
AssertEx.IsModelVersion(this, 21, 100, msg: $"Expected 21 got {Version}");
108119
AssertEx.IsOrBecomesTrue(() => Sum == 50, 100);
109120
}
110121
[Fact]
111-
public void can_listen_to_one_stream() {
122+
public void can_listen_to_one_stream()
123+
{
112124
Start(_stream1);
113-
AssertEx.IsOrBecomesTrue(() => Count == 10, 1000, msg: $"Expected 10 got {Count}");
125+
AssertEx.IsModelVersion(this, 11, 1000, msg: $"Expected 11 got {Version}");
114126
AssertEx.IsOrBecomesTrue(() => Sum == 20);
115127
//add more messages
116128
AppendEvents(10, _conn, _stream1, 5);
117-
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
129+
AssertEx.IsModelVersion(this, 21, 1000, msg: $"Expected 21 got {Version}");
118130
AssertEx.IsOrBecomesTrue(() => Sum == 70);
119131
//confirm checkpoints
120132
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
121133
Assert.Equal(19, GetCheckpoint()[0].Item2); Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
122134
Assert.Equal(19, GetCheckpoint()[0].Item2);
123135
}
124136
[Fact]
125-
public void can_listen_to_two_streams() {
137+
public void can_listen_to_two_streams()
138+
{
126139
Start(_stream1);
127140
Start(_stream2);
128-
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
141+
AssertEx.IsModelVersion(this, 22, 1000, msg: $"Expected 22 got {Version}");
129142
AssertEx.IsOrBecomesTrue(() => Sum == 50);
130143
//add more messages
131144
AppendEvents(10, _conn, _stream1, 5);
132145
AppendEvents(10, _conn, _stream2, 7);
133-
AssertEx.IsOrBecomesTrue(() => Count == 40, 1000, msg: $"Expected 20 got {Count}");
146+
AssertEx.IsModelVersion(this, 42, 1000, msg: $"Expected 42 got {Version}");
134147
AssertEx.IsOrBecomesTrue(() => Sum == 170);
135148
//confirm checkpoints
136149
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
@@ -139,40 +152,40 @@ public void can_listen_to_two_streams() {
139152
Assert.Equal(19, GetCheckpoint()[1].Item2);
140153
}
141154
[Fact]
142-
public void can_use_checkpoint_on_one_stream() {
155+
public void can_use_checkpoint_on_one_stream()
156+
{
143157
//restore state
144158
var checkPoint = 8L;//Zero based, ignore the first 9 events
145-
Count = 9;
146159
Sum = 18;
147160
//start at the checkpoint
148161
Start(_stream1, checkPoint);
149162
//add the one recorded event
150-
AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}");
163+
AssertEx.IsModelVersion(this, 2, 100, msg: $"Expected 2 got {Version}");
151164
AssertEx.IsOrBecomesTrue(() => Sum == 20);
152165
//add more messages
153166
AppendEvents(10, _conn, _stream1, 5);
154-
AssertEx.IsOrBecomesTrue(() => Count == 20, 100, msg: $"Expected 20 got {Count}");
167+
AssertEx.IsModelVersion(this, 12, 100, msg: $"Expected 12 got {Version}");
155168
AssertEx.IsOrBecomesTrue(() => Sum == 70);
156169
//confirm checkpoints
157170
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
158171
Assert.Equal(19, GetCheckpoint()[0].Item2);
159172
}
160173
[Fact]
161-
public void can_use_checkpoint_on_two_streams() {
174+
public void can_use_checkpoint_on_two_streams()
175+
{
162176
//restore state
163177
var checkPoint1 = 8L;//Zero based, ignore the first 9 events
164178
var checkPoint2 = 5L;//Zero based, ignore the first 6 events
165-
Count = (9) + (6);
166179
Sum = (9 * 2) + (6 * 3);
167180
Start(_stream1, checkPoint1);
168181
Start(_stream2, checkPoint2);
169182
//add the recorded events 2 on stream 1 & 5 on stream 2
170-
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
183+
AssertEx.IsModelVersion(this, 7, 1000, msg: $"Expected 7 got {Version}");
171184
AssertEx.IsOrBecomesTrue(() => Sum == 50, msg: $"Expected 50 got {Sum}");
172185
//add more messages
173186
AppendEvents(10, _conn, _stream1, 5);
174187
AppendEvents(10, _conn, _stream2, 7);
175-
AssertEx.IsOrBecomesTrue(() => Count == 40, 1000, msg: $"Expected 20 got {Count}");
188+
AssertEx.IsModelVersion(this, 27, 1000, msg: $"Expected 27 got {Version}");
176189
AssertEx.IsOrBecomesTrue(() => Sum == 170);
177190
//confirm checkpoints
178191
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
@@ -181,38 +194,40 @@ public void can_use_checkpoint_on_two_streams() {
181194
Assert.Equal(19, GetCheckpoint()[1].Item2);
182195
}
183196
[Fact]
184-
public void can_listen_to_the_same_stream_twice() {
185-
Assert.Equal(0,Count);
197+
public void can_listen_to_the_same_stream_twice()
198+
{
199+
Assert.Equal(0, Version);
186200
//weird but true
187201
//n.b. Don't do this on purpose
188202
Start(_stream1);
189203
Start(_stream1);
190204
//double events
191-
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
205+
AssertEx.IsModelVersion(this, 22, 1000, msg: $"Expected 22 got {Version}");
192206
AssertEx.IsOrBecomesTrue(() => Sum == 40);
193207
//even more doubled events
194208
AppendEvents(10, _conn, _stream1, 5);
195-
AssertEx.IsOrBecomesTrue(() => Count == 40, 2000, msg: $"Expected 40 got {Count}");
209+
AssertEx.IsModelVersion(this, 42, 2000, msg: $"Expected 42 got {Version}");
196210
AssertEx.IsOrBecomesTrue(() => Sum == 140);
197211
}
198212

199213
public long Sum { get; private set; }
200-
public long Count { get; private set; }
201-
void IHandle<ReadModelTestEvent>.Handle(ReadModelTestEvent @event) {
214+
void IHandle<ReadModelTestEvent>.Handle(ReadModelTestEvent @event)
215+
{
202216
Sum += @event.Value;
203-
Count++;
204217
}
205-
public class ReadModelTestEvent : Event {
218+
public class ReadModelTestEvent : Event
219+
{
206220
public readonly int Number;
207221
public readonly int Value;
208222

209223
public ReadModelTestEvent(
210224
int number,
211-
int value){
225+
int value)
226+
{
212227
Number = number;
213228
Value = value;
214229
}
215230
}
216-
public class ReadModelTestCategoryAggregate:EventDrivenStateMachine{}
231+
public class ReadModelTestCategoryAggregate : EventDrivenStateMachine { }
217232
}
218233
}

src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ public abstract class ReadModelBase :
3535
/// The version is incremented after all handlers have been processed.
3636
/// The number of handlers (including none) will not impact the version.
3737
/// This can be used to ensure read model state for tests. This is *not*
38-
/// the same as the version of any particular stream being read.
38+
/// the same as the version of any particular stream being read. This can
39+
/// include <see cref="StreamStoreMsgs.CatchupSubscriptionBecameLive"/>,
40+
/// which may result in the Version being 1 greater than otherwise expected.
3941
/// </summary>
4042
public int Version { get; private set; }
4143

src/ReactiveDomain.Testing/Assert.cs renamed to src/ReactiveDomain.Testing/AssertEx.cs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Threading;
3+
using ReactiveDomain.Foundation;
34
using ReactiveDomain.Messaging.Bus;
45
using Xunit;
56

@@ -41,7 +42,7 @@ public static void ArraySegmentEqual<T>(
4142
/// <summary>
4243
/// Asserts the given function will return false before the timeout expires.
4344
/// Repeatedly evaluates the function until false is returned or the timeout expires.
44-
/// Will return immediatly when the condition is false.
45+
/// Will return immediately when the condition is false.
4546
/// Evaluates the timeout every 10 msec until expired.
4647
/// Will not yield the thread by default, if yielding is required to resolve deadlocks set yieldThread to true.
4748
/// </summary>
@@ -58,7 +59,7 @@ public static void IsOrBecomesFalse(Func<bool> func, int? timeout = null, string
5859
/// <summary>
5960
/// Asserts the given function will return true before the timeout expires.
6061
/// Repeatedly evaluates the function until true is returned or the timeout expires.
61-
/// Will return immediatly when the condition is true.
62+
/// Will return immediately when the condition is true.
6263
/// Evaluates the timeout every 10 msec until expired.
6364
/// Will not yield the thread by default, if yielding is required to resolve deadlocks set yieldThread to true.
6465
/// </summary>
@@ -73,13 +74,47 @@ public static void IsOrBecomesTrue(Func<bool> func, int? timeout = null, string
7374
if (!timeout.HasValue) timeout = 1000;
7475
var waitLoops = timeout / 10;
7576
var result = false;
76-
for (int i = 0; i < waitLoops; i++) {
77-
if (SpinWait.SpinUntil(func, 10)){
77+
for (int i = 0; i < waitLoops; i++)
78+
{
79+
if (SpinWait.SpinUntil(func, 10))
80+
{
7881
result = true;
7982
break;
8083
}
8184
}
8285
Assert.True(result, msg ?? "");
8386
}
87+
88+
/// <summary>
89+
/// Asserts that the given read model will have exactly the expected version before the timeout expires.
90+
/// This can make tests fragile and should generally not be used unless the exact number of messages
91+
/// handled is critical to your tests. In most cases you should use <see cref="AtLeastModelVersion"/>
92+
/// instead.
93+
/// </summary>
94+
/// <param name="readModel">The read model.</param>
95+
/// <param name="expectedVersion">The read model's expected version.</param>
96+
/// <param name="timeout">A timeout in milliseconds. If not specified, defaults to 1000.</param>
97+
/// <param name="msg">A message to display if the condition is not satisfied.</param>
98+
/// <param name="yieldThread">If true, the thread relinquishes the remainder of its time
99+
/// slice to any thread of equal priority that is ready to run.</param>
100+
public static void IsModelVersion(ReadModelBase readModel, int expectedVersion, int? timeout = null, string msg = null, bool yieldThread = false)
101+
{
102+
IsOrBecomesTrue(() => readModel.Version == expectedVersion, timeout, msg, yieldThread);
103+
}
104+
105+
/// <summary>
106+
/// Asserts that the given read model will have at least the expected version before the
107+
/// timeout expires. This is generally preferred to <see cref="IsModelVersion"/>.
108+
/// </summary>
109+
/// <param name="readModel">The read model.</param>
110+
/// <param name="expectedVersion">The read model's expected minimum version.</param>
111+
/// <param name="timeout">A timeout in milliseconds. If not specified, defaults to 1000.</param>
112+
/// <param name="msg">A message to display if the condition is not satisfied.</param>
113+
/// <param name="yieldThread">If true, the thread relinquishes the remainder of its time
114+
/// slice to any thread of equal priority that is ready to run.</param>
115+
public static void AtLeastModelVersion(ReadModelBase readModel, int expectedVersion, int? timeout = null, string msg = null, bool yieldThread = false)
116+
{
117+
IsOrBecomesTrue(() => readModel.Version >= expectedVersion, timeout, msg, yieldThread);
118+
}
84119
}
85120
}

0 commit comments

Comments
 (0)