Skip to content

Commit a1eef4c

Browse files
committed
* Pass method RentedMemory to consumer dispatchers
1 parent f39ff6f commit a1eef4c

File tree

12 files changed

+245
-120
lines changed

12 files changed

+245
-120
lines changed

projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,15 @@ public async Task SetUpAsyncConsumer()
5151
[Benchmark]
5252
public async Task AsyncConsumerDispatcher()
5353
{
54+
var m = new MethodBasicDeliver();
55+
m.SetUp();
56+
using (RentedMemory method = new RentedMemory(m.Buffer.ToArray()))
5457
using (RentedMemory body = new RentedMemory(_body))
5558
{
5659
for (int i = 0; i < Count; i++)
5760
{
58-
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag,
59-
false, _exchange, _routingKey, _properties, body, CancellationToken.None);
61+
await _dispatcher.HandleBasicDeliverAsync(_deliveryTag,
62+
false, _properties, method, body, CancellationToken.None);
6063
}
6164
_autoResetEvent.Wait();
6265
_autoResetEvent.Reset();
@@ -74,12 +77,15 @@ public async Task SetUpConsumer()
7477
[Benchmark]
7578
public async Task ConsumerDispatcher()
7679
{
80+
var m = new MethodBasicDeliver();
81+
m.SetUp();
82+
using (RentedMemory method = new RentedMemory(m.Buffer.ToArray()))
7783
using (RentedMemory body = new RentedMemory(_body))
7884
{
7985
for (int i = 0; i < Count; i++)
8086
{
81-
await _dispatcher.HandleBasicDeliverAsync(_consumerTag, _deliveryTag,
82-
false, _exchange, _routingKey, _properties, body, CancellationToken.None);
87+
await _dispatcher.HandleBasicDeliverAsync(_deliveryTag,
88+
false, _properties, method, body, CancellationToken.None);
8389
}
8490
_autoResetEvent.Wait();
8591
_autoResetEvent.Reset();

projects/Benchmarks/WireFormatting/MethodSerialization.cs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace RabbitMQ.Benchmarks
1111
[BenchmarkCategory("Methods")]
1212
public class MethodSerializationBase
1313
{
14-
protected readonly Memory<byte> _buffer = new byte[1024];
14+
public readonly Memory<byte> Buffer = new byte[1024];
1515

1616
[GlobalSetup]
1717
public virtual void SetUp() { }
@@ -20,13 +20,13 @@ public virtual void SetUp() { }
2020
public class MethodBasicAck : MethodSerializationBase
2121
{
2222
private readonly BasicAck _basicAck = new BasicAck(ulong.MaxValue, true);
23-
public override void SetUp() => _basicAck.WriteTo(_buffer.Span);
23+
public override void SetUp() => _basicAck.WriteTo(Buffer.Span);
2424

2525
[Benchmark]
26-
public ulong BasicAckRead() => new BasicAck(_buffer.Span)._deliveryTag; // return one property to not box when returning an object instead
26+
public ulong BasicAckRead() => new BasicAck(Buffer.Span)._deliveryTag; // return one property to not box when returning an object instead
2727

2828
[Benchmark]
29-
public int BasicAckWrite() => _basicAck.WriteTo(_buffer.Span);
29+
public int BasicAckWrite() => _basicAck.WriteTo(Buffer.Span);
3030
}
3131

3232
public class MethodBasicDeliver : MethodSerializationBase
@@ -37,21 +37,21 @@ public class MethodBasicDeliver : MethodSerializationBase
3737

3838
public override void SetUp()
3939
{
40-
int offset = Client.Impl.WireFormatting.WriteShortstr(ref _buffer.Span.GetStart(), string.Empty);
41-
offset += Client.Impl.WireFormatting.WriteLonglong(ref _buffer.Span.GetOffset(offset), 0);
42-
offset += Client.Impl.WireFormatting.WriteBits(ref _buffer.Span.GetOffset(offset), false);
43-
offset += Client.Impl.WireFormatting.WriteShortstr(ref _buffer.Span.GetOffset(offset), string.Empty);
44-
Client.Impl.WireFormatting.WriteShortstr(ref _buffer.Span.GetOffset(offset), string.Empty);
40+
int offset = Client.Impl.WireFormatting.WriteShortstr(ref Buffer.Span.GetStart(), string.Empty);
41+
offset += Client.Impl.WireFormatting.WriteLonglong(ref Buffer.Span.GetOffset(offset), 0);
42+
offset += Client.Impl.WireFormatting.WriteBits(ref Buffer.Span.GetOffset(offset), false);
43+
offset += Client.Impl.WireFormatting.WriteShortstr(ref Buffer.Span.GetOffset(offset), string.Empty);
44+
Client.Impl.WireFormatting.WriteShortstr(ref Buffer.Span.GetOffset(offset), string.Empty);
4545
}
4646

4747
[Benchmark]
48-
public object BasicDeliverRead() => new BasicDeliver(_buffer)._consumerTag; // return one property to not box when returning an object instead
48+
public object BasicDeliverRead() => new BasicDeliver(Buffer)._consumerTag; // return one property to not box when returning an object instead
4949

5050
[Benchmark]
51-
public int BasicPublishWrite() => _basicPublish.WriteTo(_buffer.Span);
51+
public int BasicPublishWrite() => _basicPublish.WriteTo(Buffer.Span);
5252

5353
[Benchmark]
54-
public int BasicPublishMemoryWrite() => _basicPublishMemory.WriteTo(_buffer.Span);
54+
public int BasicPublishMemoryWrite() => _basicPublishMemory.WriteTo(Buffer.Span);
5555

5656
[Benchmark]
5757
public int BasicPublishSize() => _basicPublish.GetRequiredBufferSize();
@@ -64,25 +64,25 @@ public class MethodChannelClose : MethodSerializationBase
6464
{
6565
private readonly ChannelClose _channelClose = new ChannelClose(333, string.Empty, 0099, 2999);
6666

67-
public override void SetUp() => _channelClose.WriteTo(_buffer.Span);
67+
public override void SetUp() => _channelClose.WriteTo(Buffer.Span);
6868

6969
[Benchmark]
70-
public object ChannelCloseRead() => new ChannelClose(_buffer.Span)._replyText; // return one property to not box when returning an object instead
70+
public object ChannelCloseRead() => new ChannelClose(Buffer.Span)._replyText; // return one property to not box when returning an object instead
7171

7272
[Benchmark]
73-
public int ChannelCloseWrite() => _channelClose.WriteTo(_buffer.Span);
73+
public int ChannelCloseWrite() => _channelClose.WriteTo(Buffer.Span);
7474
}
7575

7676
public class MethodBasicProperties : MethodSerializationBase
7777
{
7878
private readonly IAmqpWriteable _basicProperties = new BasicProperties { Persistent = true, AppId = "AppId", ContentEncoding = "content", };
79-
public override void SetUp() => _basicProperties.WriteTo(_buffer.Span);
79+
public override void SetUp() => _basicProperties.WriteTo(Buffer.Span);
8080

8181
[Benchmark]
82-
public ReadOnlyBasicProperties BasicPropertiesRead() => new ReadOnlyBasicProperties(_buffer.Span);
82+
public ReadOnlyBasicProperties BasicPropertiesRead() => new ReadOnlyBasicProperties(Buffer.Span);
8383

8484
[Benchmark]
85-
public int BasicPropertiesWrite() => _basicProperties.WriteTo(_buffer.Span);
85+
public int BasicPropertiesWrite() => _basicProperties.WriteTo(Buffer.Span);
8686

8787
[Benchmark]
8888
public int BasicDeliverSize() => _basicProperties.GetRequiredBufferSize();

projects/RabbitMQ.Client/client/framing/BasicCancelOk.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,11 @@ public BasicCancelOk(ReadOnlySpan<byte> span)
4646
}
4747

4848
public ProtocolCommandId ProtocolCommandId => ProtocolCommandId.BasicCancelOk;
49+
50+
public static ConsumerTag GetConsumerTag(ReadOnlyMemory<byte> data)
51+
{
52+
WireFormatting.ReadShortMemory(data, out ReadOnlyMemory<byte> consumerTagMemory);
53+
return new ConsumerTag(consumerTagMemory);
54+
}
4955
}
5056
}

projects/RabbitMQ.Client/client/framing/BasicDeliver.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,11 @@ public BasicDeliver(ReadOnlyMemory<byte> data)
5454
}
5555

5656
public ProtocolCommandId ProtocolCommandId => ProtocolCommandId.BasicDeliver;
57+
58+
public static ConsumerTag GetConsumerTag(ReadOnlyMemory<byte> data)
59+
{
60+
WireFormatting.ReadShortMemory(data, out ReadOnlyMemory<byte> consumerTagMemory);
61+
return new ConsumerTag(consumerTagMemory);
62+
}
5763
}
5864
}

projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Diagnostics;
3433
using System.Runtime.CompilerServices;
3534
using System.Threading;
3635
using System.Threading.Tasks;
@@ -226,10 +225,8 @@ public override async Task HandleCommandAsync(IncomingCommand cmd)
226225
{
227226
if (cmd.CommandId == ProtocolCommandId.BasicCancelOk)
228227
{
229-
var method = new BasicCancelOk(cmd.MethodSpan);
230228
_tcs.TrySetResult(true);
231-
Debug.Assert(_consumerTag == method._consumerTag);
232-
await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationToken)
229+
await _consumerDispatcher.HandleBasicCancelOkAsync(cmd.Method, CancellationToken)
233230
.ConfigureAwait(false);
234231
}
235232
else
@@ -239,7 +236,7 @@ await _consumerDispatcher.HandleBasicCancelOkAsync(_consumerTag, CancellationTok
239236
}
240237
finally
241238
{
242-
cmd.ReturnBuffers();
239+
cmd.ReturnHeaderAndBodyBuffers();
243240
}
244241
}
245242
}
@@ -276,6 +273,10 @@ await _consumerDispatcher.HandleBasicConsumeOkAsync(_consumer, ct, CancellationT
276273
}
277274
finally
278275
{
276+
/*
277+
* Note:
278+
* OK to return buffers here as we copy the string value for the consumer tag above
279+
*/
279280
cmd.ReturnBuffers();
280281
}
281282
}

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -655,14 +655,13 @@ protected async Task<bool> HandleBasicCancelAsync(IncomingCommand cmd, Cancellat
655655
{
656656
try
657657
{
658-
ConsumerTag consumerTag = BasicCancel.GetConsumerTag(cmd.MethodMemory);
659-
await ConsumerDispatcher.HandleBasicCancelAsync(consumerTag, cancellationToken)
658+
await ConsumerDispatcher.HandleBasicCancelAsync(cmd.Method, cancellationToken)
660659
.ConfigureAwait(false);
661660
return true;
662661
}
663662
finally
664663
{
665-
cmd.ReturnBuffers();
664+
cmd.ReturnHeaderAndBodyBuffers();
666665
}
667666
}
668667

@@ -672,29 +671,23 @@ protected async Task<bool> HandleBasicDeliverAsync(IncomingCommand cmd, Cancella
672671
{
673672
var method = new BasicDeliver(cmd.MethodMemory);
674673
var header = new ReadOnlyBasicProperties(cmd.HeaderSpan);
675-
676-
var consumerTag = new ConsumerTag(method._consumerTag);
677-
var exchangeName = new ExchangeName(method._exchange);
678-
var routingKey = new RoutingKey(method._routingKey);
679-
680674
await ConsumerDispatcher.HandleBasicDeliverAsync(
681-
consumerTag,
682675
AdjustDeliveryTag(method._deliveryTag),
683676
method._redelivered,
684-
exchangeName,
685-
routingKey,
686677
header,
678+
cmd.Method,
687679
cmd.Body,
688680
cancellationToken).ConfigureAwait(false);
689681
return true;
690682
}
691683
finally
692684
{
693685
/*
694-
* Note: do not return the Body as it is necessary for handling
686+
* Note: do not return the Method or Body buffers as they
687+
* are necessary for handling
695688
* the Basic.Deliver method by client code
696689
*/
697-
cmd.ReturnMethodAndHeaderBuffers();
690+
cmd.ReturnHeaderBuffers();
698691
}
699692
}
700693

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Threading;
33
using System.Threading.Tasks;
44
using RabbitMQ.Client.Events;
5+
using RabbitMQ.Client.Framing.Impl;
56
using RabbitMQ.Client.Impl;
67

78
namespace RabbitMQ.Client.ConsumerDispatching
@@ -26,22 +27,56 @@ protected override async Task ProcessChannelAsync(CancellationToken token)
2627
{
2728
try
2829
{
29-
Task task = work.WorkType switch
30+
Task task = Task.CompletedTask;
31+
switch (work.WorkType)
3032
{
31-
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
32-
work.ConsumerTag, work.DeliveryTag, work.Redelivered,
33-
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),
33+
case WorkType.Deliver:
34+
{
35+
IAsyncBasicConsumer? deliverConsumer = work.AsyncConsumer ??
36+
throw new InvalidOperationException("[CRITICAL] should never see this");
37+
ConsumerTag? deliverConsumerTag = work.ConsumerTag;
38+
var method = new BasicDeliver(work.Method.Memory);
39+
var exchangeName = new ExchangeName(method._exchange);
40+
var routingKey = new RoutingKey(method._routingKey);
41+
task = deliverConsumer.HandleBasicDeliver(
42+
deliverConsumerTag, work.DeliveryTag, work.Redelivered,
43+
exchangeName, routingKey, work.BasicProperties, work.Body.Memory);
44+
}
45+
break;
46+
case WorkType.Cancel:
47+
{
48+
RentedMemory cancelMethodMemory = work.Method;
49+
ConsumerTag cancelConsumerTag = BasicCancel.GetConsumerTag(cancelMethodMemory.Memory);
50+
IAsyncBasicConsumer cancelConsumer = (IAsyncBasicConsumer)GetAndRemoveConsumer(cancelConsumerTag);
51+
task = cancelConsumer.HandleBasicCancel(cancelConsumerTag);
52+
}
53+
break;
54+
case WorkType.CancelOk:
55+
{
56+
RentedMemory cancelOkMethodMemory = work.Method;
57+
ConsumerTag cancelOkConsumerTag = BasicCancelOk.GetConsumerTag(cancelOkMethodMemory.Memory);
58+
IAsyncBasicConsumer cancelOkConsumer = (IAsyncBasicConsumer)GetAndRemoveConsumer(cancelOkConsumerTag);
59+
task = cancelOkConsumer.HandleBasicCancelOk(cancelOkConsumerTag);
60+
}
61+
break;
62+
case WorkType.ConsumeOk:
63+
{
64+
IAsyncBasicConsumer? consumeOkConsumer = work.AsyncConsumer ??
65+
throw new InvalidOperationException("[CRITICAL] should never see this");
66+
task = consumeOkConsumer.HandleBasicConsumeOk(work.ConsumerTag);
67+
}
68+
break;
69+
case WorkType.Shutdown:
70+
{
71+
IAsyncBasicConsumer? shutdownConsumer = work.AsyncConsumer ??
72+
throw new InvalidOperationException("[CRITICAL] should never see this");
73+
task = shutdownConsumer.HandleChannelShutdown(_channel, work.Reason);
74+
}
75+
break;
76+
default:
77+
throw new InvalidOperationException();
78+
}
3479

35-
WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),
36-
37-
WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),
38-
39-
WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),
40-
41-
WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),
42-
43-
_ => Task.CompletedTask
44-
};
4580
await task.ConfigureAwait(false);
4681
}
4782
catch (Exception e)

0 commit comments

Comments
 (0)