Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Proto.TestKit/TestProbeSystemMessageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@ public static class TestProbeSystemMessageExtensions
public static Task<T> ExpectSystemMessageAsync<T>(this ITestProbe probe, TimeSpan? timeAllowed = null,
CancellationToken cancellationToken = default)
where T : SystemMessage => probe.GetNextMessageAsync<T>(timeAllowed, cancellationToken);

/// <summary>
/// Asynchronously retrieves the next system message of type <typeparamref name="T"/> that satisfies
/// the given predicate.
/// </summary>
public static Task<T> ExpectSystemMessageAsync<T>(this ITestProbe probe, Func<T, bool> when,
TimeSpan? timeAllowed = null, CancellationToken cancellationToken = default)
where T : SystemMessage => probe.GetNextMessageAsync(when, timeAllowed, cancellationToken);
}
7 changes: 3 additions & 4 deletions tests/Proto.Actor.Tests/SchedulerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public async Task SendOnceWorksWithTimeProvider()
scheduler.SendOnce(TimeSpan.FromSeconds(10), pid, "Wakeup");
await hook.WaitAsync();
timeProvider.Advance(TimeSpan.FromMinutes(10));
var msg = await probe.GetNextMessageAsync<string>(TimeSpan.FromMilliseconds(10));
Assert.Equal("Wakeup", msg);
await probe.GetNextMessageAsync<string>(x => x == "Wakeup", TimeSpan.FromMilliseconds(100));
}

[Fact]
Expand All @@ -43,7 +42,7 @@ public async Task SendRepeatedlyCanBeCancelled()

await hook.WaitAsync();
timeProvider.Advance(TimeSpan.FromMinutes(1));
await probe.GetNextMessageAsync<string>(TimeSpan.FromMilliseconds(10));
await probe.GetNextMessageAsync<string>(TimeSpan.FromMilliseconds(100));

cts.Cancel();
timeProvider.Advance(TimeSpan.FromMinutes(1));
Expand Down Expand Up @@ -114,7 +113,7 @@ public async Task RequestRepeatedlyCanBeCancelled()

await hook.WaitAsync();
timeProvider.Advance(TimeSpan.FromSeconds(5));
await probe.GetNextMessageAsync<string>(TimeSpan.FromMilliseconds(10));
await probe.GetNextMessageAsync<string>(TimeSpan.FromMilliseconds(100));

context.Send(requester, "Cancel");

Expand Down
3 changes: 1 addition & 2 deletions tests/Proto.Actor.Tests/TestProbeAsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ public async Task Probe_can_await_next_message()
}));

system.Root.Send(target, "hello");
var msg = await probe.GetNextMessageAsync<string>();
Assert.Equal("hello", msg);
await probe.GetNextMessageAsync<string>(s => s == "hello");
}

[Fact]
Expand Down
6 changes: 2 additions & 4 deletions tests/Proto.Actor.Tests/TimerExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ public async Task SchedulerSchedulesMessageAfterDelay()
// ensure message isn't delivered immediately
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(100));

var msg = await probe.GetNextMessageAsync<string>(TimeSpan.FromSeconds(5));
Assert.Equal("Wakeup", msg);
await probe.GetNextMessageAsync<string>(s => s == "Wakeup", TimeSpan.FromSeconds(5));
}

[Fact]
Expand All @@ -43,8 +42,7 @@ public async Task SchedulerWithTimeProviderSchedulesMessageAfterDelay()
await Task.Delay(50);
timeProvider.Advance(TimeSpan.FromMinutes(1));

var msg = await probe.GetNextMessageAsync<string>(TimeSpan.FromMilliseconds(10));
Assert.Equal("Wakeup", msg);
await probe.GetNextMessageAsync<string>(s => s == "Wakeup", TimeSpan.FromMilliseconds(100));
}
}

4 changes: 2 additions & 2 deletions tests/Proto.Actor.Tests/WatchTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async Task MultipleStopsTriggerSingleTerminated()

context.Send(child, "stop");

await probe.GetNextMessageAsync<Terminated>();
await probe.ExpectSystemMessageAsync<Terminated>(t => Equals(t.Who, child));
await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(100));
}

Expand All @@ -54,7 +54,7 @@ public async Task CanWatchLocalActors()

await context.StopAsync(watchee);

await probe.GetNextMessageAsync<Terminated>();
await probe.ExpectSystemMessageAsync<Terminated>(t => Equals(t.Who, watchee));
}

