Skip to content

Commit ec078cc

Browse files
committed
Change the message interface from object to
byte array. Change the Message body section from AMQPValue to Data. It is better for cross protocol use cases Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 6322854 commit ec078cc

File tree

10 files changed

+61
-56
lines changed

10 files changed

+61
-56
lines changed

RabbitMQ.AMQP.Client/IMessage.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,11 @@ public interface IMessage
9696
public object Annotation(string key);
9797
public IMessage Annotation(string key, object value);
9898

99-
public object Body();
99+
public byte[] Body();
100+
101+
public string BodyAsString();
102+
103+
100104
public IMessage Body(object body);
101105

102106
IMessageAddressBuilder ToAddress();

RabbitMQ.AMQP.Client/IRpcServer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public interface IRpcServer : ILifeCycle
7878

7979
public interface IContext
8080
{
81-
IMessage Message(object body);
81+
IMessage Message(byte[] body);
82+
IMessage Message(string body);
8283
}
8384
}
8485
}

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,18 @@ public AmqpMessage()
2424
NativeMessage = new Message();
2525
}
2626

27-
public AmqpMessage(object body)
27+
public AmqpMessage(byte[] body)
2828
{
29-
NativeMessage = new Message(body);
29+
NativeMessage = new Message();
30+
NativeMessage.BodySection = new Data { Binary = body };
31+
}
32+
33+
// This constructor is used for string body
34+
// Like AmqpMessage(byte[] body) but for string with UTF8 encoding
35+
public AmqpMessage(string body)
36+
{
37+
NativeMessage = new Message();
38+
NativeMessage.BodySection = new Data() { Binary = System.Text.Encoding.UTF8.GetBytes(body) };
3039
}
3140

3241
public AmqpMessage(Message nativeMessage)
@@ -245,35 +254,40 @@ public object Annotation(string key)
245254
return NativeMessage.MessageAnnotations[new Symbol(key)];
246255
}
247256

248-
public object Body()
257+
public byte[] Body()
258+
{
259+
return (byte[])NativeMessage.Body;
260+
}
261+
262+
public string BodyAsString()
249263
{
250-
return NativeMessage.Body;
264+
if (NativeMessage.BodySection is Data data)
265+
{
266+
return System.Text.Encoding.UTF8.GetString(data.Binary);
267+
}
268+
else
269+
{
270+
throw new InvalidOperationException("Body is not an Application Data");
271+
}
272+
251273
}
252274

253275
public IMessage Body(object body)
254276
{
255277
RestrictedDescribed bodySection;
256278
if (body is byte[] byteArray)
257279
{
258-
bodySection = new Data
259-
{
260-
Binary = byteArray
261-
};
280+
bodySection = new Data { Binary = byteArray };
262281
}
263282
else if (body is IList list)
264283
{
265-
bodySection = new AmqpSequence
266-
{
267-
List = list
268-
};
284+
bodySection = new AmqpSequence { List = list };
269285
}
270286
else
271287
{
272-
bodySection = new AmqpValue
273-
{
274-
Value = body
275-
};
288+
bodySection = new AmqpValue { Value = body };
276289
}
290+
277291
NativeMessage.BodySection = bodySection;
278292
return this;
279293
}

RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ await Utils.WaitWithBackOffUntilFuncAsync(async () =>
166166

167167
private class RpcServerContext : IRpcServer.IContext
168168
{
169-
public IMessage Message(object body) => new AmqpMessage(body);
169+
public IMessage Message(byte[] body) => new AmqpMessage(body);
170+
public IMessage Message(string body) => new AmqpMessage(body);
170171
}
171172

172173
public override async Task CloseAsync()

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,9 @@ RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime() -> System.DateTime
245245
RabbitMQ.AMQP.Client.IMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!
246246
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key) -> object!
247247
RabbitMQ.AMQP.Client.IMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
248-
RabbitMQ.AMQP.Client.IMessage.Body() -> object!
248+
RabbitMQ.AMQP.Client.IMessage.Body() -> byte[]!
249249
RabbitMQ.AMQP.Client.IMessage.Body(object! body) -> RabbitMQ.AMQP.Client.IMessage!
250+
RabbitMQ.AMQP.Client.IMessage.BodyAsString() -> string!
250251
RabbitMQ.AMQP.Client.IMessage.ContentEncoding() -> string!
251252
RabbitMQ.AMQP.Client.IMessage.ContentEncoding(string! contentEncoding) -> RabbitMQ.AMQP.Client.IMessage!
252253
RabbitMQ.AMQP.Client.IMessage.ContentType() -> string!
@@ -391,11 +392,13 @@ RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime() -> System.DateTime
391392
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IMessage!
392393
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage() -> void
393394
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(Amqp.Message! nativeMessage) -> void
394-
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(object! body) -> void
395+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(byte[]! body) -> void
396+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.AmqpMessage(string! body) -> void
395397
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key) -> object!
396398
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Annotation(string! key, object! value) -> RabbitMQ.AMQP.Client.IMessage!
397-
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> object!
399+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body() -> byte[]!
398400
RabbitMQ.AMQP.Client.Impl.AmqpMessage.Body(object! body) -> RabbitMQ.AMQP.Client.IMessage!
401+
RabbitMQ.AMQP.Client.Impl.AmqpMessage.BodyAsString() -> string!
399402
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentEncoding() -> string!
400403
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentEncoding(string! contentEncoding) -> RabbitMQ.AMQP.Client.IMessage!
401404
RabbitMQ.AMQP.Client.Impl.AmqpMessage.ContentType() -> string!
@@ -674,7 +677,8 @@ RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestPostProcessor(System.Func<RabbitMQ
674677
RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
675678
RabbitMQ.AMQP.Client.IRpcServer
676679
RabbitMQ.AMQP.Client.IRpcServer.IContext
677-
RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(object! body) -> RabbitMQ.AMQP.Client.IMessage!
680+
RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(byte[]! body) -> RabbitMQ.AMQP.Client.IMessage!
681+
RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(string! body) -> RabbitMQ.AMQP.Client.IMessage!
678682
RabbitMQ.AMQP.Client.IRpcServerBuilder
679683
RabbitMQ.AMQP.Client.IRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task<RabbitMQ.AMQP.Client.IRpcServer!>!
680684
RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func<RabbitMQ.AMQP.Client.IMessage!, object!>? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!

Tests/Amqp091/FromToAmqp091Tests.cs

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
44

5+
using System;
56
using System.Collections.Generic;
67
using System.Threading;
78
using System.Threading.Tasks;
@@ -28,36 +29,16 @@ public async Task ToAmqp091()
2829
await queueSpec.DeclareAsync();
2930

3031
var publisher = await _connection.PublisherBuilder().BuildAsync();
31-
// string text = JsonConvert.SerializeObject(message); //produces {"Text":"as","Seq":1,"Max":7000}
3232
byte[] body = System.Text.Encoding.UTF8.GetBytes("{Text:as,Seq:1,Max:7000}");
3333
IMessage amqpMessage = new AmqpMessage(body).ToAddress().Queue(_queueName).Build();
34-
for (int i = 0; i < 100; i++)
34+
for (int i = 0; i < 1; i++)
3535
{
3636
PublishResult result = await publisher.PublishAsync(message: amqpMessage).ConfigureAwait(true);
3737
Assert.NotNull(result);
3838
Assert.Equal(OutcomeState.Accepted, result.Outcome.State);
3939
}
4040

4141

42-
// TaskCompletionSource<IMessage> tcs = new();
43-
// IConsumer consumer = await _connection.ConsumerBuilder()
44-
// .Queue(queueSpec)
45-
// .MessageHandler((context, message) =>
46-
// {
47-
// tcs.SetResult(message);
48-
// context.Accept();
49-
// return Task.CompletedTask;
50-
// }
51-
// ).BuildAndStartAsync();
52-
//
53-
// IMessage receivedMessage = await tcs.Task;
54-
// Assert.NotNull(receivedMessage);
55-
// // get the string form bytes
56-
//
57-
// string receivedMessageBody = System.Text.Encoding.UTF8.GetString((byte[])receivedMessage.Body());
58-
// Assert.Equal("{Text:as,Seq:1,Max:7000}", receivedMessageBody);
59-
60-
6142
var factory = new ConnectionFactory();
6243
var connection = await factory.CreateConnectionAsync();
6344
var channel = await connection.CreateChannelAsync();

Tests/Consumer/BasicConsumerTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public async Task SimpleConsumeMessage()
3838

3939
await WhenTcsCompletes(tcs);
4040
IMessage receivedMessage = await tcs.Task;
41-
Assert.Equal("message_0", receivedMessage.Body());
41+
Assert.Equal("message_0", receivedMessage.BodyAsString());
4242

4343
await consumer.CloseAsync();
4444
consumer.Dispose();
@@ -69,7 +69,7 @@ public async Task ConsumerReQueueMessage()
6969
{
7070
try
7171
{
72-
Assert.Equal("message_0", message.Body());
72+
Assert.Equal("message_0", message.BodyAsString());
7373
Interlocked.Increment(ref consumed);
7474
switch (consumed)
7575
{
@@ -167,7 +167,7 @@ Task MessageHandler(IContext cxt, IMessage msg)
167167
{
168168
if (i % 2 == 0)
169169
{
170-
Assert.Equal($"message_{i}", receivedMessagesFromTask[i].Body());
170+
Assert.Equal($"message_{i}", receivedMessagesFromTask[i].BodyAsString());
171171
}
172172
}
173173
}

Tests/MessagesTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public void ValidateMessage()
2222
Assert.Equal("CorrelationId_2123", message.CorrelationId());
2323
Assert.Equal("ReplyTo_5123", message.ReplyTo());
2424
Assert.Equal("Subject_9123", message.Subject());
25-
Assert.Equal("my_body", message.Body());
25+
Assert.Equal("my_body", message.BodyAsString());
2626
}
2727

2828
[Fact]

Tests/Rpc/RecoveryRPCTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection()
6767
{
6868
IMessage response = await rpcClient.PublishAsync(request);
6969
messagesConfirmed++;
70-
Assert.Equal("pong", response.Body());
70+
Assert.Equal("pong", response.BodyAsString());
7171
}
7272
catch (AmqpNotOpenException)
7373
{

Tests/Rpc/RpcServerTests.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
5757

5858
await p.PublishAsync(new AmqpMessage("test"));
5959
IMessage m = await WhenTcsCompletes(tcs);
60-
Assert.Equal("pong", m.Body());
60+
Assert.Equal("pong", m.BodyAsString());
6161
await rpcServer.CloseAsync();
6262
}
6363

@@ -143,7 +143,7 @@ Task MessageHandler(IContext context, IMessage message)
143143
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
144144

145145
IMessage m = await WhenTcsCompletes(tcs);
146-
Assert.Equal("pong", m.Body());
146+
Assert.Equal("pong", m.BodyAsString());
147147

148148
await rpcServer.CloseAsync();
149149
await consumer.CloseAsync();
@@ -173,7 +173,7 @@ public async Task RpcServerClientPingPongWithDefault()
173173
IMessage message = new AmqpMessage("ping");
174174

175175
IMessage response = await rpcClient.PublishAsync(message);
176-
Assert.Equal("pong", response.Body());
176+
Assert.Equal("pong", response.BodyAsString());
177177
await rpcClient.CloseAsync();
178178
await rpcServer.CloseAsync();
179179
}
@@ -209,7 +209,7 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
209209
IMessage message = new AmqpMessage("ping");
210210

211211
IMessage response = await rpcClient.PublishAsync(message);
212-
Assert.Equal("pong", response.Body());
212+
Assert.Equal("pong", response.BodyAsString());
213213
Assert.Equal(_correlationId, response.CorrelationId());
214214
await rpcClient.CloseAsync();
215215
await rpcServer.CloseAsync();
@@ -265,7 +265,7 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
265265
while (i < 30)
266266
{
267267
IMessage response = await rpcClient.PublishAsync(message);
268-
Assert.Equal("pong", response.Body());
268+
Assert.Equal("pong", response.BodyAsString());
269269
// the server replies with the correlation id in the application properties
270270
Assert.Equal($"{_correlationId}_{i}", response.Property("correlationId"));
271271
Assert.Equal($"{_correlationId}_{i}", response.Properties()["correlationId"]);
@@ -324,7 +324,7 @@ Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request)
324324
{
325325
IMessage message = new AmqpMessage("ping").Property("id", i1);
326326
IMessage response = await rpcClient.PublishAsync(message);
327-
Assert.Equal("pong", response.Body());
327+
Assert.Equal("pong", response.BodyAsString());
328328
}));
329329
}
330330

@@ -376,7 +376,7 @@ static async Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage req
376376

377377
IMessage msg = new AmqpMessage("ping").Property("wait", 1);
378378
IMessage reply = await rpcClient.PublishAsync(msg);
379-
Assert.Equal("pong", reply.Body());
379+
Assert.Equal("pong", reply.BodyAsString());
380380

381381
await Assert.ThrowsAsync<TimeoutException>(() => rpcClient.PublishAsync(
382382
new AmqpMessage("ping").Property("wait", 700)));

0 commit comments

Comments
 (0)