Skip to content

Commit 5e68abb

Browse files
GerardSmitlukebakken
authored andcommitted
Merge sequential memories
1 parent 892088d commit 5e68abb

File tree

7 files changed

+217
-98
lines changed

7 files changed

+217
-98
lines changed

projects/RabbitMQ.Client/client/ChunckedSequence.cs

Lines changed: 0 additions & 76 deletions
This file was deleted.

projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ internal class RentedOutgoingMemory : IDisposable, IResettable
1313
private static readonly ObjectPool<RentedOutgoingMemory> s_pool = ObjectPool.Create<RentedOutgoingMemory>();
1414

1515
private bool _disposedValue;
16+
private byte[]? _rentedArray;
1617
private TaskCompletionSource<bool>? _sendCompletionSource;
1718

18-
internal ReadOnlySequence<byte> Data;
19-
internal byte[]? RentedArray;
20-
2119
internal int Size => (int) Data.Length;
2220

21+
internal ReadOnlySequence<byte> Data { get; private set; }
22+
2323
/// <summary>
2424
/// Mark the data as sent.
2525
/// </summary>
@@ -28,6 +28,7 @@ public void DidSend()
2828
if (_sendCompletionSource is null)
2929
{
3030
Dispose();
31+
s_pool.Return(this);
3132
}
3233
else
3334
{
@@ -47,6 +48,7 @@ async ValueTask WaitForFinishCore()
4748
{
4849
await _sendCompletionSource.Task.ConfigureAwait(false);
4950
Dispose();
51+
s_pool.Return(this);
5052
}
5153
}
5254

@@ -65,9 +67,14 @@ private void Dispose(bool disposing)
6567
return;
6668
}
6769

68-
if (disposing && RentedArray != null)
70+
if (disposing)
6971
{
70-
ClientArrayPool.Return(RentedArray);
72+
if (_rentedArray != null)
73+
{
74+
ClientArrayPool.Return(_rentedArray);
75+
Data = default;
76+
_rentedArray = null;
77+
}
7178
}
7279

7380
_disposedValue = true;
@@ -87,18 +94,18 @@ bool IResettable.TryReset()
8794
}
8895

8996
_disposedValue = false;
90-
RentedArray = default;
91-
_sendCompletionSource = default;
97+
_rentedArray = default;
9298
Data = default;
99+
_sendCompletionSource = default;
93100
return true;
94101
}
95102

96-
public static RentedOutgoingMemory Create(ReadOnlySequence<byte> mem, byte[] buffer, bool waitSend = false)
103+
public static RentedOutgoingMemory GetAndInitialize(ReadOnlySequence<byte> mem, byte[] buffer, bool waitSend = false)
97104
{
98105
var rented = s_pool.Get();
99106

100107
rented.Data = mem;
101-
rented.RentedArray = buffer;
108+
rented._rentedArray = buffer;
102109

103110
if (waitSend)
104111
{
@@ -108,9 +115,9 @@ public static RentedOutgoingMemory Create(ReadOnlySequence<byte> mem, byte[] buf
108115
return rented;
109116
}
110117

111-
public static RentedOutgoingMemory Create(ReadOnlyMemory<byte> mem, byte[] buffer, bool waitSend = false)
118+
public static RentedOutgoingMemory GetAndInitialize(ReadOnlyMemory<byte> mem, byte[] buffer, bool waitSend = false)
112119
{
113-
return Create(new ReadOnlySequence<byte>(mem), buffer, waitSend);
120+
return GetAndInitialize(new ReadOnlySequence<byte>(mem), buffer, waitSend);
114121
}
115122
}
116123
}

