Skip to content

Commit e247e8b

Browse files
authored
Merge pull request #1628 from bollhals/fix/InboundCommand
make IncomingCommand a class and simplify code around it
2 parents 3787d61 + 462513d commit e247e8b

File tree

6 files changed

+65
-117
lines changed

6 files changed

+65
-117
lines changed

projects/RabbitMQ.Client/client/RentedMemory.cs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ namespace RabbitMQ.Client
3636
{
3737
internal struct RentedMemory : IDisposable
3838
{
39-
private bool _disposedValue;
40-
4139
internal RentedMemory(byte[] rentedArray)
4240
: this(new ReadOnlyMemory<byte>(rentedArray), rentedArray)
4341
{
@@ -47,21 +45,20 @@ internal RentedMemory(ReadOnlyMemory<byte> memory, byte[] rentedArray)
4745
{
4846
Memory = memory;
4947
RentedArray = rentedArray;
50-
_disposedValue = false;
5148
}
5249

53-
internal readonly ReadOnlyMemory<byte> Memory;
54-
55-
internal readonly byte[] ToArray()
56-
{
57-
return Memory.ToArray();
58-
}
50+
internal ReadOnlyMemory<byte> Memory;
5951

6052
internal readonly int Size => Memory.Length;
6153

6254
internal readonly ReadOnlySpan<byte> Span => Memory.Span;
6355

64-
internal readonly byte[] RentedArray;
56+
internal byte[] RentedArray;
57+
58+
internal readonly byte[] ToArray()
59+
{
60+
return Memory.ToArray();
61+
}
6562

6663
internal readonly ReadOnlyMemory<byte> CopyToMemory()
6764
{
@@ -70,14 +67,11 @@ internal readonly ReadOnlyMemory<byte> CopyToMemory()
7067

7168
public void Dispose()
7269
{
73-
if (!_disposedValue)
70+
if (RentedArray != null)
7471
{
75-
if (RentedArray != null)
76-
{
77-
ArrayPool<byte>.Shared.Return(RentedArray);
78-
}
79-
80-
_disposedValue = true;
72+
ArrayPool<byte>.Shared.Return(RentedArray);
73+
RentedArray = default;
74+
Memory = default;
8175
}
8276
}
8377
}

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,17 +88,17 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
8888
}
8989
case ProtocolCommandId.BasicAck:
9090
{
91-
HandleBasicAck(in cmd);
91+
HandleBasicAck(cmd);
9292
return Task.FromResult(true);
9393
}
9494
case ProtocolCommandId.BasicNack:
9595
{
96-
HandleBasicNack(in cmd);
96+
HandleBasicNack(cmd);
9797
return Task.FromResult(true);
9898
}
9999
case ProtocolCommandId.BasicReturn:
100100
{
101-
HandleBasicReturn(in cmd);
101+
HandleBasicReturn(cmd);
102102
return Task.FromResult(true);
103103
}
104104
case ProtocolCommandId.ChannelClose:
@@ -118,7 +118,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
118118
}
119119
case ProtocolCommandId.ConnectionBlocked:
120120
{
121-
HandleConnectionBlocked(in cmd);
121+
HandleConnectionBlocked(cmd);
122122
return Task.FromResult(true);
123123
}
124124
case ProtocolCommandId.ConnectionClose:
@@ -143,7 +143,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
143143
}
144144
case ProtocolCommandId.ConnectionUnblocked:
145145
{
146-
HandleConnectionUnblocked(in cmd);
146+
HandleConnectionUnblocked(cmd);
147147
return Task.FromResult(true);
148148
}
149149
default:

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
5454
internal TaskCompletionSource<ConnectionStartDetails> m_connectionStartCell;
5555
private Exception m_connectionStartException = null;
5656

57-
// AMQP only allows one RPC operation to be active at a time.
57+
// AMQP only allows one RPC operation to be active at a time.
5858
protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
5959
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
6060
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);
@@ -425,8 +425,8 @@ private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken can
425425
/*
426426
* If DispatchCommandAsync returns `true`, it means that the incoming command is server-originated, and has
427427
* already been handled.
428-
*
429-
* Else, the incoming command is the return of an RPC call, and must be handled.
428+
*
429+
* Else, the incoming command is the return of an RPC call, and must be handled.
430430
*/
431431
if (false == await DispatchCommandAsync(cmd, cancellationToken)
432432
.ConfigureAwait(false))
@@ -561,7 +561,7 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart
561561
return ModelSendAsync(method, cancellationToken).AsTask();
562562
}
563563

564-
protected void HandleBasicAck(in IncomingCommand cmd)
564+
protected void HandleBasicAck(IncomingCommand cmd)
565565
{
566566
try
567567
{
@@ -580,7 +580,7 @@ protected void HandleBasicAck(in IncomingCommand cmd)
580580
}
581581
}
582582

