Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Proto.Actor includes a TestKit library for unit testing actors.
```csharp
var probe = new TestProbe();
system.Root.Spawn(Props.FromProducer(() => probe));
await probe.GetNextMessageAsync<string>();
await probe.ExpectNextUserMessageAsync<string>();
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(100));
```

Expand Down
90 changes: 90 additions & 0 deletions src/Proto.TestKit/ITestProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Proto;
using Proto.Mailbox;

namespace Proto.TestKit;

Expand Down Expand Up @@ -60,6 +62,94 @@ public interface ITestProbe
Task<T> GetNextMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default);

/// <summary>
/// asynchronously gets the next system message of type <typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">The system message type</typeparam>
/// <param name="timeAllowed"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<T> GetNextSystemMessageAsync<T>(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) where T : SystemMessage;

/// <summary>
/// asynchronously gets the next user message of type <typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">The user message type</typeparam>
/// <param name="timeAllowed"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<T> GetNextUserMessageAsync<T>(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default);

/// <summary>
/// asynchronously gets the next system message of type <typeparamref name="T"/> satisfying a predicate
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="when"></param>
/// <param name="timeAllowed"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<T> GetNextSystemMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) where T : SystemMessage;

/// <summary>
/// asynchronously gets the next user message of type <typeparamref name="T"/> satisfying a predicate
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="when"></param>
/// <param name="timeAllowed"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<T> GetNextUserMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default);

/// <summary>
/// asynchronously expects the next system message of type <typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">The system message type</typeparam>
/// <param name="timeAllowed"></param>
/// <param name="cancellationToken"></param>
Task ExpectNextSystemMessageAsync<T>(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) where T : SystemMessage;

/// <summary>
/// asynchronously expects the next user message of type <typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">The user message type</typeparam>
/// <param name="timeAllowed"></param>
/// <param name="cancellationToken"></param>
Task ExpectNextUserMessageAsync<T>(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default);

/// <summary>
/// asynchronously expects the next system message of type <typeparamref name="T"/> satisfying a predicate
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="when"></param>
/// <param name="timeAllowed"></param>
/// <param name="cancellationToken"></param>
Task ExpectNextSystemMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) where T : SystemMessage;

/// <summary>
/// asynchronously expects the next user message of type <typeparamref name="T"/> satisfying a predicate
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="when"></param>
/// <param name="timeAllowed"></param>
/// <param name="cancellationToken"></param>
Task ExpectNextUserMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default);

/// <summary>
/// ensures that the probe mailbox is empty
/// </summary>
/// <param name="timeAllowed"></param>
/// <param name="cancellationToken"></param>
Task ExpectEmptyMailboxAsync(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default);

/// <summary>
/// asynchronously fishes for the next message of a given type from the test probe
/// </summary>
Expand Down
147 changes: 130 additions & 17 deletions src/Proto.TestKit/TestProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,32 +83,20 @@ public async Task ExpectNoMessageAsync(TimeSpan? timeAllowed = null, Cancellatio
public async Task<object?> GetNextMessageAsync(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeAllowed ?? TimeSpan.FromSeconds(1));

try
{
var item = await _channel.Reader.ReadAsync(cts.Token);
Sender = item.Sender;
var item = await ReceiveNextAsync(timeAllowed, cancellationToken);

return item.Message;
}
catch (OperationCanceledException)
{
var seconds = (timeAllowed ?? TimeSpan.FromSeconds(1)).TotalSeconds.ToString("0.###");
throw new TestKitException($"Waited {seconds} seconds but failed to receive a message");
}
return item.Message;
}

/// <inheritdoc />
public async Task<T> GetNextMessageAsync<T>(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default)
{
var output = await GetNextMessageAsync(timeAllowed, cancellationToken);
var item = await ReceiveNextAsync(timeAllowed, cancellationToken);

if (output is not T typed)
if (item.Message is not T typed)
{
throw new TestKitException($"Message expected type {typeof(T)}, actual type {output?.GetType()}");
throw new TestKitException($"Message expected type {typeof(T)}, actual type {item.Message?.GetType()}");
}

return typed;
Expand Down Expand Up @@ -195,4 +183,129 @@ public Task<T> RequestAsync<T>(PID target, object message, TimeSpan timeAllowed)
Context.RequestAsync<T>(target, message, timeAllowed);

public static implicit operator PID?(TestProbe tp) => tp.Context.Self;

private async Task<MessageAndSender> ReceiveNextAsync(TimeSpan? timeAllowed,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeAllowed ?? TimeSpan.FromSeconds(1));

try
{
var item = await _channel.Reader.ReadAsync(cts.Token);
Sender = item.Sender;
return item;
}
catch (OperationCanceledException)
{
var seconds = (timeAllowed ?? TimeSpan.FromSeconds(1)).TotalSeconds.ToString("0.###");
throw new TestKitException($"Waited {seconds} seconds but failed to receive a message");
}
}

