Skip to content

Commit 521018e

Browse files
condronjoshkempner
authored andcommitted
Add additional test helpers and specifications
1 parent f48cfcc commit 521018e

File tree

14 files changed

+911
-62
lines changed

14 files changed

+911
-62
lines changed

src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,20 @@ protected virtual void Dispose(bool disposing)
182182
}
183183
_disposed = true;
184184
}
185-
185+
/// <summary>
186+
/// Applies a message synchronously to the read model while ensuring that the <see cref="ReaderLock"/>
187+
/// is respected and bypasses both the queue and listeners. This is primarily useful in tests.
188+
/// </summary>
189+
/// <param name="message">The message to apply.</param>
190+
public void DirectApply(IMessage message) { DequeueMessage(message); }
186191
public void Handle(Message message) { ((IHandle<IMessage>)_queue).Handle(message); }
187192
public void Handle(IMessage message) { ((IHandle<IMessage>)_queue).Handle(message); }
193+
/// <summary>
194+
/// Publishes a message onto the read model's internal queue.
195+
/// This bypasses the Listeners while ensuring that the <see cref="ReaderLock"/>
196+
/// is respected. All messages will be processed in order from the queue thread.
197+
/// </summary>
198+
/// <param name="message">The message to publish.</param>
188199
public void Publish(IMessage message) { ((IPublisher)_queue).Publish(message); }
189200
}
190201
}

