Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 135 additions & 104 deletions Tests/Rpc/RpcServerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,45 @@ namespace Tests.Rpc
{
public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
{
private string _requestQueueName = string.Empty;
private string _replyToName = $"queueReplyTo-{Now}-{Guid.NewGuid()}";
private string _correlationId = $"my-correlation-id-{Guid.NewGuid()}";

public override async Task InitializeAsync()
{
await base.InitializeAsync();

Assert.NotNull(_management);

IQueueInfo requestQueueInfo = await _management.Queue()
.Exclusive(true)
.AutoDelete(true)
.DeclareAsync();

_requestQueueName = requestQueueInfo.Name();
}

[Fact]
public async Task MockRpcServerPingPong()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync();
TaskCompletionSource<IMessage> tcs = CreateTaskCompletionSource<IMessage>();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>

Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
{
var reply = context.Message("pong");
IMessage reply = context.Message("pong");
tcs.SetResult(reply);
return Task.FromResult(reply);
}).RequestQueue(_queueName).BuildAsync();
Assert.NotNull(rpcServer);
IPublisher p = await _connection.PublisherBuilder().Queue(_queueName).BuildAsync();
}

IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IPublisher p = await _connection.PublisherBuilder()
.Queue(_requestQueueName)
.BuildAsync();

await p.PublishAsync(new AmqpMessage("test"));
IMessage m = await WhenTcsCompletes(tcs);
Expand All @@ -41,15 +65,21 @@ public async Task MockRpcServerPingPong()
public async Task RpcServerValidateStateChange()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);

List<(State, State)> states = [];
await _management.Queue(_queueName).Exclusive(true).AutoDelete(true).DeclareAsync();
TaskCompletionSource<int> tcs = CreateTaskCompletionSource<int>();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>

static Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
{
var m = context.Message(request.Body());
IMessage m = context.Message(request.Body());
return Task.FromResult(m);
}).RequestQueue(_queueName).BuildAsync();
}

IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

rpcServer.ChangeState += (sender, fromState, toState, e) =>
{
states.Add((fromState, toState));
Expand All @@ -58,8 +88,9 @@ public async Task RpcServerValidateStateChange()
tcs.SetResult(states.Count);
}
};
Assert.NotNull(rpcServer);

await rpcServer.CloseAsync();

int count = await WhenTcsCompletes(tcs);
Assert.Equal(2, count);
Assert.Equal(State.Open, states[0].Item1);
Expand All @@ -76,33 +107,38 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
return Task.FromResult(reply);
}).RequestQueue(requestQueue).BuildAsync();

Assert.NotNull(rpcServer);
string queueReplyTo = $"queueReplyTo-{Now}";
IQueueSpecification spec = _management.Queue(queueReplyTo).Exclusive(true).AutoDelete(true);
await spec.DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IQueueSpecification replyQueueSpec = _management.Queue(_replyToName)
.Exclusive(true)
.AutoDelete(true);
await replyQueueSpec.DeclareAsync();

TaskCompletionSource<IMessage> tcs = CreateTaskCompletionSource<IMessage>();

IConsumer consumer = await _connection.ConsumerBuilder().Queue(queueReplyTo).MessageHandler(
(context, message) =>
{
context.Accept();
tcs.SetResult(message);
return Task.CompletedTask;
}).BuildAndStartAsync();
Task MessageHandler(IContext context, IMessage message)
{
context.Accept();
tcs.SetResult(message);
return Task.CompletedTask;
}

IPublisher publisher = await _connection.PublisherBuilder().Queue(requestQueue).BuildAsync();
Assert.NotNull(publisher);
AddressBuilder addressBuilder = new();
IConsumer consumer = await _connection.ConsumerBuilder()
.Queue(replyQueueSpec)
.MessageHandler(MessageHandler)
.BuildAndStartAsync();

IMessage message = new AmqpMessage("test").ReplyTo(addressBuilder.Queue(queueReplyTo).Address());
IPublisher publisher = await _connection.PublisherBuilder()
.Queue(_requestQueueName)
.BuildAsync();

AddressBuilder addressBuilder = new();
string replyToAddress = addressBuilder.Queue(replyQueueSpec).Address();
IMessage message = new AmqpMessage("test").ReplyTo(replyToAddress);
PublishResult pr = await publisher.PublishAsync(message);
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);

Expand All @@ -122,18 +158,15 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
public async Task RpcServerClientPingPongWithDefault()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
return Task.FromResult(reply);
}).RequestQueue(_queueName).BuildAsync();
Assert.NotNull(rpcServer);

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IRpcClient rpcClient = await _connection.RpcClientBuilder()
.RequestAddress()
.Queue(_requestQueueName)
.RpcClient()
.BuildAsync();

Expand All @@ -153,27 +186,22 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
return Task.FromResult(reply);
}).RequestQueue(_queueName)
.BuildAsync();
Assert.NotNull(rpcServer);

// custom replyTo queue
IQueueInfo replyTo =
await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

// custom correlationId supplier
const string correlationId = "my-correlation-id";
IQueueInfo replyTo = await _management.Queue(_replyToName)
.Exclusive(true)
.AutoDelete(true)
.DeclareAsync();

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
IRpcClient rpcClient = await _connection.RpcClientBuilder()
.RequestAddress()
.Queue(_requestQueueName)
.RpcClient()
.CorrelationIdSupplier(() => correlationId)
.CorrelationIdSupplier(() => _correlationId)
.CorrelationIdExtractor(message => message.CorrelationId())
.ReplyToQueue(replyTo.Name())
.BuildAsync();
Expand All @@ -182,7 +210,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS

IMessage response = await rpcClient.PublishAsync(message);
Assert.Equal("pong", response.Body());
Assert.Equal(correlationId, response.CorrelationId());
Assert.Equal(_correlationId, response.CorrelationId());
await rpcClient.CloseAsync();
await rpcServer.CloseAsync();
}
Expand All @@ -199,34 +227,29 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>
{
var reply = context.Message("pong");
return Task.FromResult(reply);
}).RequestQueue(_queueName)
//come from the client

IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.CorrelationIdExtractor(message => message.Property("correlationId"))
// replace the correlation id location with Application properties
.ReplyPostProcessor((reply, replyCorrelationId) => reply.Property("correlationId",
replyCorrelationId.ToString() ?? throw new InvalidOperationException()))
.BuildAsync();
Assert.NotNull(rpcServer);

IQueueInfo replyTo =
await _management.Queue($"replyTo-{Now}").Exclusive(true).AutoDelete(true).DeclareAsync();
IQueueInfo replyTo = await _management.Queue(_replyToName)
.Exclusive(true)
.AutoDelete(true)
.DeclareAsync();

// custom correlationId supplier
const string correlationId = "my-correlation-id";
int correlationIdCounter = 0;

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
IRpcClient rpcClient = await _connection.RpcClientBuilder()
.RequestAddress()
.Queue(_requestQueueName)
.RpcClient()
.ReplyToQueue(replyTo.Name())
// replace the correlation id creation with a custom function
.CorrelationIdSupplier(() => $"{correlationId}_{Interlocked.Increment(ref correlationIdCounter)}")
.CorrelationIdSupplier(() => $"{_correlationId}_{Interlocked.Increment(ref correlationIdCounter)}")
// The server will reply with the correlation id in application properties
.CorrelationIdExtractor(message => message.Property("correlationId"))
// The client will use application properties to set the correlation id
Expand All @@ -244,8 +267,8 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
IMessage response = await rpcClient.PublishAsync(message);
Assert.Equal("pong", response.Body());
// the server replies with the correlation id in the application properties
Assert.Equal($"{correlationId}_{i}", response.Property("correlationId"));
Assert.Equal($"{correlationId}_{i}", response.Properties()["correlationId"]);
Assert.Equal($"{_correlationId}_{i}", response.Property("correlationId"));
Assert.Equal($"{_correlationId}_{i}", response.Properties()["correlationId"]);
Assert.Single(response.Properties());
i++;
}
Expand All @@ -258,19 +281,16 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
public async Task RpcClientMultiThreadShouldBeSafe()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);

string requestQueue = _queueName;

await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
const int messagesToSend = 99;

TaskCompletionSource<bool> tcs = CreateTaskCompletionSource();
List<IMessage> messagesReceived = [];
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler((context, request) =>

Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
{
try
{
var reply = context.Message("pong");
IMessage reply = context.Message("pong");
messagesReceived.Add(request);
return Task.FromResult(reply);
}
Expand All @@ -281,17 +301,19 @@ public async Task RpcClientMultiThreadShouldBeSafe()
tcs.SetResult(true);
}
}
}).RequestQueue(requestQueue).BuildAsync();
}

Assert.NotNull(rpcServer);
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
.Queue(_requestQueueName)
.RpcClient()
.BuildAsync();

List<Task> tasks = [];

// we simulate a multi-thread environment
// where multiple threads send messages to the server
// and the server replies to each message in a consistent way
Expand Down Expand Up @@ -331,26 +353,29 @@ public async Task RpcClientMultiThreadShouldBeSafe()
public async Task RpcClientShouldRaiseTimeoutError()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
string requestQueue = _queueName;
await _management.Queue(requestQueue).Exclusive(true).AutoDelete(true).DeclareAsync();
IRpcServer rpcServer = await _connection.RpcServerBuilder().Handler(async (context, request) =>

static async Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
{
IMessage reply = context.Message("pong");
object millisecondsToWait = request.Property("wait");
await Task.Delay(TimeSpan.FromMilliseconds((int)millisecondsToWait));
return reply;
}).RequestQueue(_queueName).BuildAsync();
Assert.NotNull(rpcServer);
}

IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
.Queue(requestQueue)
IRpcServer rpcServer = await _connection.RpcServerBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();

IRpcClient rpcClient = await _connection.RpcClientBuilder()
.RequestAddress()
.Queue(_requestQueueName)
.RpcClient()
.Timeout(TimeSpan.FromMilliseconds(300))
.BuildAsync();

IMessage reply = await rpcClient.PublishAsync(
new AmqpMessage("ping").Property("wait", 1));
IMessage msg = new AmqpMessage("ping").Property("wait", 1);
IMessage reply = await rpcClient.PublishAsync(msg);
Assert.Equal("pong", reply.Body());

await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
Expand All @@ -359,5 +384,11 @@ await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
await rpcClient.CloseAsync();
await rpcServer.CloseAsync();
}

private static Task<IMessage> PongRpcHandler(IRpcServer.IContext context, IMessage request)
{
IMessage reply = context.Message("pong");
return Task.FromResult(reply);
}
}
}
Loading