/// <inheritdoc />
public async Task<T> GetNextSystemMessageAsync<T>(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) where T : SystemMessage
{
var item = await ReceiveNextAsync(timeAllowed, cancellationToken);

if (item.Message is not SystemMessage)
{
throw new TestKitException(
$"Expected system message of type {typeof(T)}, but received user message of type {item.Message?.GetType()}");
}

if (item.Message is not T typed)
{
throw new TestKitException($"Message expected type {typeof(T)}, actual type {item.Message.GetType()}");
}

return typed;
}

/// <inheritdoc />
public async Task<T> GetNextUserMessageAsync<T>(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default)
{
var item = await ReceiveNextAsync(timeAllowed, cancellationToken);

if (item.Message is SystemMessage sys)
{
throw new TestKitException(
$"Expected user message of type {typeof(T)}, but received system message of type {sys.GetType()}");
}

if (item.Message is not T typed)
{
throw new TestKitException($"Message expected type {typeof(T)}, actual type {item.Message?.GetType()}");
}

return typed;
}

/// <inheritdoc />
public async Task<T> GetNextSystemMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) where T : SystemMessage
{
var output = await GetNextSystemMessageAsync<T>(timeAllowed, cancellationToken);

if (!when(output))
{
throw new TestKitException("Condition not met");
}

return output;
}

/// <inheritdoc />
public async Task<T> GetNextUserMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default)
{
var output = await GetNextUserMessageAsync<T>(timeAllowed, cancellationToken);

if (!when(output))
{
throw new TestKitException("Condition not met");
}

return output;
}

/// <inheritdoc />
public async Task ExpectNextSystemMessageAsync<T>(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) where T : SystemMessage =>
_ = await GetNextSystemMessageAsync<T>(timeAllowed, cancellationToken);

/// <inheritdoc />
public async Task ExpectNextUserMessageAsync<T>(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) =>
_ = await GetNextUserMessageAsync<T>(timeAllowed, cancellationToken);

/// <inheritdoc />
public async Task ExpectNextSystemMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) where T : SystemMessage =>
_ = await GetNextSystemMessageAsync(when, timeAllowed, cancellationToken);

/// <inheritdoc />
public async Task ExpectNextUserMessageAsync<T>(Func<T, bool> when, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default) =>
_ = await GetNextUserMessageAsync(when, timeAllowed, cancellationToken);

/// <inheritdoc />
public async Task ExpectEmptyMailboxAsync(TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default)
{
var self = Context.Self;

if (timeAllowed.HasValue)
{
await RequestAsync<Touched>(self, new Touch(), timeAllowed.Value);
}
else
{
await RequestAsync<Touched>(self, new Touch(), cancellationToken);
}

await GetNextUserMessageAsync<Touch>(timeAllowed, cancellationToken);
}
}
27 changes: 0 additions & 27 deletions src/Proto.TestKit/TestProbeSystemMessageExtensions.cs

This file was deleted.

10 changes: 5 additions & 5 deletions tests/Proto.Actor.Tests/ReceiveTimeoutTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public async Task receive_timeout_received_within_expected_time()

context.Spawn(props);

await probe.GetNextMessageAsync<ReceiveTimeout>();
await probe.ExpectNextSystemMessageAsync<ReceiveTimeout>();
}

[Fact]
Expand Down Expand Up @@ -64,7 +64,7 @@ public async Task receive_timeout_received_within_expected_time_when_sending_ign
var scheduler = context.Scheduler();
var cts = scheduler.SendRepeatedly(TimeSpan.Zero, TimeSpan.FromMilliseconds(100), pid, new IgnoreMe());

await probe.GetNextMessageAsync<ReceiveTimeout>();
await probe.ExpectNextSystemMessageAsync<ReceiveTimeout>();
cts.Cancel();
}

Expand Down Expand Up @@ -100,7 +100,7 @@ public async Task receive_timeout_is_reset_by_influencing_messages()

await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(400));
cts.Cancel();
await probe.GetNextMessageAsync<ReceiveTimeout>();
await probe.ExpectNextSystemMessageAsync<ReceiveTimeout>();
}

[Fact]
Expand Down Expand Up @@ -129,7 +129,7 @@ public async Task receive_timeout_not_received_within_expected_time()

context.Spawn(props);

await probe.GetNextMessageAsync<string>();
await probe.ExpectNextUserMessageAsync<string>();
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(200));
}

Expand Down Expand Up @@ -194,7 +194,7 @@ public async Task can_still_set_receive_timeout_after_cancelling()

context.Spawn(props);

await probe.GetNextMessageAsync<ReceiveTimeout>();
await probe.ExpectNextSystemMessageAsync<ReceiveTimeout>();
}

private record IgnoreMe : INotInfluenceReceiveTimeout;
Expand Down
Loading
Loading