diff --git a/tests/Proto.Actor.Tests/Router/BroadcastPoolRouterTests.cs b/tests/Proto.Actor.Tests/Router/BroadcastPoolRouterTests.cs new file mode 100644 index 0000000000..a1bed17eb5 --- /dev/null +++ b/tests/Proto.Actor.Tests/Router/BroadcastPoolRouterTests.cs @@ -0,0 +1,58 @@ +using System; +using System.Threading.Tasks; +using Proto.Router.Messages; +using Proto.TestFixtures; +using Proto; +using Xunit; + +namespace Proto.Router.Tests; + +public class BroadcastPoolRouterTests +{ + private static readonly Props MyActorProps = Props.FromProducer(() => new RecordingActor()); + private readonly TimeSpan _timeout = TimeSpan.FromMilliseconds(1000); + + [Fact] + public async Task BroadcastPoolRouter_RemovedRouteesDoNotReceiveMessages() + { + var system = new ActorSystem(); + await using var _ = system; + + var props = system.Root.NewBroadcastPool(MyActorProps, 3) + .WithMailbox(() => new TestMailbox()); + var router = system.Root.Spawn(props); + + var routees = await system.Root.RequestAsync(router, new RouterGetRoutees(), _timeout); + var routee1 = routees.Pids[0]; + var routee2 = routees.Pids[1]; + var routee3 = routees.Pids[2]; + + system.Root.Send(router, "first"); + system.Root.Send(router, new RouterRemoveRoutee(routee1)); + system.Root.Send(router, "second"); + + Assert.Equal("first", await system.Root.RequestAsync(routee1, "received?", _timeout)); + Assert.Equal("second", await system.Root.RequestAsync(routee2, "received?", _timeout)); + Assert.Equal("second", await system.Root.RequestAsync(routee3, "received?", _timeout)); + } + + private class RecordingActor : IActor + { + private string? _received; + + public Task ReceiveAsync(IContext context) + { + switch (context.Message) + { + case "received?": + context.Respond(_received!); + break; + case string s: + _received = s; + break; + } + + return Task.CompletedTask; + } + } +} diff --git a/tests/Proto.Actor.Tests/Router/ConsistentHashPoolRouterTests.cs b/tests/Proto.Actor.Tests/Router/ConsistentHashPoolRouterTests.cs new file mode 100644 index 0000000000..7b0ba13c16 --- /dev/null +++ b/tests/Proto.Actor.Tests/Router/ConsistentHashPoolRouterTests.cs @@ -0,0 +1,46 @@ +using System; +using System.Threading.Tasks; +using Proto.Router.Messages; +using Proto.TestFixtures; +using Proto; +using Xunit; + +namespace Proto.Router.Tests; + +public class ConsistentHashPoolRouterTests +{ + private static readonly Props MyActorProps = Props.FromProducer(() => new RespondWithSelfActor()); + private readonly TimeSpan _timeout = TimeSpan.FromMilliseconds(1000); + + private record Ping(string Id) : IHashable + { + public string HashBy() => Id; + } + + [Fact] + public async Task ConsistentHashPoolRouter_MessageWithSameHashAlwaysGoesToSameRoutee() + { + var system = new ActorSystem(); + await using var _ = system; + + var props = system.Root.NewConsistentHashPool(MyActorProps, 3) + .WithMailbox(() => new TestMailbox()); + var router = system.Root.Spawn(props); + + var pid1 = await system.Root.RequestAsync(router, new Ping("a"), _timeout); + var pid2 = await system.Root.RequestAsync(router, new Ping("b"), _timeout); + var pid3 = await system.Root.RequestAsync(router, new Ping("a"), _timeout); + + Assert.Equal(pid1, pid3); + Assert.NotEqual(pid1, pid2); + } + + private class RespondWithSelfActor : IActor + { + public Task ReceiveAsync(IContext context) + { + context.Respond(context.Self); + return Task.CompletedTask; + } + } +} diff --git a/tests/Proto.Actor.Tests/Router/RoundRobinPoolRouterTests.cs b/tests/Proto.Actor.Tests/Router/RoundRobinPoolRouterTests.cs new file mode 100644 index 0000000000..1662bbe896 --- /dev/null +++ b/tests/Proto.Actor.Tests/Router/RoundRobinPoolRouterTests.cs @@ -0,0 +1,43 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Proto; +using Proto.TestFixtures; +using Xunit; + +namespace Proto.Router.Tests; + +public class RoundRobinPoolRouterTests +{ + private static readonly Props MyActorProps = Props.FromProducer(() => new RespondWithSelfActor()); + private readonly TimeSpan _timeout = TimeSpan.FromMilliseconds(1000); + + [Fact] + public async Task RoundRobinPoolRouter_RoutesMessagesInRoundRobinOrder() + { + var system = new ActorSystem(); + await using var _ = system; + + var props = system.Root.NewRoundRobinPool(MyActorProps, 3) + .WithMailbox(() => new TestMailbox()); + var router = system.Root.Spawn(props); + + var first = await system.Root.RequestAsync(router, "1", _timeout); + var second = await system.Root.RequestAsync(router, "2", _timeout); + var third = await system.Root.RequestAsync(router, "3", _timeout); + var fourth = await system.Root.RequestAsync(router, "4", _timeout); + + var firstThree = new[] { first, second, third }; + Assert.Equal(3, firstThree.Distinct().Count()); + Assert.Equal(first, fourth); + } + + private class RespondWithSelfActor : IActor + { + public Task ReceiveAsync(IContext context) + { + context.Respond(context.Self); + return Task.CompletedTask; + } + } +}