Skip to content

Commit e20dcc3

Browse files
authored
Avoid array copy on Avro deserialization (#2483)
1 parent 922a41b commit e20dcc3

File tree

3 files changed

+102
-11
lines changed

3 files changed

+102
-11
lines changed

src/Confluent.SchemaRegistry.Serdes.Avro/GenericDeserializerImpl.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ public override async Task<GenericRecord> DeserializeAsync(ReadOnlyMemory<byte>
5858
: await Deserialize(context.Topic, context.Headers, data,
5959
context.Component == MessageComponentType.Key).ConfigureAwait(false);
6060
}
61-
62-
public async Task<GenericRecord> Deserialize(string topic, Headers headers, ReadOnlyMemory<byte> array, bool isKey)
61+
62+
private async Task<GenericRecord> Deserialize(string topic, Headers headers, ReadOnlyMemory<byte> array, bool isKey)
6363
{
6464
try
6565
{
@@ -105,7 +105,7 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, Read
105105
DatumReader<GenericRecord> datumReader;
106106
if (migrations.Count > 0)
107107
{
108-
using (var stream = new MemoryStream(payload.ToArray()))
108+
using (var stream = new ReadOnlyMemoryStream(payload))
109109
{
110110
data = new GenericReader<GenericRecord>(writerSchema, writerSchema)
111111
.Read(default(GenericRecord), new BinaryDecoder(stream));
@@ -149,7 +149,7 @@ public async Task<GenericRecord> Deserialize(string topic, Headers headers, Read
149149
}
150150
datumReader = await GetDatumReader(writerSchema, readerSchema).ConfigureAwait(false);
151151

152-
using (var stream = new MemoryStream(payload.ToArray()))
152+
using (var stream = new ReadOnlyMemoryStream(payload))
153153
{
154154
data = datumReader.Read(default(GenericRecord), new BinaryDecoder(stream));
155155
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright 2025 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.IO;
19+
20+
namespace Confluent.SchemaRegistry.Serdes;
21+
22+
/// <summary>
23+
/// A read-only <see cref="Stream"/> implementation over a <see cref="ReadOnlyMemory{Byte}"/>.
24+
/// This class is needed to avoid calling <c>ToArray()</c> on <see cref="ReadOnlyMemory{Byte}"/>
25+
/// when passing data to APIs (such as Avro's <c>BinaryDecoder</c>) that require a <see cref="Stream"/>.
26+
/// </summary>
27+
internal class ReadOnlyMemoryStream : Stream
28+
{
29+
private readonly ReadOnlyMemory<byte> data;
30+
31+
public ReadOnlyMemoryStream(ReadOnlyMemory<byte> data)
32+
{
33+
this.data = data;
34+
}
35+
36+
public override void Flush()
37+
{
38+
}
39+
40+
public override int Read(byte[] buffer, int offset, int count)
41+
{
42+
if (buffer == null)
43+
throw new ArgumentNullException(nameof(buffer));
44+
if (offset < 0 || count < 0 || offset + count > buffer.Length)
45+
throw new ArgumentOutOfRangeException();
46+
47+
var remaining = data.Length - (int) Position;
48+
if (remaining <= 0)
49+
return 0;
50+
51+
var toRead = Math.Min(count, remaining);
52+
data.Slice((int) Position, toRead).Span.CopyTo(buffer.AsSpan(offset, toRead));
53+
Position += toRead;
54+
return toRead;
55+
}
56+
57+
public override long Seek(long offset, SeekOrigin origin)
58+
{
59+
switch (origin)
60+
{
61+
case SeekOrigin.Begin:
62+
Position = offset;
63+
break;
64+
case SeekOrigin.Current:
65+
Position += offset;
66+
break;
67+
case SeekOrigin.End:
68+
Position = Length + offset;
69+
break;
70+
default:
71+
throw new ArgumentOutOfRangeException(nameof(origin), origin, null);
72+
}
73+
74+
if (Position < 0 || Position > Length)
75+
throw new IOException("Seek operation resulted in an invalid position.");
76+
77+
return Position;
78+
}
79+
80+
public override void SetLength(long value)
81+
{
82+
throw new NotSupportedException();
83+
}
84+
85+
public override void Write(byte[] buffer, int offset, int count)
86+
{
87+
throw new NotSupportedException();
88+
}
89+
90+
public override bool CanRead => true;
91+
public override bool CanSeek => true;
92+
public override bool CanWrite => false;
93+
public override long Length => data.Length;
94+
public override long Position { get; set; }
95+
}

src/Confluent.SchemaRegistry.Serdes.Avro/SpecificDeserializerImpl.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,7 @@ public async Task<T> Deserialize(string topic, Headers headers, ReadOnlyMemory<b
154154
DatumReader<T> datumReader = null;
155155
if (migrations.Count > 0)
156156
{
157-
// TODO: We should be able to write a wrapper around ReadOnlyMemory<byte> that allows us to
158-
// pass it to the BinaryDecoder without copying the data to a MemoryStream.
159-
using (var memoryStream = new MemoryStream(payload.ToArray()))
157+
using (var memoryStream = new ReadOnlyMemoryStream(payload))
160158
{
161159
data = new GenericReader<GenericRecord>(writerSchema, writerSchema)
162160
.Read(default(GenericRecord), new BinaryDecoder(memoryStream));
@@ -187,10 +185,8 @@ public async Task<T> Deserialize(string topic, Headers headers, ReadOnlyMemory<b
187185
else
188186
{
189187
datumReader = await GetDatumReader(writerSchema, ReaderSchema).ConfigureAwait(false);
190-
191-
// TODO: We should be able to write a wrapper around ReadOnlyMemory<byte> that allows us to
192-
// pass it to the BinaryDecoder without copying the data to a MemoryStream.
193-
using var stream = new MemoryStream(payload.ToArray());
188+
189+
using var stream = new ReadOnlyMemoryStream(payload);
194190
data = Read(datumReader, new BinaryDecoder(stream));
195191
}
196192

0 commit comments

Comments
 (0)