583-
protected void HandleBasicNack(in IncomingCommand cmd)
583+
protected void HandleBasicNack(IncomingCommand cmd)
584584
{
585585
try
586586
{
@@ -679,17 +679,17 @@ await ConsumerDispatcher.HandleBasicDeliverAsync(
679679
method._exchange,
680680
method._routingKey,
681681
header,
682-
cmd.Body,
682+
/*
683+
* Takeover Body so it doesn't get returned as it is necessary
684+
* for handling the Basic.Deliver method by client code.
685+
*/
686+
cmd.TakeoverBody(),
683687
cancellationToken).ConfigureAwait(false);
684688
return true;
685689
}
686690
finally
687691
{
688-
/*
689-
* Note: do not return the Body as it is necessary for handling
690-
* the Basic.Deliver method by client code
691-
*/
692-
cmd.ReturnMethodAndHeaderBuffers();
692+
cmd.ReturnBuffers();
693693
}
694694
}
695695

@@ -698,7 +698,7 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)
698698
return deliveryTag;
699699
}
700700

701-
protected void HandleBasicReturn(in IncomingCommand cmd)
701+
protected void HandleBasicReturn(IncomingCommand cmd)
702702
{
703703
try
704704
{
@@ -800,7 +800,7 @@ await ModelSendAsync(method, cancellationToken).
800800
}
801801
}
802802

