Skip to content

Commit 6aeacaf

Browse files
authored
refactor(testkit): split expect/get helpers and refine mailbox check (#2313)
1 parent 334a0da commit 6aeacaf

File tree

15 files changed

+313
-101
lines changed

15 files changed

+313
-101
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ Proto.Actor includes a TestKit library for unit testing actors.
101101
```csharp
102102
var probe = new TestProbe();
103103
system.Root.Spawn(Props.FromProducer(() => probe));
104-
await probe.GetNextMessageAsync<string>();
104+
await probe.ExpectNextUserMessageAsync<string>();
105105
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(100));
106106
```
107107

src/Proto.TestKit/ITestProbe.cs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
using System;
88
using System.Threading;
99
using System.Threading.Tasks;
10+
using Proto;
11+
using Proto.Mailbox;
1012

1113
namespace Proto.TestKit;
1214

@@ -60,6 +62,94 @@ public interface ITestProbe
6062
Task<T> GetNextMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
6163
CancellationToken cancellationToken = default);
6264

65+
/// <summary>
66+
/// asynchronously gets the next system message of type <typeparamref name="T"/>
67+
/// </summary>
68+
/// <typeparam name="T">The system message type</typeparam>
69+
/// <param name="timeAllowed"></param>
70+
/// <param name="cancellationToken"></param>
71+
/// <returns></returns>
72+
Task<T> GetNextSystemMessageAsync<T>(TimeSpan? timeAllowed = null,
73+
CancellationToken cancellationToken = default) where T : SystemMessage;
74+
75+
/// <summary>
76+
/// asynchronously gets the next user message of type <typeparamref name="T"/>
77+
/// </summary>
78+
/// <typeparam name="T">The user message type</typeparam>
79+
/// <param name="timeAllowed"></param>
80+
/// <param name="cancellationToken"></param>
81+
/// <returns></returns>
82+
Task<T> GetNextUserMessageAsync<T>(TimeSpan? timeAllowed = null,
83+
CancellationToken cancellationToken = default);
84+
85+
/// <summary>
86+
/// asynchronously gets the next system message of type <typeparamref name="T"/> satisfying a predicate
87+
/// </summary>
88+
/// <typeparam name="T"></typeparam>
89+
/// <param name="when"></param>
90+
/// <param name="timeAllowed"></param>
91+
/// <param name="cancellationToken"></param>
92+
/// <returns></returns>
93+
Task<T> GetNextSystemMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
94+
CancellationToken cancellationToken = default) where T : SystemMessage;
95+
96+
/// <summary>
97+
/// asynchronously gets the next user message of type <typeparamref name="T"/> satisfying a predicate
98+
/// </summary>
99+
/// <typeparam name="T"></typeparam>
100+
/// <param name="when"></param>
101+
/// <param name="timeAllowed"></param>
102+
/// <param name="cancellationToken"></param>
103+
/// <returns></returns>
104+
Task<T> GetNextUserMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
105+
CancellationToken cancellationToken = default);
106+
107+
/// <summary>
108+
/// asynchronously expects the next system message of type <typeparamref name="T"/>
109+
/// </summary>
110+
/// <typeparam name="T">The system message type</typeparam>
111+
/// <param name="timeAllowed"></param>
112+
/// <param name="cancellationToken"></param>
113+
Task ExpectNextSystemMessageAsync<T>(TimeSpan? timeAllowed = null,
114+
CancellationToken cancellationToken = default) where T : SystemMessage;
115+
116+
/// <summary>
117+
/// asynchronously expects the next user message of type <typeparamref name="T"/>
118+
/// </summary>
119+
/// <typeparam name="T">The user message type</typeparam>
120+
/// <param name="timeAllowed"></param>
121+
/// <param name="cancellationToken"></param>
122+
Task ExpectNextUserMessageAsync<T>(TimeSpan? timeAllowed = null,
123+
CancellationToken cancellationToken = default);
124+
125+
/// <summary>
126+
/// asynchronously expects the next system message of type <typeparamref name="T"/> satisfying a predicate
127+
/// </summary>
128+
/// <typeparam name="T"></typeparam>
129+
/// <param name="when"></param>
130+
/// <param name="timeAllowed"></param>
131+
/// <param name="cancellationToken"></param>
132+
Task ExpectNextSystemMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
133+
CancellationToken cancellationToken = default) where T : SystemMessage;
134+
135+
/// <summary>
136+
/// asynchronously expects the next user message of type <typeparamref name="T"/> satisfying a predicate
137+
/// </summary>
138+
/// <typeparam name="T"></typeparam>
139+
/// <param name="when"></param>
140+
/// <param name="timeAllowed"></param>
141+
/// <param name="cancellationToken"></param>
142+
Task ExpectNextUserMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
143+
CancellationToken cancellationToken = default);
144+
145+
/// <summary>
146+
/// ensures that the probe mailbox is empty
147+
/// </summary>
148+
/// <param name="timeAllowed"></param>
149+
/// <param name="cancellationToken"></param>
150+
Task ExpectEmptyMailboxAsync(TimeSpan? timeAllowed = null,
151+
CancellationToken cancellationToken = default);
152+
63153
/// <summary>
64154
/// asynchronously fishes for the next message of a given type from the test probe
65155
/// </summary>

