Skip to content

Commit 4bb66b2

Browse files
authored
ReadModelBase can use a StreamReader for performance (#113)
ReadModelBase can use a StreamReader for performance - Keeps existing ctor of ReadModelBase but tags it for future removal. - New ctor takes an IConfiguredConnection instead of a Func<IListener>. After playing back historical events with a StreamReader, ReadModelBase.Start() switches to a StreamListener. - Fixes #112 in StreamReader. - When starting a StreamListener from a checkpoint, sets the StreamPosition to the checkpoint before reading any events so that immediate queries will return the checkpoint as the current stream position. - Cleans up some XML comments
1 parent b7b8469 commit 4bb66b2

File tree

6 files changed

+379
-46
lines changed

6 files changed

+379
-46
lines changed
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
using System;
2+
using ReactiveDomain.Messaging;
3+
using ReactiveDomain.Messaging.Bus;
4+
using ReactiveDomain.Testing;
5+
using Xunit;
6+
7+
namespace ReactiveDomain.Foundation.Tests
8+
{
9+
// ReSharper disable once InconsistentNaming
10+
public class when_using_read_model_base_with_reader :
11+
ReadModelBase,
12+
IHandle<when_using_read_model_base_with_reader.ReadModelTestEvent>,
13+
IClassFixture<StreamStoreConnectionFixture>
14+
{
15+
private static IStreamStoreConnection _conn;
16+
private static readonly IEventSerializer Serializer =
17+
new JsonMessageSerializer();
18+
private static readonly IStreamNameBuilder Namer =
19+
new PrefixedCamelCaseStreamNameBuilder(nameof(when_using_read_model_base));
20+
21+
private readonly string _stream1;
22+
private readonly string _stream2;
23+
24+
25+
public when_using_read_model_base_with_reader(StreamStoreConnectionFixture fixture)
26+
: base(nameof(when_using_read_model_base),
27+
new ConfiguredConnection(fixture.Connection, Namer, Serializer))
28+
{
29+
_conn = fixture.Connection;
30+
_conn.Connect();
31+
32+
// ReSharper disable once RedundantTypeArgumentsOfMethod
33+
EventStream.Subscribe<ReadModelTestEvent>(this);
34+
35+
_stream1 = Namer.GenerateForAggregate(typeof(TestAggregate), Guid.NewGuid());
36+
_stream2 = Namer.GenerateForAggregate(typeof(TestAggregate), Guid.NewGuid());
37+
38+
AppendEvents(10, _conn, _stream1, 2);
39+
AppendEvents(10, _conn, _stream2, 3);
40+
_conn.TryConfirmStream(_stream1, 10);
41+
_conn.TryConfirmStream(_stream2, 10);
42+
_conn.TryConfirmStream(Namer.GenerateForCategory(typeof(TestAggregate)), 20);
43+
}
44+
45+
private void AppendEvents(
46+
int numEventsToBeSent,
47+
IStreamStoreConnection conn,
48+
string streamName,
49+
int value)
50+
{
51+
for (int evtNumber = 0; evtNumber < numEventsToBeSent; evtNumber++)
52+
{
53+
var evt = new ReadModelTestEvent(evtNumber, value);
54+
conn.AppendToStream(streamName, ExpectedVersion.Any, null, Serializer.Serialize(evt));
55+
}
56+
}
57+
[Fact]
58+
public void can_start_streams_by_aggregate()
59+
{
60+
var aggId = Guid.NewGuid();
61+
var s1 = Namer.GenerateForAggregate(typeof(TestAggregate), aggId);
62+
AppendEvents(1, _conn, s1, 7);
63+
Start<TestAggregate>(aggId);
64+
AssertEx.IsOrBecomesTrue(() => Count == 1, 1000, msg: $"Expected 1 got {Count}");
65+
AssertEx.IsOrBecomesTrue(() => Sum == 7);
66+
}
67+
[Fact]
68+
public void can_start_streams_by_aggregate_category()
69+
{
70+
71+
var s1 = Namer.GenerateForAggregate(typeof(ReadModelTestCategoryAggregate), Guid.NewGuid());
72+
AppendEvents(1, _conn, s1, 7);
73+
var s2 = Namer.GenerateForAggregate(typeof(ReadModelTestCategoryAggregate), Guid.NewGuid());
74+
AppendEvents(1, _conn, s2, 5);
75+
Start<ReadModelTestCategoryAggregate>(null, true);
76+
77+
AssertEx.IsOrBecomesTrue(() => Count == 2, 1000, msg: $"Expected 2 got {Count}");
78+
AssertEx.IsOrBecomesTrue(() => Sum == 12);
79+
}
80+
[Fact]
81+
public void can_read_one_stream()
82+
{
83+
Start(_stream1);
84+
AssertEx.IsOrBecomesTrue(() => Count == 10, 1000, msg: $"Expected 10 got {Count}");
85+
AssertEx.IsOrBecomesTrue(() => Sum == 20);
86+
//confirm checkpoints
87+
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
88+
Assert.Equal(9, GetCheckpoint()[0].Item2);
89+
}
90+
[Fact]
91+
public void can_read_two_streams()
92+
{
93+
Start(_stream1);
94+
Start(_stream2);
95+
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
96+
AssertEx.IsOrBecomesTrue(() => Sum == 50);
97+
//confirm checkpoints
98+
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
99+
Assert.Equal(9, GetCheckpoint()[0].Item2);
100+
Assert.Equal(_stream2, GetCheckpoint()[1].Item1);
101+
Assert.Equal(9, GetCheckpoint()[1].Item2);
102+
}
103+
[Fact]
104+
public void can_wait_for_one_stream_to_go_live()
105+
{
106+
Start(_stream1, null, true);
107+
AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}");
108+
AssertEx.IsOrBecomesTrue(() => Sum == 20, 100);
109+
}
110+
[Fact]
111+
public void can_wait_for_two_streams_to_go_live()
112+
{
113+
Start(_stream1, null, true);
114+
AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}");
115+
AssertEx.IsOrBecomesTrue(() => Sum == 20, 100);
116+
117+
Start(_stream2, null, true);
118+
AssertEx.IsOrBecomesTrue(() => Count == 20, 10, msg: $"Expected 20 got {Count}");
119+
AssertEx.IsOrBecomesTrue(() => Sum == 50, 100);
120+
}
121+
[Fact]
122+
public void can_listen_to_one_stream()
123+
{
124+
Start(_stream1);
125+
AssertEx.IsOrBecomesTrue(() => Count == 10, 1000, msg: $"Expected 10 got {Count}");
126+
AssertEx.IsOrBecomesTrue(() => Sum == 20);
127+
//add more messages
128+
AppendEvents(10, _conn, _stream1, 5);
129+
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
130+
AssertEx.IsOrBecomesTrue(() => Sum == 70);
131+
//confirm checkpoints
132+
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
133+
Assert.Equal(19, GetCheckpoint()[0].Item2); Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
134+
Assert.Equal(19, GetCheckpoint()[0].Item2);
135+
}
136+
[Fact]
137+
public void can_listen_to_two_streams()
138+
{
139+
Start(_stream1);
140+
Start(_stream2);
141+
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
142+
AssertEx.IsOrBecomesTrue(() => Sum == 50);
143+
//add more messages
144+
AppendEvents(10, _conn, _stream1, 5);
145+
AppendEvents(10, _conn, _stream2, 7);
146+
AssertEx.IsOrBecomesTrue(() => Count == 40, 1000, msg: $"Expected 20 got {Count}");
147+
AssertEx.IsOrBecomesTrue(() => Sum == 170);
148+
//confirm checkpoints
149+
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
150+
Assert.Equal(19, GetCheckpoint()[0].Item2);
151+
Assert.Equal(_stream2, GetCheckpoint()[1].Item1);
152+
Assert.Equal(19, GetCheckpoint()[1].Item2);
153+
}
154+
[Fact]
155+
public void can_use_checkpoint_on_one_stream()
156+
{
157+
//restore state
158+
var checkPoint = 8L;//Zero based, ignore the first 9 events
159+
Count = 9;
160+
Sum = 18;
161+
//start at the checkpoint
162+
Start(_stream1, checkPoint);
163+
//add the one recorded event
164+
AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}");
165+
AssertEx.IsOrBecomesTrue(() => Sum == 20);
166+
//add more messages
167+
AppendEvents(10, _conn, _stream1, 5);
168+
AssertEx.IsOrBecomesTrue(() => Count == 20, 100, msg: $"Expected 20 got {Count}");
169+
AssertEx.IsOrBecomesTrue(() => Sum == 70);
170+
//confirm checkpoints
171+
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
172+
Assert.Equal(19, GetCheckpoint()[0].Item2);
173+
}
174+
[Fact]
175+
public void can_use_checkpoint_on_two_streams()
176+
{
177+
//restore state
178+
var checkPoint1 = 8L;//Zero based, ignore the first 9 events
179+
var checkPoint2 = 5L;//Zero based, ignore the first 6 events
180+
Count = (9) + (6);
181+
Sum = (9 * 2) + (6 * 3);
182+
Start(_stream1, checkPoint1);
183+
Start(_stream2, checkPoint2);
184+
//add the recorded events 2 on stream 1 & 5 on stream 2
185+
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
186+
AssertEx.IsOrBecomesTrue(() => Sum == 50, msg: $"Expected 50 got {Sum}");
187+
//add more messages
188+
AppendEvents(10, _conn, _stream1, 5);
189+
AppendEvents(10, _conn, _stream2, 7);
190+
AssertEx.IsOrBecomesTrue(() => Count == 40, 1000, msg: $"Expected 20 got {Count}");
191+
AssertEx.IsOrBecomesTrue(() => Sum == 170);
192+
//confirm checkpoints
193+
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
194+
Assert.Equal(19, GetCheckpoint()[0].Item2);
195+
Assert.Equal(_stream2, GetCheckpoint()[1].Item1);
196+
Assert.Equal(19, GetCheckpoint()[1].Item2);
197+
}
198+
[Fact]
199+
public void can_listen_to_the_same_stream_twice()
200+
{
201+
Assert.Equal(0, Count);
202+
//weird but true
203+
//n.b. Don't do this on purpose
204+
Start(_stream1);
205+
Start(_stream1);
206+
//double events
207+
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
208+
AssertEx.IsOrBecomesTrue(() => Sum == 40);
209+
//even more doubled events
210+
AppendEvents(10, _conn, _stream1, 5);
211+
AssertEx.IsOrBecomesTrue(() => Count == 40, 2000, msg: $"Expected 40 got {Count}");
212+
AssertEx.IsOrBecomesTrue(() => Sum == 140);
213+
}
214+
215+
public long Sum { get; private set; }
216+
public long Count { get; private set; }
217+
void IHandle<ReadModelTestEvent>.Handle(ReadModelTestEvent @event)
218+
{
219+
Sum += @event.Value;
220+
Count++;
221+
}
222+
public class ReadModelTestEvent : Event
223+
{
224+
public readonly int Number;
225+
public readonly int Value;
226+
227+
public ReadModelTestEvent(
228+
int number,
229+
int value)
230+
{
231+
Number = number;
232+
Value = value;
233+
}
234+
}
235+
public class ReadModelTestCategoryAggregate : EventDrivenStateMachine { }
236+
}
237+
}

