Skip to content

Commit 113a771

Browse files
Havretrayokota
andauthored
Protobuf serialization perf improvements (#2459)
* Refactor ProtobufSerializer for improved performance Replace MemoryStream with pre-allocated byte buffers to reduce allocations and improve performance. * Regenerate protos * Regenerate protos --------- Co-authored-by: Robert Yokota <[email protected]>
1 parent a8296c3 commit 113a771

File tree

14 files changed

+15467
-575
lines changed

14 files changed

+15467
-575
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2024 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Buffers.Binary;
19+
using System.Runtime.CompilerServices;
20+
using System.Runtime.InteropServices;
21+
22+
namespace Confluent.SchemaRegistry.Serdes.Protobuf;
23+
24+
internal static class BinaryConverter
25+
{
26+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
27+
public static int WriteInt32(Span<byte> destination, int value)
28+
{
29+
Unsafe.WriteUnaligned(ref MemoryMarshal.GetReference(destination), BitConverter.IsLittleEndian ? BinaryPrimitives.ReverseEndianness(value) : value);
30+
return sizeof(int);
31+
}
32+
33+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
34+
public static int WriteByte(Span<byte> destination, byte value)
35+
{
36+
destination[0] = value;
37+
return sizeof(byte);
38+
}
39+
40+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
41+
public static int WriteBytes(Span<byte> destination, byte[] value)
42+
{
43+
value.CopyTo(destination);
44+
return value.Length;
45+
}
46+
}

src/Confluent.SchemaRegistry.Serdes.Protobuf/ProtobufSerializer.cs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
using System.Collections.Generic;
2323
using System.IO;
2424
using System.Linq;
25-
using System.Net;
2625
using System.Threading.Tasks;
2726
using Confluent.Kafka;
27+
using Confluent.SchemaRegistry.Serdes.Protobuf;
2828
using Google.Protobuf;
2929
using ProtobufNet::Google.Protobuf.Reflection;
3030
using FileDescriptor = Google.Protobuf.Reflection.FileDescriptor;
@@ -307,15 +307,20 @@ await RegisterOrGetReferences(value.Descriptor.File, context, autoRegisterSchema
307307
.ConfigureAwait(continueOnCapturedContext: false);
308308
}
309309

310-
using (var stream = new MemoryStream(initialBufferSize))
311-
using (var writer = new BinaryWriter(stream))
312-
{
313-
stream.WriteByte(Constants.MagicByte);
314-
writer.Write(IPAddress.HostToNetworkOrder(schemaId.Value));
315-
writer.Write(this.indexArray);
316-
value.WriteTo(stream);
317-
return stream.ToArray();
318-
}
310+
int bufferSize = sizeof(byte) // Magic byte
311+
+ sizeof(int) // Schema ID
312+
+ indexArray.Length // Index array size
313+
+ value.CalculateSize(); // Serialized message size
314+
315+
var buffer = new byte[bufferSize];
316+
317+
var offset = 0;
318+
offset += BinaryConverter.WriteByte(buffer, Constants.MagicByte);
319+
offset += BinaryConverter.WriteInt32(buffer.AsSpan(offset), schemaId.Value!);
320+
offset += BinaryConverter.WriteBytes(buffer.AsSpan(offset), this.indexArray);
321+
value.WriteTo(buffer.AsSpan(offset));
322+
323+
return buffer;
319324
}
320325
catch (AggregateException e)
321326
{

test/Confluent.SchemaRegistry.IntegrationTests/Tests/Person.cs

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// Generated by the protocol buffer compiler. DO NOT EDIT!
33
// source: person.proto
44
// </auto-generated>
5-
#pragma warning disable 1591, 0612, 3021
5+
#pragma warning disable 1591, 0612, 3021, 8981
66
#region Designer generated code
77

88
using pb = global::Google.Protobuf;
@@ -30,45 +30,55 @@ static PersonReflection() {
3030
"TmFtZRILCgNBZ2UYAiABKAViBnByb3RvMw=="));
3131
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
3232
new pbr::FileDescriptor[] { global::Confluent.Kafka.Examples.Protobuf.PersonNameReflection.Descriptor, },
33-
new pbr::GeneratedClrTypeInfo(null, new pbr::GeneratedClrTypeInfo[] {
34-
new pbr::GeneratedClrTypeInfo(typeof(global::Confluent.Kafka.Examples.Protobuf.Person), global::Confluent.Kafka.Examples.Protobuf.Person.Parser, new[]{ "Name", "Age" }, null, null, null)
33+
new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] {
34+
new pbr::GeneratedClrTypeInfo(typeof(global::Confluent.Kafka.Examples.Protobuf.Person), global::Confluent.Kafka.Examples.Protobuf.Person.Parser, new[]{ "Name", "Age" }, null, null, null, null)
3535
}));
3636
}
3737
#endregion
3838