projects/RabbitMQ.Client/client/impl/EmptyBasicProperty.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace RabbitMQ.Client.client.impl
88
#nullable enable
99
internal readonly struct EmptyBasicProperty : IReadOnlyBasicProperties, IAmqpHeader
1010
{
11-
internal static readonly EmptyBasicProperty Empty;
11+
internal static readonly EmptyBasicProperty Empty = default;
1212

1313
ushort IAmqpHeader.ProtocolClassId => ClassConstants.Basic;
1414

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ internal static class BodySegment
113113
public const int FrameSize = BaseFrameSize;
114114

115115
[MethodImpl(MethodImplOptions.AggressiveInlining)]
116-
public static int WriteTo(ref ChunkedSequence<byte> data, Memory<byte> buffer, ushort channel, ReadOnlySequence<byte> body, bool copyBody)
116+
public static int WriteTo(ref SequenceBuilder<byte> data, Memory<byte> buffer, ushort channel, ReadOnlySequence<byte> body, bool copyBody)
117117
{
118118
const int StartBodyArgument = StartPayload;
119119
NetworkOrderSerializer.WriteUInt64(ref buffer.Span.GetStart(), ((ulong)Constants.FrameBody << 56) | ((ulong)channel << 40) | ((ulong)body.Length << 8));
@@ -161,7 +161,7 @@ public static RentedOutgoingMemory GetHeartbeatFrame()
161161
byte[] buffer = ClientArrayPool.Rent(FrameSize);
162162
Payload.CopyTo(buffer);
163163
var mem = new ReadOnlyMemory<byte>(buffer, 0, FrameSize);
164-
return RentedOutgoingMemory.Create(mem, buffer);
164+
return RentedOutgoingMemory.GetAndInitialize(mem, buffer);
165165
}
166166
}
167167

@@ -177,7 +177,7 @@ public static RentedOutgoingMemory SerializeToFrames<T>(ref T method, ushort cha
177177

178178
System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}");
179179
var mem = new ReadOnlyMemory<byte>(array, 0, size);
180-
return RentedOutgoingMemory.Create(mem, array);
180+
return RentedOutgoingMemory.GetAndInitialize(mem, array);
181181
}
182182

183183
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -198,28 +198,28 @@ public static RentedOutgoingMemory SerializeToFrames<TMethod, THeader>(ref TMeth
198198
// Will be returned by SocketFrameWriter.WriteLoop
199199
byte[] array = ClientArrayPool.Rent(size);
200200

201-
ChunkedSequence<byte> sequence = new ChunkedSequence<byte>();
201+
SequenceBuilder<byte> sequenceBuilder = new SequenceBuilder<byte>();
202202
Memory<byte> buffer = array.AsMemory();
203203

204204
int offset = Method.WriteTo(array, channelNumber, ref method);
205205
offset += Header.WriteTo(array.AsSpan(offset), channelNumber, ref header, remainingBodyBytes);
206206

207-
sequence.Append(buffer.Slice(0, offset));
207+
sequenceBuilder.Append(buffer.Slice(0, offset));
208208
buffer = buffer.Slice(offset);
209209

210210
ReadOnlySequence<byte> remainingBody = body;
211211
while (remainingBodyBytes > 0)
212212
{
213213
int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes;
214-
int segmentSize = BodySegment.WriteTo(ref sequence, buffer, channelNumber, remainingBody.Slice(remainingBody.Length - remainingBodyBytes, frameSize), copyBody);
214+
int segmentSize = BodySegment.WriteTo(ref sequenceBuilder, buffer, channelNumber, remainingBody.Slice(remainingBody.Length - remainingBodyBytes, frameSize), copyBody);
215215

216216
buffer = buffer.Slice(segmentSize);
217217
offset += segmentSize;
218218
remainingBodyBytes -= frameSize;
219219
}
220220

221221
System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}");
222-
return RentedOutgoingMemory.Create(sequence.GetSequence(), array, waitSend: !copyBody);
222+
return RentedOutgoingMemory.GetAndInitialize(sequenceBuilder.Build(), array, waitSend: !copyBody);
223223
}
224224