src/ReactiveDomain.Foundation/IListener.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,31 @@ public interface IListener : IDisposable
99
ISubscriber EventStream { get; }
1010
long Position { get; }
1111
string StreamName { get; }
12+
1213
/// <summary>
1314
/// Starts listening on a named stream
1415
/// </summary>
1516
/// <param name="stream">the exact stream name</param>
1617
/// <param name="checkpoint">start point to listen from</param>
1718
/// <param name="blockUntilLive">wait for the is live event from the catchup subscription before returning</param>
18-
/// <param name="millisecondsTimeout">Timeout to wait before aborting Load defaults to 1000ms</param>
19-
void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken));
19+
/// <param name="cancelWaitToken">Cancellation token to cancel waiting if blockUntilLive is true</param>
20+
void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default);
2021
/// <summary>
2122
/// Starts listening on an aggregate root stream
2223
/// </summary>
2324
/// <typeparam name="TAggregate">The type of aggregate</typeparam>
2425
/// <param name="id">the aggregate id</param>
2526
/// <param name="checkpoint">start point to listen from</param>
2627
/// <param name="blockUntilLive">wait for the is live event from the catchup subscription before returning</param>
27-
/// <param name="millisecondsTimeout">Timeout to wait before aborting Load defaults to 1000ms</param>
28-
void Start<TAggregate>(Guid id, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource;
28+
/// <param name="cancelWaitToken">Cancellation token to cancel waiting if blockUntilLive is true</param>
29+
void Start<TAggregate>(Guid id, long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource;
2930
/// <summary>
3031
/// Starts listening on a Aggregate Category Stream
3132
/// </summary>
3233
/// <typeparam name="TAggregate">The type of aggregate</typeparam>
3334
/// <param name="checkpoint">start point to listen from</param>
3435
/// <param name="blockUntilLive">wait for the is live event from the catchup subscription before returning</param>
35-
/// <param name="millisecondsTimeout">Timeout to wait before aborting Load defaults to 1000ms</param>
36-
void Start<TAggregate>(long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default(CancellationToken)) where TAggregate : class, IEventSource;
36+
/// <param name="cancelWaitToken">Cancellation token to cancel waiting if blockUntilLive is true</param>
37+
void Start<TAggregate>(long? checkpoint = null, bool blockUntilLive = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource;
3738
}
3839
}

0 commit comments

Comments
 (0)