3939
}
4040
#region Messages
41-
public sealed partial class Person : pb::IMessage<Person> {
41+
public sealed partial class Person : pb::IMessage<Person>
42+
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
43+
, pb::IBufferMessage
44+
#endif
45+
{
4246
private static readonly pb::MessageParser<Person> _parser = new pb::MessageParser<Person>(() => new Person());
4347
private pb::UnknownFieldSet _unknownFields;
4448
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
49+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
4550
public static pb::MessageParser<Person> Parser { get { return _parser; } }
4651

4752
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
53+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
4854
public static pbr::MessageDescriptor Descriptor {
4955
get { return global::Confluent.Kafka.Examples.Protobuf.PersonReflection.Descriptor.MessageTypes[0]; }
5056
}
5157

5258
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
59+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
5360
pbr::MessageDescriptor pb::IMessage.Descriptor {
5461
get { return Descriptor; }
5562
}
5663

5764
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
65+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
5866
public Person() {
5967
OnConstruction();
6068
}
6169

6270
partial void OnConstruction();
6371

6472
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
73+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
6574
public Person(Person other) : this() {
6675
name_ = other.name_ != null ? other.name_.Clone() : null;
6776
age_ = other.age_;
6877
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
6978
}
7079

7180
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
81+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
7282
public Person Clone() {
7383
return new Person(this);
7484
}
@@ -77,6 +87,7 @@ public Person Clone() {
7787
public const int NameFieldNumber = 1;
7888
private global::Confluent.Kafka.Examples.Protobuf.PersonName name_;
7989
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
90+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
8091
public global::Confluent.Kafka.Examples.Protobuf.PersonName Name {
8192
get { return name_; }
8293
set {
@@ -88,6 +99,7 @@ public Person Clone() {
8899
public const int AgeFieldNumber = 2;
89100
private int age_;
90101
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
102+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
91103
public int Age {
92104
get { return age_; }
93105
set {
@@ -96,11 +108,13 @@ public int Age {
96108
}
97109

98110
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
111+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
99112
public override bool Equals(object other) {
100113
return Equals(other as Person);
101114
}
102115

103116
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
117+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
104118
public bool Equals(Person other) {
105119
if (ReferenceEquals(other, null)) {
106120
return false;
@@ -114,6 +128,7 @@ public bool Equals(Person other) {
114128
}
115129

116130
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
131+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
117132
public override int GetHashCode() {
118133
int hash = 1;
119134
if (name_ != null) hash ^= Name.GetHashCode();
@@ -125,12 +140,17 @@ public override int GetHashCode() {
125140
}
126141

127142
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
143+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
128144
public override string ToString() {
129145
return pb::JsonFormatter.ToDiagnosticString(this);
130146
}
131147

132148
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
149+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
133150
public void WriteTo(pb::CodedOutputStream output) {
151+
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
152+
output.WriteRawMessage(this);
153+
#else
134154
if (name_ != null) {
135155
output.WriteRawTag(10);
136156
output.WriteMessage(Name);
@@ -142,9 +162,29 @@ public void WriteTo(pb::CodedOutputStream output) {
142162
if (_unknownFields != null) {
143163
_unknownFields.WriteTo(output);
144164
}
165+
#endif
145166
}
146167

168+
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
147169
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
170+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
171+
void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) {
172+
if (name_ != null) {
173+
output.WriteRawTag(10);
174+
output.WriteMessage(Name);
175+
}
176+
if (Age != 0) {
177+
output.WriteRawTag(16);
178+
output.WriteInt32(Age);
179+
}
180+
if (_unknownFields != null) {
181+
_unknownFields.WriteTo(ref output);
182+
}
183+
}
184+
#endif
185+
186+
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
187+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
148188
public int CalculateSize() {
149189
int size = 0;
150190
if (name_ != null) {
@@ -160,13 +200,14 @@ public int CalculateSize() {
160200
}
161201

162202
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
203+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
163204
public void MergeFrom(Person other) {
164205
if (other == null) {
165206
return;
166207
}
167208
if (other.name_ != null) {
168209
if (name_ == null) {
169-
name_ = new global::Confluent.Kafka.Examples.Protobuf.PersonName();
210+
Name = new global::Confluent.Kafka.Examples.Protobuf.PersonName();
170211
}
171212
Name.MergeFrom(other.Name);
172213
}
@@ -177,7 +218,11 @@ public void MergeFrom(Person other) {
177218
}
178219

179220
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
221+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
180222
public void MergeFrom(pb::CodedInputStream input) {
223+
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
224+
input.ReadRawMessage(this);
225+
#else
181226
uint tag;
182227
while ((tag = input.ReadTag()) != 0) {
183228
switch(tag) {
@@ -186,9 +231,35 @@ public void MergeFrom(pb::CodedInputStream input) {
186231
break;
187232
case 10: {
188233
if (name_ == null) {
189-
name_ = new global::Confluent.Kafka.Examples.Protobuf.PersonName();
234+
Name = new global::Confluent.Kafka.Examples.Protobuf.PersonName();
235+
}
236+
input.ReadMessage(Name);
237+
break;
238+
}
239+
case 16: {
240+
Age = input.ReadInt32();
241+
break;
242+
}
243+
}
244+
}
245+
#endif
246+
}
247+
248+
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
249+
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
250+
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
251+
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
252+
uint tag;
253+
while ((tag = input.ReadTag()) != 0) {
254+
switch(tag) {
255+
default:
256+
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
257+
break;
258+
case 10: {
259+
if (name_ == null) {
260+
Name = new global::Confluent.Kafka.Examples.Protobuf.PersonName();
190261
}
191-
input.ReadMessage(name_);
262+
input.ReadMessage(Name);
192263
break;
193264
}
194265
case 16: {
@@ -198,6 +269,7 @@ public void MergeFrom(pb::CodedInputStream input) {
198269
}
199270
}
200271
}
272+
#endif
201273

202274
}
203275

0 commit comments

Comments
 (0)