Skip to content

Commit a25ee97

Browse files
[EventHubs] Optimize body eager copying (#31322)
* Optimize EagerCopyingMessageBody by making sure the array doesn't need to grow * Optimize CopyingOnConversionMessageBody by making sure the array doesn't need to grow * Rename loop variable
1 parent 25b9d3c commit a25ee97

File tree

1 file changed

+68
-33
lines changed

1 file changed

+68
-33
lines changed

sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/MessageBody.cs

Lines changed: 68 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using System.Linq;
88
using Azure.Core;
99
using Azure.Core.Amqp;
10-
using Azure.Messaging.EventHubs;
1110
using Microsoft.Azure.Amqp.Framing;
1211

1312
namespace Azure.Messaging.EventHubs.Amqp
@@ -150,11 +149,7 @@ protected override ReadOnlyMemory<byte> WrittenMemory
150149
{
151150
if (_lazySegments != null)
152151
{
153-
foreach (var segment in _lazySegments)
154-
{
155-
AppendSegment(segment);
156-
}
157-
152+
AppendSegments(_lazySegments);
158153
_lazySegments = null;
159154
}
160155

@@ -183,21 +178,43 @@ public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator() =>
183178
_segments?.GetEnumerator() ?? _lazySegments.GetEnumerator();
184179

185180
/// <summary>
186-
/// Appends a memory segment to the continuous buffer.
181+
/// Appends memory segments to the continuous buffer.
187182
/// </summary>
188183
///
189-
/// <param name="segment">The memory segment to append.</param>
184+
/// <param name="dataSegments">The memory segments to append.</param>
190185
///
191-
private void AppendSegment(ReadOnlyMemory<byte> segment)
186+
private void AppendSegments(IEnumerable<ReadOnlyMemory<byte>> dataSegments)
192187
{
193-
_writer ??= new ArrayBufferWriter<byte>();
194-
_segments ??= new List<ReadOnlyMemory<byte>>();
188+
int length = 0;
189+
int numberOfSegments = 0;
190+
List<ReadOnlyMemory<byte>> segments = null;
191+
foreach (var segment in dataSegments)
192+
{
193+
segments ??= dataSegments is IReadOnlyCollection<ReadOnlyMemory<byte>> readOnlyList
194+
? new List<ReadOnlyMemory<byte>>(readOnlyList.Count)
195+
: new List<ReadOnlyMemory<byte>>();
196+
length += segment.Length;
197+
numberOfSegments++;
198+
segments.Add(segment);
199+
}
200+
201+
if (segments == null)
202+
{
203+
return;
204+
}
195205

196-
var memory = _writer.GetMemory(segment.Length);
197-
segment.CopyTo(memory);
206+
// fields are lazy initialized to not occupy unnecessary memory when there are no data segments
207+
_writer = length > 0 ? new ArrayBufferWriter<byte>(length) : new ArrayBufferWriter<byte>();
208+
_segments = segments;
198209

199-
_writer.Advance(segment.Length);
200-
_segments.Add(memory.Slice(0, segment.Length));
210+
for (var segmentIndex = 0; segmentIndex < numberOfSegments; segmentIndex++)
211+
{
212+
var dataToAppend = segments[segmentIndex];
213+
var memory = _writer.GetMemory(dataToAppend.Length);
214+
dataToAppend.CopyTo(memory);
215+
_writer.Advance(dataToAppend.Length);
216+
segments[segmentIndex] = memory.Slice(0, dataToAppend.Length);
217+
}
201218
}
202219
}
203220

@@ -230,10 +247,7 @@ private sealed class EagerCopyingMessageBody : MessageBody
230247
///
231248
public EagerCopyingMessageBody(IEnumerable<Data> dataSegments)
232249
{
233-
foreach (var segment in dataSegments)
234-
{
235-
AppendSegment(segment);
236-
}
250+
AppendSegments(dataSegments);
237251
}
238252

239253
/// <summary>
@@ -245,28 +259,49 @@ public EagerCopyingMessageBody(IEnumerable<Data> dataSegments)
245259
public override IEnumerator<ReadOnlyMemory<byte>> GetEnumerator() => _segments.GetEnumerator();
246260

247261
/// <summary>
248-
/// Appends a memory segment to the continuous buffer.
262+
/// Appends memory segments to the continuous buffer.
249263
/// </summary>
250264
///
251-
/// <param name="segment">The memory segment to append.</param>
265+
/// <param name="dataSegments">The memory segments to append.</param>
252266
///
253-
private void AppendSegment(Data segment)
267+
private void AppendSegments(IEnumerable<Data> dataSegments)
254268
{
255-
_writer ??= new ArrayBufferWriter<byte>();
256-
_segments ??= new List<ReadOnlyMemory<byte>>();
269+
int length = 0;
270+
int numberOfSegments = 0;
271+
List<ReadOnlyMemory<byte>> segments = null;
272+
foreach (var segment in dataSegments)
273+
{
274+
segments ??= dataSegments is IReadOnlyCollection<Data> readOnlyList
275+
? new List<ReadOnlyMemory<byte>>(readOnlyList.Count)
276+
: new List<ReadOnlyMemory<byte>>();
277+
ReadOnlyMemory<byte> dataToAppend = segment.Value switch
278+
{
279+
byte[] byteArray => byteArray,
280+
ArraySegment<byte> arraySegment => arraySegment,
281+
_ => ReadOnlyMemory<byte>.Empty
282+
};
283+
length += dataToAppend.Length;
284+
numberOfSegments++;
285+
segments.Add(dataToAppend);
286+
}
257287

258-
ReadOnlyMemory<byte> dataToAppend = segment.Value switch
288+
if (segments == null)
259289
{
260-
byte[] byteArray => byteArray,
261-
ArraySegment<byte> arraySegment => arraySegment,
262-
_ => ReadOnlyMemory<byte>.Empty
263-
};
290+
return;
291+
}
264292

265-
var memory = _writer.GetMemory(dataToAppend.Length);
266-
dataToAppend.CopyTo(memory);
293+
// fields are lazy initialized to not occupy unnecessary memory when there are no data segments
294+
_writer = length > 0 ? new ArrayBufferWriter<byte>(length) : new ArrayBufferWriter<byte>();
295+
_segments = segments;
267296

268-
_writer.Advance(dataToAppend.Length);
269-
_segments.Add(memory.Slice(0, dataToAppend.Length));
297+
for (var segmentIndex = 0; segmentIndex < numberOfSegments; segmentIndex++)
298+
{
299+
var dataToAppend = segments[segmentIndex];
300+
var memory = _writer.GetMemory(dataToAppend.Length);
301+
dataToAppend.CopyTo(memory);
302+
_writer.Advance(dataToAppend.Length);
303+
segments[segmentIndex] = memory.Slice(0, dataToAppend.Length);
304+
}
270305
}
271306
}
272307
}

0 commit comments

Comments
 (0)