225225
[MethodImpl(MethodImplOptions.AggressiveInlining)]
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#nullable enable
2+
using System;
3+
using System.Buffers;
4+
using System.Runtime.InteropServices;
5+
using RabbitMQ.Client.Impl;
6+
7+
namespace RabbitMQ.Util;
8+
9+
internal ref struct SequenceBuilder<T>
10+
{
11+
private Segment? _first;
12+
private Segment? _current;
13+
14+
public SequenceBuilder()
15+
{
16+
_first = _current = null;
17+
}
18+
19+
public void Append(ReadOnlySequence<T> sequence)
20+
{
21+
SequencePosition pos = sequence.Start;
22+
while (sequence.TryGet(ref pos, out ReadOnlyMemory<T> mem))
23+
{
24+
Append(mem);
25+
}
26+
}
27+
28+
public void Append(ReadOnlyMemory<T> memory)
29+
{
30+
if (_current == null)
31+
{
32+
_first = _current = new Segment(memory);
33+
}
34+
else if (!_current.TryMerge(memory))
35+
{
36+
_current = _current.Append(memory);
37+
}
38+
}
39+
40+
public ReadOnlySequence<T> Build()
41+
{
42+
if (_first == null || _current == null)
43+
{
44+
return default;
45+
}
46+
47+
return new ReadOnlySequence<T>(_first, 0, _current, _current.Memory.Length);
48+
}
49+
50+
private sealed class Segment : ReadOnlySequenceSegment<T>
51+
{
52+
public Segment(ReadOnlyMemory<T> memory)
53+
{
54+
Memory = memory;
55+
}
56+
57+
/// <summary>
58+
/// Try to merge the next memory into this chunk.
59+
/// </summary>
60+
/// <remarks>
61+
/// Used in <see cref="Framing.BodySegment.WriteTo"/> that can write the same array when the body is being copied.
62+
/// </remarks>
63+
/// <param name="next">The next memory.</param>
64+
/// <returns><c>true</c> if the memory was merged; otherwise <c>false</c>.</returns>
65+
public bool TryMerge(ReadOnlyMemory<T> next)
66+
{
67+
if (MemoryMarshal.TryGetArray(Memory, out ArraySegment<T> segment) &&
68+
MemoryMarshal.TryGetArray(next, out ArraySegment<T> nextSegment) &&
69+
segment.Array == nextSegment.Array &&
70+
nextSegment.Offset == segment.Offset + segment.Count)
71+
{
72+
Memory = segment.Array.AsMemory(segment.Offset, segment.Count + nextSegment.Count);
73+
return true;
74+
}
75+
76+
return false;
77+
}
78+
79+
public Segment Append(ReadOnlyMemory<T> memory)
80+
{
81+
Segment nextChunk = new(memory)
82+
{
83+
RunningIndex = RunningIndex + Memory.Length
84+
};
85+
86+
Next = nextChunk;
87+
return nextChunk;
88+
}
89+
}
90+
}

projects/Test/Unit/TestFrameFormatting.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
using RabbitMQ.Client;
3535
using RabbitMQ.Client.Framing.Impl;
3636
using RabbitMQ.Client.Impl;
37+
using RabbitMQ.Util;
3738
using Xunit;
3839

3940
namespace Test.Unit
@@ -147,11 +148,19 @@ public void BodySegmentFrame(bool copyBody)
147148
byte[] payload = new byte[4];
148149
byte[] buffer = new byte[RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize + (copyBody ? payload.Length : 0)];
149150

150-
ChunkedSequence<byte> segment = new ChunkedSequence<byte>();
151+
SequenceBuilder<byte> builder = new SequenceBuilder<byte>();
151152

152-
RabbitMQ.Client.Impl.Framing.BodySegment.WriteTo(ref segment, buffer, Channel, new ReadOnlySequence<byte>(payload), copyBody);
153+
RabbitMQ.Client.Impl.Framing.BodySegment.WriteTo(ref builder, buffer, Channel, new ReadOnlySequence<byte>(payload), copyBody);
153154

154-
byte[] frameBytes = segment.GetSequence().ToArray();
155+
var sequence = builder.Build();
156+
157+
if (copyBody)
158+
{
159+
// When copying the body, the memory is sequential
160+
Assert.True(sequence.IsSingleSegment);
161+
}
162+
163+
byte[] frameBytes = sequence.ToArray();
155164

156165
Assert.Equal(8, RabbitMQ.Client.Impl.Framing.BodySegment.FrameSize);
157166
Assert.Equal(Constants.FrameBody, frameBytes[0]);

0 commit comments

Comments
 (0)