src/Proto.TestKit/TestProbe.cs

Lines changed: 130 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -83,32 +83,20 @@ public async Task ExpectNoMessageAsync(TimeSpan? timeAllowed = null, Cancellatio
8383
public async Task<object?> GetNextMessageAsync(TimeSpan? timeAllowed = null,
8484
CancellationToken cancellationToken = default)
8585
{
86-
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
87-
cts.CancelAfter(timeAllowed ?? TimeSpan.FromSeconds(1));
88-
89-
try
90-
{
91-
var item = await _channel.Reader.ReadAsync(cts.Token);
92-
Sender = item.Sender;
86+
var item = await ReceiveNextAsync(timeAllowed, cancellationToken);
9387

94-
return item.Message;
95-
}
96-
catch (OperationCanceledException)
97-
{
98-
var seconds = (timeAllowed ?? TimeSpan.FromSeconds(1)).TotalSeconds.ToString("0.###");
99-
throw new TestKitException($"Waited {seconds} seconds but failed to receive a message");
100-
}
88+
return item.Message;
10189
}
10290

10391
/// <inheritdoc />
10492
public async Task<T> GetNextMessageAsync<T>(TimeSpan? timeAllowed = null,
10593
CancellationToken cancellationToken = default)
10694
{
107-
var output = await GetNextMessageAsync(timeAllowed, cancellationToken);
95+
var item = await ReceiveNextAsync(timeAllowed, cancellationToken);
10896

109-
if (output is not T typed)
97+
if (item.Message is not T typed)
11098
{
111-
throw new TestKitException($"Message expected type {typeof(T)}, actual type {output?.GetType()}");
99+
throw new TestKitException($"Message expected type {typeof(T)}, actual type {item.Message?.GetType()}");
112100
}
113101

114102
return typed;
@@ -195,4 +183,129 @@ public Task<T> RequestAsync<T>(PID target, object message, TimeSpan timeAllowed)
195183
Context.RequestAsync<T>(target, message, timeAllowed);
196184

197185
public static implicit operator PID?(TestProbe tp) => tp.Context.Self;
186+
187+
private async Task<MessageAndSender> ReceiveNextAsync(TimeSpan? timeAllowed,
188+
CancellationToken cancellationToken)
189+
{
190+
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
191+
cts.CancelAfter(timeAllowed ?? TimeSpan.FromSeconds(1));
192+
193+
try
194+
{
195+
var item = await _channel.Reader.ReadAsync(cts.Token);
196+
Sender = item.Sender;
197+
return item;
198+
}
199+
catch (OperationCanceledException)
200+
{
201+
var seconds = (timeAllowed ?? TimeSpan.FromSeconds(1)).TotalSeconds.ToString("0.###");
202+
throw new TestKitException($"Waited {seconds} seconds but failed to receive a message");
203+
}
204+
}
205+
206+
/// <inheritdoc />
207+
public async Task<T> GetNextSystemMessageAsync<T>(TimeSpan? timeAllowed = null,
208+
CancellationToken cancellationToken = default) where T : SystemMessage
209+
{
210+
var item = await ReceiveNextAsync(timeAllowed, cancellationToken);
211+
212+
if (item.Message is not SystemMessage)
213+
{
214+
throw new TestKitException(
215+
$"Expected system message of type {typeof(T)}, but received user message of type {item.Message?.GetType()}");
216+
}
217+
218+
if (item.Message is not T typed)
219+
{
220+
throw new TestKitException($"Message expected type {typeof(T)}, actual type {item.Message.GetType()}");
221+
}
222+
223+
return typed;
224+
}
225+
226+
/// <inheritdoc />
227+
public async Task<T> GetNextUserMessageAsync<T>(TimeSpan? timeAllowed = null,
228+
CancellationToken cancellationToken = default)
229+
{
230+
var item = await ReceiveNextAsync(timeAllowed, cancellationToken);
231+
232+
if (item.Message is SystemMessage sys)
233+
{
234+
throw new TestKitException(
235+
$"Expected user message of type {typeof(T)}, but received system message of type {sys.GetType()}");
236+
}
237+
238+
if (item.Message is not T typed)
239+
{
240+
throw new TestKitException($"Message expected type {typeof(T)}, actual type {item.Message?.GetType()}");
241+
}
242+
243+
return typed;
244+
}
245+
246+
/// <inheritdoc />
247+
public async Task<T> GetNextSystemMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
248+
CancellationToken cancellationToken = default) where T : SystemMessage
249+
{
250+
var output = await GetNextSystemMessageAsync<T>(timeAllowed, cancellationToken);
251+
252+
if (!when(output))
253+
{
254+
throw new TestKitException("Condition not met");
255+
}
256+
257+
return output;
258+
}
259+
260+
/// <inheritdoc />
261+
public async Task<T> GetNextUserMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
262+
CancellationToken cancellationToken = default)
263+
{
264+
var output = await GetNextUserMessageAsync<T>(timeAllowed, cancellationToken);
265+
266+
if (!when(output))
267+
{
268+
throw new TestKitException("Condition not met");
269+
}
270+
271+
return output;
272+
}
273+
274+
/// <inheritdoc />
275+
public async Task ExpectNextSystemMessageAsync<T>(TimeSpan? timeAllowed = null,
276+
CancellationToken cancellationToken = default) where T : SystemMessage =>
277+
_ = await GetNextSystemMessageAsync<T>(timeAllowed, cancellationToken);
278+
279+
/// <inheritdoc />
280+
public async Task ExpectNextUserMessageAsync<T>(TimeSpan? timeAllowed = null,
281+
CancellationToken cancellationToken = default) =>
282+
_ = await GetNextUserMessageAsync<T>(timeAllowed, cancellationToken);
283+
284+
/// <inheritdoc />
285+
public async Task ExpectNextSystemMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
286+
CancellationToken cancellationToken = default) where T : SystemMessage =>
287+
_ = await GetNextSystemMessageAsync(when, timeAllowed, cancellationToken);
288+
289+
/// <inheritdoc />
290+
public async Task ExpectNextUserMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
291+
CancellationToken cancellationToken = default) =>
292+
_ = await GetNextUserMessageAsync(when, timeAllowed, cancellationToken);
293+
294+
/// <inheritdoc />
295+
public async Task ExpectEmptyMailboxAsync(TimeSpan? timeAllowed = null,
296+
CancellationToken cancellationToken = default)
297+
{
298+
var self = Context.Self;
299+
300+
if (timeAllowed.HasValue)
301+
{
302+
await RequestAsync<Touched>(self, new Touch(), timeAllowed.Value);
303+
}
304+
else
305+
{
306+
await RequestAsync<Touched>(self, new Touch(), cancellationToken);
307+
}
308+
309+
await GetNextUserMessageAsync<Touch>(timeAllowed, cancellationToken);
310+
}
198311
}