803-
protected void HandleConnectionBlocked(in IncomingCommand cmd)
803+
protected void HandleConnectionBlocked(IncomingCommand cmd)
804804
{
805805
try
806806
{
@@ -851,7 +851,7 @@ await ModelSendAsync(replyMethod, cancellationToken)
851851
protected async Task<bool> HandleConnectionSecureAsync(IncomingCommand _)
852852
{
853853
var k = (ConnectionSecureOrTuneAsyncRpcContinuation)_continuationQueue.Next();
854-
await k.HandleCommandAsync(IncomingCommand.Empty)
854+
await k.HandleCommandAsync(new IncomingCommand())
855855
.ConfigureAwait(false); // release the continuation.
856856
return true;
857857
}
@@ -903,7 +903,7 @@ await k.HandleCommandAsync(cmd)
903903
return true;
904904
}
905905

906-
protected void HandleConnectionUnblocked(in IncomingCommand cmd)
906+
protected void HandleConnectionUnblocked(IncomingCommand cmd)
907907
{
908908
try
909909
{

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -44,34 +44,19 @@ internal sealed class CommandAssembler
4444
{
4545
private const int MaxArrayOfBytesSize = 2_147_483_591;
4646

47-
private ProtocolCommandId _commandId;
48-
private RentedMemory _methodMemory;
49-
private RentedMemory _headerMemory;
50-
private RentedMemory _bodyMemory;
47+
private readonly IncomingCommand _currentCommand;
48+
private readonly uint _maxBodyLength;
49+
5150
private int _remainingBodyByteCount;
52-
private int _offset;
5351
private AssemblyState _state;
5452

55-
private readonly uint _maxBodyLength;
56-
5753
public CommandAssembler(uint maxBodyLength)
5854
{
55+
_currentCommand = new IncomingCommand();
5956
_maxBodyLength = maxBodyLength;
60-
Reset();
61-
}
62-
63-
private void Reset()
64-
{
65-
_commandId = default;
66-
_methodMemory = default;
67-
_headerMemory = default;
68-
_bodyMemory = default;
69-
_remainingBodyByteCount = 0;
70-
_offset = 0;
71-
_state = AssemblyState.ExpectingMethod;
7257
}
7358

74-
public void HandleFrame(InboundFrame frame, out IncomingCommand command)
59+
public IncomingCommand? HandleFrame(InboundFrame frame)
7560
{
7661
switch (_state)
7762
{
@@ -88,13 +73,14 @@ public void HandleFrame(InboundFrame frame, out IncomingCommand command)
8873

8974
if (_state != AssemblyState.Complete)
9075
{
91-
command = IncomingCommand.Empty;
92-
return;
76+
return default;
9377
}
9478

9579
RabbitMqClientEventSource.Log.CommandReceived();
96-
command = new IncomingCommand(_commandId, _methodMemory, _headerMemory, _bodyMemory);
97-
Reset();
80+
_remainingBodyByteCount = 0;
81+
_state = AssemblyState.ExpectingMethod;
82+
83+
return _currentCommand;
9884
}
9985

10086
private void ParseMethodFrame(InboundFrame frame)
@@ -104,10 +90,10 @@ private void ParseMethodFrame(InboundFrame frame)
10490
throw new UnexpectedFrameException(frame.Type);
10591
}
10692

107-
_commandId = (ProtocolCommandId)NetworkOrderDeserializer.ReadUInt32(frame.Payload.Span);
108-
_methodMemory = frame.TakeoverPayload(Framing.Method.ArgumentsOffset);
93+
_currentCommand.CommandId = (ProtocolCommandId)NetworkOrderDeserializer.ReadUInt32(frame.Payload.Span);
94+
_currentCommand.Method = frame.TakeoverPayload(Framing.Method.ArgumentsOffset);
10995

110-
switch (_commandId)
96+
switch (_currentCommand.CommandId)
11197
{
11298
// Commands with payload
11399
case ProtocolCommandId.BasicGetOk:
@@ -154,7 +140,7 @@ private void ParseHeaderFrame(InboundFrame frame)
154140
}
155141
else
156142
{
157-
_headerMemory = frame.TakeoverPayload(Framing.Header.HeaderArgumentOffset);
143+
_currentCommand.Header = frame.TakeoverPayload(Framing.Header.HeaderArgumentOffset);
158144
}
159145

160146
_remainingBodyByteCount = (int)totalBodyBytes;
@@ -174,25 +160,25 @@ private void ParseBodyFrame(InboundFrame frame)
174160
throw new MalformedFrameException($"Overlong content body received - {_remainingBodyByteCount} bytes remaining, {payloadLength} bytes received");
175161
}
176162

177-
if (_bodyMemory.RentedArray is null)
163+
if (_currentCommand.Body.RentedArray is null)
178164
{
179165
// check for single frame payload for an early exit
180166
if (payloadLength == _remainingBodyByteCount)
181167
{
182-
_bodyMemory = frame.TakeoverPayload(0);
168+
_currentCommand.Body = frame.TakeoverPayload(0);
183169
_state = AssemblyState.Complete;
184170
return;
185171
}
186172

187173
// Is returned by IncomingCommand.ReturnPayload in Session.HandleFrame
188174
var rentedBodyArray = ArrayPool<byte>.Shared.Rent(_remainingBodyByteCount);
189-
_bodyMemory = new RentedMemory(new ReadOnlyMemory<byte>(rentedBodyArray, 0, _remainingBodyByteCount), rentedBodyArray);
175+
_currentCommand.Body.RentedArray = rentedBodyArray;
176+
_currentCommand.Body.Memory = new ReadOnlyMemory<byte>(rentedBodyArray, 0, _remainingBodyByteCount);
190177
}
191178

192-
frame.Payload.Span.CopyTo(_bodyMemory.RentedArray.AsSpan(_offset));
179+
frame.Payload.Span.CopyTo(_currentCommand.Body.RentedArray.AsSpan(_currentCommand.Body.Memory.Length - _remainingBodyByteCount));
193180
frame.TryReturnPayload();
194181
_remainingBodyByteCount -= payloadLength;
195-
_offset += payloadLength;
196182
UpdateContentBodyState();
197183
}
198184

projects/RabbitMQ.Client/client/impl/IncomingCommand.cs

Lines changed: 13 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -34,60 +34,29 @@
3434

3535
namespace RabbitMQ.Client.Impl
3636
{
37-
internal readonly struct IncomingCommand
37+
internal sealed class IncomingCommand
3838
{
39-
public static readonly IncomingCommand Empty = default;
39+
public ProtocolCommandId CommandId;
4040

41-
public readonly ProtocolCommandId CommandId;
41+
public RentedMemory Method;
42+
public RentedMemory Header;
43+
public RentedMemory Body;
4244

43-
public readonly RentedMemory Method;
44-
public readonly RentedMemory Header;
45-
public readonly RentedMemory Body;
45+
public ReadOnlySpan<byte> MethodSpan => Method.Memory.Span;
46+
public ReadOnlySpan<byte> HeaderSpan => Header.Memory.Span;
47+
public ReadOnlySpan<byte> BodySpan => Body.Memory.Span;
4648

47-
public readonly bool IsEmpty => CommandId is default(ProtocolCommandId);
48-
49-
public IncomingCommand(ProtocolCommandId commandId,
50-
RentedMemory method, RentedMemory header, RentedMemory body)
51-
{
52-
CommandId = commandId;
53-
Method = method;
54-
Header = header;
55-
Body = body;
56-
}
57-
58-
public ReadOnlySpan<byte> MethodSpan
49+
public RentedMemory TakeoverBody()
5950
{
60-
get
61-
{
62-
return Method.Memory.Span;
63-
}
51+
RentedMemory body = Body;
52+
Body = default;
53+
return body;
6454
}
6555

66-
public ReadOnlySpan<byte> HeaderSpan
67-
{
68-
get
69-
{
70-
return Header.Memory.Span;
71-
}
72-
}
73-
74-
public ReadOnlySpan<byte> BodySpan
75-
{
76-
get
77-
{
78-
return Body.Memory.Span;
79-
}
80-
}
81-
82-
public void ReturnMethodAndHeaderBuffers()
56+
public void ReturnBuffers()
8357
{
8458
Method.Dispose();
8559
Header.Dispose();
86-
}
87-
88-
public void ReturnBuffers()
89-
{
90-
ReturnMethodAndHeaderBuffers();
9160
Body.Dispose();
9261
}
9362
}

projects/RabbitMQ.Client/client/impl/Session.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,8 @@ public Session(Connection connection, ushort channelNumber, uint maxBodyLength)
4848

4949
public override Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken)
5050
{
51-
_assembler.HandleFrame(frame, out IncomingCommand cmd);
52-
53-
if (cmd.IsEmpty)
51+
IncomingCommand cmd = _assembler.HandleFrame(frame);
52+
if (cmd is null)
5453
{
5554
return Task.CompletedTask;
5655
}

0 commit comments

Comments
 (0)