src/ReactiveDomain.Testing/AssertEx.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public static void IsOrBecomesTrue(Func<bool> func, int? timeout = null, string
116116
var delay = 1;
117117
while (true)
118118
{
119-
if (EvaluateAfterDelay(func, TimeSpan.FromMilliseconds(delay)))
119+
if (func())
120120
{
121121
result = true;
122122
break;
@@ -129,6 +129,7 @@ public static void IsOrBecomesTrue(Func<bool> func, int? timeout = null, string
129129
delay = delay << 1;
130130
}
131131
delay = Math.Min(delay, endTime - now);
132+
Thread.Sleep(delay);
132133
}
133134
Assert.True(result, msg ?? "");
134135
}

src/ReactiveDomain.Testing/EventStore/MockStreamStoreConnection.cs

Lines changed: 119 additions & 51 deletions
Large diffs are not rendered by default.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using ReactiveDomain.Foundation;
2+
using ReactiveDomain.Messaging;
3+
using System;
4+
5+
namespace ReactiveDomain.Testing
6+
{
7+
/// <summary>
8+
/// An empty configured connection that produces null repositories, readers, etc.
9+
/// Implements <see cref="IConfiguredConnection"/>.
10+
/// </summary>
11+
public class NullConfiguredConnection : IConfiguredConnection
12+
{
13+
/// <summary>
14+
/// Gets a <see cref="NullConnection"/>.
15+
/// </summary>
16+
public IStreamStoreConnection Connection => new NullConnection();
17+
18+
/// <summary>
19+
/// Gets a standard stream name builder
20+
/// </summary>
21+
public IStreamNameBuilder StreamNamer => new PrefixedCamelCaseStreamNameBuilder();
22+
23+
/// <summary>
24+
/// Gets a default Json message serializer.
25+
/// </summary>
26+
public IEventSerializer Serializer => new JsonMessageSerializer();
27+
28+
/// <summary>
29+
/// Gets a <see cref="NullRepository"/>.
30+
/// </summary>
31+
/// <param name="baseRepository">This parameter is ignored.</param>
32+
/// <param name="caching">This parameter is ignored.</param>
33+
/// <param name="currentPolicyUserId">This parameter is ignored.</param>
34+
/// <returns>A <see cref="NullRepository"/>.</returns>
35+
public ICorrelatedRepository GetCorrelatedRepository(
36+
IRepository baseRepository = null,
37+
bool caching = false,
38+
Func<Guid> currentPolicyUserId = null)
39+
{
40+
return new NullRepository();
41+
}
42+
43+
/// <summary>
44+
/// Gets a <see cref="NullListener"/>.
45+
/// </summary>
46+
/// <param name="name">The name of the listener.</param>
47+
/// <returns>A <see cref="NullListener"/></returns>
48+
public IListener GetListener(string name)
49+
{
50+
return new NullListener(name);
51+
}
52+
53+
/// <summary>
54+
/// Gets a <see cref="NullListener"/>.
55+
/// </summary>
56+
/// <param name="name">The name of the listener.</param>
57+
/// <returns>A <see cref="NullListener"/></returns>
58+
public IListener GetQueuedListener(string name)
59+
{
60+
return new NullListener(name);
61+
}
62+
63+
/// <summary>
64+
/// Gets a <see cref="NullReader"/>.
65+
/// </summary>
66+
/// <param name="name">The name of the reader.</param>
67+
/// <param name="handle">This parameter is ignored.</param>
68+
/// <returns>A <see cref="NullListener"/></returns>
69+
public IStreamReader GetReader(string name, Action<IMessage> handle)
70+
{
71+
return new NullReader(name);
72+
}
73+
74+
/// <summary>
75+
/// Gets a <see cref="NullRepository"/>.
76+
/// </summary>
77+
/// <param name="caching">This parameter is ignored.</param>
78+
/// <param name="currentPolicyUserId">This parameter is ignored.</param>
79+
/// <returns>A <see cref="NullRepository"/>.</returns>
80+
public IRepository GetRepository(bool caching = false, Func<Guid> currentPolicyUserId = null)
81+
{
82+
return new NullRepository();
83+
}
84+
}
85+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
using ReactiveDomain.Util;
2+
using System;
3+
4+
namespace ReactiveDomain.Testing
5+
{
6+
/// <summary>
7+
/// An empty connection that implements <see cref="IStreamStoreConnection"/>.
8+
/// </summary>
9+
public class NullConnection : IStreamStoreConnection
10+
{
11+
/// <summary>
12+
/// The name of the connection.
13+
/// </summary>
14+
public string ConnectionName => "NullConnection";
15+
16+
/// <summary>
17+
/// Drops the events and returns a write result at the default version.
18+
/// </summary>
19+
/// <param name="stream">This parameter is ignored.</param>
20+
/// <param name="expectedVersion">This parameter is ignored.</param>
21+
/// <param name="credentials">This parameter is ignored.</param>
22+
/// <param name="events">This parameter is ignored.</param>
23+
/// <returns>A <see cref="WriteResult"/> at the default version.</returns>
24+
public WriteResult AppendToStream(string stream, long expectedVersion, UserCredentials credentials = null, params EventData[] events)
25+
{
26+
return new WriteResult(0);
27+
}
28+
29+
/// <summary>
30+
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
31+
/// </summary>
32+
public void Close() { }
33+
34+
/// <summary>
35+
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
36+
/// </summary>
37+
public void Connect() { }
38+
39+
/// <summary>
40+
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
41+
/// </summary>
42+
/// <param name="stream">This parameter is ignored.</param>
43+
/// <param name="expectedVersion">This parameter is ignored.</param>
44+
/// <param name="credentials">This parameter is ignored.</param>
45+
public void DeleteStream(string stream, long expectedVersion, UserCredentials credentials = null)
46+
{
47+
}
48+
49+
/// <summary>
50+
/// Cleans up resources.
51+
/// </summary>
52+
public void Dispose()
53+
{
54+
}
55+
56+
/// <summary>
57+
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
58+
/// </summary>
59+
/// <param name="stream">This parameter is ignored.</param>
60+
/// <param name="expectedVersion">This parameter is ignored.</param>
61+
/// <param name="credentials">This parameter is ignored.</param>
62+
public void HardDeleteStream(string stream, long expectedVersion, UserCredentials credentials = null)
63+
{
64+
}
65+
66+
/// <summary>
67+
/// Gets an empty stream slice.
68+
/// </summary>
69+
/// <param name="stream">This parameter is ignored.</param>
70+
/// <param name="start">This parameter is ignored.</param>
71+
/// <param name="count">This parameter is ignored.</param>
72+
/// <param name="credentials">This parameter is ignored.</param>
73+
/// <returns>An empty <see cref="StreamEventsSlice"/>.</returns>
74+
public StreamEventsSlice ReadStreamBackward(string stream, long start, long count, UserCredentials credentials = null)
75+
{
76+
return new StreamEventsSlice(stream, 0, ReadDirection.Backward, Array.Empty<RecordedEvent>(), 0, 0, true);
77+
}
78+
79+
/// <summary>
80+
/// Gets an empty stream slice.
81+
/// </summary>
82+
/// <param name="stream">This parameter is ignored.</param>
83+
/// <param name="start">This parameter is ignored.</param>
84+
/// <param name="count">This parameter is ignored.</param>
85+
/// <param name="credentials">This parameter is ignored.</param>
86+
/// <returns>An empty <see cref="StreamEventsSlice"/>.</returns>
87+
public StreamEventsSlice ReadStreamForward(string stream, long start, long count, UserCredentials credentials = null)
88+
{
89+
return new StreamEventsSlice(stream, 0, ReadDirection.Forward, Array.Empty<RecordedEvent>(), 0, 0, true);
90+
}
91+
92+
/// <summary>
93+
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
94+
/// </summary>
95+
/// <param name="eventAppeared">This parameter is ignored.</param>
96+
/// <param name="subscriptionDropped">This parameter is ignored.</param>
97+
/// <param name="credentials">This parameter is ignored.</param>
98+
/// <param name="resolveLinkTos">This parameter is ignored.</param>
99+
/// <returns>This connection.</returns>
100+
public IDisposable SubscribeToAll(Action<RecordedEvent> eventAppeared, Action<SubscriptionDropReason, Exception> subscriptionDropped = null, UserCredentials credentials = null, bool resolveLinkTos = true)
101+
{
102+
return this;
103+
}
104+
105+
/// <summary>
106+
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
107+
/// </summary>
108+
/// <param name="from">This parameter is ignored.</param>
109+
/// <param name="eventAppeared">This parameter is ignored.</param>
110+
/// <param name="settings">This parameter is ignored.</param>
111+
/// <param name="liveProcessingStarted">This parameter is ignored.</param>
112+
/// <param name="subscriptionDropped">This parameter is ignored.</param>
113+
/// <param name="credentials">This parameter is ignored.</param>
114+
/// <param name="resolveLinkTos">This parameter is ignored.</param>
115+
/// <returns>This connection.</returns>
116+
public IDisposable SubscribeToAllFrom(Position from, Action<RecordedEvent> eventAppeared, CatchUpSubscriptionSettings settings = null, Action liveProcessingStarted = null, Action<SubscriptionDropReason, Exception> subscriptionDropped = null, UserCredentials credentials = null, bool resolveLinkTos = true)
117+
{
118+
return this;
119+
}
120+
121+
/// <summary>
122+
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
123+
/// </summary>
124+
/// <param name="stream">This parameter is ignored.</param>
125+
/// <param name="eventAppeared">This parameter is ignored.</param>
126+
/// <param name="subscriptionDropped">This parameter is ignored.</param>
127+
/// <param name="credentials">This parameter is ignored.</param>
128+
/// <returns>This connection.</returns>
129+
public IDisposable SubscribeToStream(string stream, Action<RecordedEvent> eventAppeared, Action<SubscriptionDropReason, Exception> subscriptionDropped = null, UserCredentials credentials = null)
130+
{
131+
return this;
132+
}
133+
134+
/// <summary>
135+
/// Does nothing. Required for implementation of <see cref="IStreamStoreConnection"/>.
136+
/// </summary>
137+
/// <param name="stream">This parameter is ignored.</param>
138+
/// <param name="lastCheckpoint">This parameter is ignored.</param>
139+
/// <param name="settings">This parameter is ignored.</param>
140+
/// <param name="eventAppeared">This parameter is ignored.</param>
141+
/// <param name="liveProcessingStarted">This parameter is ignored.</param>
142+
/// <param name="subscriptionDropped">This parameter is ignored.</param>
143+
/// <param name="credentials"></param>
144+
/// <returns>This connection.</returns>
145+
public IDisposable SubscribeToStreamFrom(string stream, long? lastCheckpoint, CatchUpSubscriptionSettings settings, Action<RecordedEvent> eventAppeared, Action<Unit> liveProcessingStarted = null, Action<SubscriptionDropReason, Exception> subscriptionDropped = null, UserCredentials credentials = null)
146+
{
147+
return this;
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)