src/Proto.TestKit/TestProbeSystemMessageExtensions.cs

Lines changed: 0 additions & 27 deletions
This file was deleted.

tests/Proto.Actor.Tests/ReceiveTimeoutTests.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public async Task receive_timeout_received_within_expected_time()
3434

3535
context.Spawn(props);
3636

37-
await probe.GetNextMessageAsync<ReceiveTimeout>();
37+
await probe.ExpectNextSystemMessageAsync<ReceiveTimeout>();
3838
}
3939

4040
[Fact]
@@ -64,7 +64,7 @@ public async Task receive_timeout_received_within_expected_time_when_sending_ign
6464
var scheduler = context.Scheduler();
6565
var cts = scheduler.SendRepeatedly(TimeSpan.Zero, TimeSpan.FromMilliseconds(100), pid, new IgnoreMe());
6666

67-
await probe.GetNextMessageAsync<ReceiveTimeout>();
67+
await probe.ExpectNextSystemMessageAsync<ReceiveTimeout>();
6868
cts.Cancel();
6969
}
7070

@@ -100,7 +100,7 @@ public async Task receive_timeout_is_reset_by_influencing_messages()
100100

101101
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(400));
102102
cts.Cancel();
103-
await probe.GetNextMessageAsync<ReceiveTimeout>();
103+
await probe.ExpectNextSystemMessageAsync<ReceiveTimeout>();
104104
}
105105

106106
[Fact]
@@ -129,7 +129,7 @@ public async Task receive_timeout_not_received_within_expected_time()
129129

130130
context.Spawn(props);
131131

132-
await probe.GetNextMessageAsync<string>();
132+
await probe.ExpectNextUserMessageAsync<string>();
133133
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(200));
134134
}
135135

@@ -194,7 +194,7 @@ public async Task can_still_set_receive_timeout_after_cancelling()
194194

195195
context.Spawn(props);
196196

197-
await probe.GetNextMessageAsync<ReceiveTimeout>();
197+
await probe.ExpectNextSystemMessageAsync<ReceiveTimeout>();
198198
}
199199

200200
private record IgnoreMe : INotInfluenceReceiveTimeout;

0 commit comments

Comments
 (0)