[Fact]
Expand Down
31 changes: 14 additions & 17 deletions tests/Proto.Remote.Tests/RemoteTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,9 @@ public async Task CanWatchRemoteActor()
probe.Context.Watch(remoteActor);
await Task.Delay(20); // allow RemoteWatch to propagate

await System.Root.StopAsync(remoteActor);
await System.Root.PoisonAsync(remoteActor);

var terminated = await probe.ExpectSystemMessageAsync<Terminated>(TimeSpan.FromSeconds(10));
Assert.Equal(remoteActor, terminated.Who);
await probe.ExpectSystemMessageAsync<Terminated>(t => Equals(t.Who, remoteActor), TimeSpan.FromSeconds(10));
}

[Fact]
Expand All @@ -188,11 +187,13 @@ public async Task CanWatchMultipleRemoteActors()
probe.Context.Watch(remoteActor2);
await Task.Delay(20); // allow RemoteWatch to propagate

await System.Root.StopAsync(remoteActor1);
await System.Root.StopAsync(remoteActor2);
await System.Root.PoisonAsync(remoteActor1);
await System.Root.PoisonAsync(remoteActor2);

var term1 = await probe.ExpectSystemMessageAsync<Terminated>(TimeSpan.FromSeconds(10));
var term2 = await probe.ExpectSystemMessageAsync<Terminated>(TimeSpan.FromSeconds(10));
var term1 = await probe.ExpectSystemMessageAsync<Terminated>(t =>
Equals(t.Who, remoteActor1) || Equals(t.Who, remoteActor2), TimeSpan.FromSeconds(10));
var term2 = await probe.ExpectSystemMessageAsync<Terminated>(t =>
Equals(t.Who, remoteActor1) || Equals(t.Who, remoteActor2), TimeSpan.FromSeconds(10));
new[] { term1.Who, term2.Who }.Should().BeEquivalentTo(new[] { remoteActor1, remoteActor2 });
}

Expand All @@ -209,12 +210,10 @@ public async Task MultipleLocalActorsCanWatchRemoteActor()
probe2.Context.Watch(remoteActor);
await Task.Delay(20); // allow RemoteWatch to propagate

await System.Root.StopAsync(remoteActor);
await System.Root.PoisonAsync(remoteActor);

var t1 = await probe1.ExpectSystemMessageAsync<Terminated>(TimeSpan.FromSeconds(10));
var t2 = await probe2.ExpectSystemMessageAsync<Terminated>(TimeSpan.FromSeconds(10));
Assert.Equal(remoteActor, t1.Who);
Assert.Equal(remoteActor, t2.Who);
await probe1.ExpectSystemMessageAsync<Terminated>(t => Equals(t.Who, remoteActor), TimeSpan.FromSeconds(10));
await probe2.ExpectSystemMessageAsync<Terminated>(t => Equals(t.Who, remoteActor), TimeSpan.FromSeconds(10));
}

[Fact]
Expand All @@ -233,10 +232,9 @@ public async Task CanUnwatchRemoteActor()
probe2.Context.Unwatch(remoteActor);
await Task.Delay(TimeSpan.FromSeconds(3));

await System.Root.StopAsync(remoteActor);
await System.Root.PoisonAsync(remoteActor);

var term = await probe1.ExpectSystemMessageAsync<Terminated>(TimeSpan.FromSeconds(10));
Assert.Equal(remoteActor, term.Who);
await probe1.ExpectSystemMessageAsync<Terminated>(t => Equals(t.Who, remoteActor), TimeSpan.FromSeconds(10));

await probe2.ExpectNoMessageAsync(TimeSpan.FromSeconds(1));
}
Expand All @@ -254,8 +252,7 @@ public async Task WhenRemoteTerminated_LocalWatcherReceivesNotification()

System.Root.Send(remoteActor, new Die());

var term = await probe.ExpectSystemMessageAsync<Terminated>(TimeSpan.FromSeconds(10));
Assert.Equal(remoteActor, term.Who);
await probe.ExpectSystemMessageAsync<Terminated>(t => Equals(t.Who, remoteActor), TimeSpan.FromSeconds(10));

await probe.ExpectNoMessageAsync(TimeSpan.FromMilliseconds(200));
}
Expand Down
3 changes: 1 addition & 2 deletions tests/Proto.TestKit.Tests/TestProbeAsyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ public async Task GetNextMessageAsync_returns_message()
var (probe, pid) = system.CreateTestProbe();

system.Root.Send(pid, "hello");
var msg = await probe.GetNextMessageAsync<string>();
msg.Should().Be("hello");
await probe.GetNextMessageAsync<string>(s => s == "hello");
}

[Fact]
Expand Down
Loading