Skip to content

Commit 0320f84

Browse files
authored
light up the protobuf-net capability API, for array-segment, buffer writer, and ROS (#98)
* reapply #53 (easier than fixing all the merge conflicts) * fix test expectations - need custom marshalled for both enabled+disabled at client, to prevent seeing the server work * isn't worth keeping the ArrayBufferWriter code; in reality all implementations would work * lib updates; add SetMarshaller API (overrides)
1 parent 619a5c8 commit 0320f84

File tree

10 files changed

+257
-66
lines changed

10 files changed

+257
-66
lines changed

Directory.Build.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<ExampleRefs>local</ExampleRefs> <!-- local or nuget-->
2929
<PBGRPCLibVersion>0.0.3-alpha.54</PBGRPCLibVersion>
3030
<GrpcDotNetVersion>2.29.0</GrpcDotNetVersion>
31-
<GoogleProtobufVersion>3.12.2</GoogleProtobufVersion>
31+
<GoogleProtobufVersion>3.12.3</GoogleProtobufVersion>
3232
<GrpcVersion>2.29.0</GrpcVersion>
3333
</PropertyGroup>
3434
<PropertyGroup Condition="'$(Configuration)'=='Release' or '$(Configuration)'=='VS'">

protobuf-net.Grpc.sln

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "protobuf-net.Grpc.Test.Inte
9797
EndProject
9898
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "toys", "toys", "{0DD003CC-90E1-4938-98F0-8181949CD727}"
9999
EndProject
100-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "protobuf-net.Grpc.ClientFactory", "src\protobuf-net.Grpc.ClientFactory\protobuf-net.Grpc.ClientFactory.csproj", "{4A7D8244-D6B2-4DD3-A0F3-1BF716FB1A0B}"
100+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "protobuf-net.Grpc.ClientFactory", "src\protobuf-net.Grpc.ClientFactory\protobuf-net.Grpc.ClientFactory.csproj", "{4A7D8244-D6B2-4DD3-A0F3-1BF716FB1A0B}"
101+
EndProject
102+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "protobuf-net.Grpc.Test.IntegrationUpLevel", "tests\protobuf-net.Grpc.Test.IntegrationUpLevel\protobuf-net.Grpc.Test.IntegrationUpLevel.csproj", "{B694ED60-93A4-4362-BBCF-4EA04B6F4660}"
101103
EndProject
102104
Global
103105
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -256,6 +258,12 @@ Global
256258
{4A7D8244-D6B2-4DD3-A0F3-1BF716FB1A0B}.Release|Any CPU.Build.0 = Release|Any CPU
257259
{4A7D8244-D6B2-4DD3-A0F3-1BF716FB1A0B}.VS|Any CPU.ActiveCfg = Debug|Any CPU
258260
{4A7D8244-D6B2-4DD3-A0F3-1BF716FB1A0B}.VS|Any CPU.Build.0 = Debug|Any CPU
261+
{B694ED60-93A4-4362-BBCF-4EA04B6F4660}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
262+
{B694ED60-93A4-4362-BBCF-4EA04B6F4660}.Debug|Any CPU.Build.0 = Debug|Any CPU
263+
{B694ED60-93A4-4362-BBCF-4EA04B6F4660}.Release|Any CPU.ActiveCfg = Release|Any CPU
264+
{B694ED60-93A4-4362-BBCF-4EA04B6F4660}.Release|Any CPU.Build.0 = Release|Any CPU
265+
{B694ED60-93A4-4362-BBCF-4EA04B6F4660}.VS|Any CPU.ActiveCfg = Debug|Any CPU
266+
{B694ED60-93A4-4362-BBCF-4EA04B6F4660}.VS|Any CPU.Build.0 = Debug|Any CPU
259267
EndGlobalSection
260268
GlobalSection(SolutionProperties) = preSolution
261269
HideSolutionNode = FALSE
@@ -296,6 +304,7 @@ Global
296304
{BCB98A08-8A05-41CA-B42F-9DBF3DB8D546} = {6A3AC822-52E8-4AE2-8F71-385DF0D69891}
297305
{7AF5B934-AEE9-4FD1-928D-AAE0F7098A32} = {0A84599D-2CE9-416E-888F-24652EEAB0B3}
298306
{4A7D8244-D6B2-4DD3-A0F3-1BF716FB1A0B} = {39491A90-84A2-4E13-B867-CFC3D4592084}
307+
{B694ED60-93A4-4362-BBCF-4EA04B6F4660} = {0A84599D-2CE9-416E-888F-24652EEAB0B3}
299308
EndGlobalSection
300309
GlobalSection(ExtensibilityGlobals) = postSolution
301310
SolutionGuid = {BA14B07C-CA29-430D-A600-F37A050636D3}

src/protobuf-net.Grpc/Configuration/BinderConfiguration.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using ProtoBuf.Grpc.Internal;
1+
using Grpc.Core;
2+
using ProtoBuf.Grpc.Internal;
23
using System;
34
using System.Collections.Generic;
45
using System.Linq;
@@ -40,5 +41,17 @@ public static BinderConfiguration Create(IList<MarshallerFactory>? marshallerFac
4041
// note we create a copy of the factories at this point, to prevent further mutation by the caller
4142
return new BinderConfiguration(marshallerFactories.ToArray(), binder);
4243
}
44+
45+
/// <summary>
46+
/// Gets a typed marshaller associated with this configuration
47+
/// </summary>
48+
public Marshaller<T> GetMarshaller<T>() => MarshallerCache.GetMarshaller<T>();
49+
50+
51+
/// <summary>
52+
/// Sets (or resets) a typed marshalled against this configuration
53+
/// </summary>
54+
/// <param name="marshaller">The marshaller to use - if null, the cache is reset for this type</param>
55+
public void SetMarshaller<T>(Marshaller<T>? marshaller) => MarshallerCache.SetMarshaller<T>(marshaller);
4356
}
4457
}

src/protobuf-net.Grpc/Configuration/ProtoBufMarshallerFactory.cs

Lines changed: 123 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
using System.Buffers;
44
using System.IO;
55
using System.Runtime.CompilerServices;
6+
using System.Runtime.InteropServices;
7+
using System.Threading;
68

79
namespace ProtoBuf.Grpc.Configuration
810
{
911
/// <summary>
1012
/// Provides protobuf-net implementation of a per-type marshaller
1113
/// </summary>
12-
public class ProtoBufMarshallerFactory : MarshallerFactory
14+
public partial class ProtoBufMarshallerFactory : MarshallerFactory
1315
{
1416
/// <summary>
1517
/// Options that control protobuf-net marshalling
@@ -24,69 +26,153 @@ public enum Options
2426
/// <summary>
2527
/// Enforce that only contract-types should be allowed
2628
/// </summary>
27-
ContractTypesOnly = 1,
29+
ContractTypesOnly = 1 << 0,
30+
/// <summary>
31+
/// Disable 'contextual' serializer usage (this means serializers that use <see cref="ReadOnlySequence{byte}"/> or <see cref="IBufferWriter{byte}"/>),
32+
/// using the legacy <see cref="byte[]"/> APIs.
33+
/// </summary>
34+
DisableContextualSerializer = 1 << 1,
2835
}
2936

3037
/// <summary>
3138
/// Uses the default protobuf-net serializer
3239
/// </summary>
33-
public static MarshallerFactory Default { get; } = new ProtoBufMarshallerFactory(RuntimeTypeModel.Default, Options.None);
40+
public static MarshallerFactory Default { get; } = new ProtoBufMarshallerFactory(RuntimeTypeModel.Default, Options.None, default);
3441

35-
private readonly RuntimeTypeModel _model;
42+
private readonly TypeModel _model;
43+
private readonly SerializationContext? _userState;
3644
private readonly Options _options;
45+
// note: these are the same *object*, but pre-checked for optional API support, for efficiency
46+
// (the minimum .NET object size means that the extra fields don't cost anything)
47+
private readonly IMeasuredProtoOutput<IBufferWriter<byte>>? _measuredWriterModel;
48+
private readonly IProtoInput<ReadOnlySequence<byte>>? _squenceReaderModel;
49+
3750
/// <summary>
3851
/// Create a new factory using a specific protobuf-net model
3952
/// </summary>
40-
public static MarshallerFactory Create(RuntimeTypeModel? model = null, Options options = Options.None)
53+
public static MarshallerFactory Create(TypeModel? model = null, Options options = Options.None, object? userState = null)
4154
{
42-
if (model == null) model = RuntimeTypeModel.Default;
43-
if (options == Options.None && model == RuntimeTypeModel.Default) return Default;
44-
return new ProtoBufMarshallerFactory(model, options);
55+
model ??= RuntimeTypeModel.Default;
56+
if (options == Options.None && model == RuntimeTypeModel.Default && userState is null) return Default;
57+
return new ProtoBufMarshallerFactory(model, options, userState);
4558
}
4659

47-
internal ProtoBufMarshallerFactory(RuntimeTypeModel model, Options options)
60+
/// <summary>
61+
/// Create a new factory using a specific protobuf-net model
62+
/// </summary>
63+
public static MarshallerFactory Create(RuntimeTypeModel? model, Options options)
64+
=> Create((TypeModel?)model, options, null);
65+
66+
internal ProtoBufMarshallerFactory(TypeModel model, Options options, object? userState = null)
4867
{
4968
_model = model;
5069
_options = options;
70+
_userState = userState switch
71+
{
72+
null => null,
73+
SerializationContext ctx => ctx,
74+
_ => new SerializationContext { Context = userState }
75+
};
76+
77+
if (UseContextualSerializer)
78+
{
79+
// test these once rather than every time
80+
_measuredWriterModel = model as IMeasuredProtoOutput<IBufferWriter<byte>>;
81+
_squenceReaderModel = model as IProtoInput<ReadOnlySequence<byte>>;
82+
}
5183
}
5284

85+
private bool UseContextualSerializer => (_options & Options.DisableContextualSerializer) == 0;
86+
5387
[MethodImpl(MethodImplOptions.AggressiveInlining)]
5488
private bool Has(Options option) => (_options & option) == option;
5589

56-
/* see: https://github.com/grpc/grpc/pull/19471 / https://github.com/grpc/grpc/issues/19470
5790
/// <summary>
5891
/// Deserializes an object from a payload
5992
/// </summary>
6093
protected internal override global::Grpc.Core.Marshaller<T> CreateMarshaller<T>()
61-
=> new global::Grpc.Core.Marshaller<T>(ContextualSerialize<T>, ContextualDeserialize<T>);
62-
*/
94+
=> UseContextualSerializer
95+
? new global::Grpc.Core.Marshaller<T>(ContextualSerialize<T>, ContextualDeserialize<T>)
96+
: base.CreateMarshaller<T>();
97+
98+
#if DEBUG
99+
private int _uplevelBufferReadCount, _uplevelBufferWriteCount;
100+
public int UplevelBufferReadCount => Volatile.Read(ref _uplevelBufferReadCount);
101+
public int UplevelBufferWriteCount => Volatile.Read(ref _uplevelBufferWriteCount);
102+
103+
partial void RecordUplevelBufferRead() => Interlocked.Increment(ref _uplevelBufferReadCount);
104+
partial void RecordUplevelBufferWrite() => Interlocked.Increment(ref _uplevelBufferWriteCount);
105+
#endif
106+
partial void RecordUplevelBufferRead();
107+
partial void RecordUplevelBufferWrite();
108+
109+
private bool TryGetBufferWriter(global::Grpc.Core.SerializationContext context, out IBufferWriter<byte>? writer)
110+
{
111+
// the managed implementation does not yet implement this API
112+
try { writer = context.GetBufferWriter(); }
113+
catch (NotSupportedException) { writer = default; }
114+
catch (NotImplementedException) { writer = default; }
115+
return writer is object;
116+
}
63117

64118
private void ContextualSerialize<T>(T value, global::Grpc.Core.SerializationContext context)
65-
=> context.Complete(Serialize(value));
119+
{
120+
if (_measuredWriterModel is object)
121+
{ // forget what we think we know about TypeModel; if we have protobuf-net 3.*, we can do this
122+
123+
RecordUplevelBufferWrite();
124+
125+
using var measured = _measuredWriterModel.Measure(value, userState: _userState);
126+
int len = checked((int)measured.Length);
127+
context.SetPayloadLength(len);
128+
129+
if (TryGetBufferWriter(context, out var writer))
130+
{ // write to the buffer-writer API
131+
_measuredWriterModel.Serialize(measured, writer!);
132+
context.Complete();
133+
}
134+
else
135+
{
136+
// the buffer-writer API wasn't supported, but we can still optimize by right-sizing
137+
// a MemoryStream to write to, to avoid a resize etc
138+
context.Complete(Serialize<T>(value, len));
139+
}
140+
}
141+
else
142+
{
143+
context.Complete(Serialize<T>(value));
144+
}
145+
}
66146

67147
private T ContextualDeserialize<T>(global::Grpc.Core.DeserializationContext context)
68148
{
69149
var ros = context.PayloadAsReadOnlySequence();
70-
#if PLAT_PBN_NOSPAN
71-
// copy the data out of the ROS into a rented buffer, and deserialize
72-
// from that
150+
if (_squenceReaderModel is object)
151+
{ // forget what we think we know about TypeModel; if we have protobuf-net 3.*, we can do this
152+
RecordUplevelBufferRead();
153+
return _squenceReaderModel.Deserialize<T>(ros, userState: _userState);
154+
}
155+
156+
// 2.4.2+ can use array-segments
157+
IProtoInput<ArraySegment<byte>> segmentReader = _model;
158+
159+
// can we go direct to a single segment?
160+
if (ros.IsSingleSegment && MemoryMarshal.TryGetArray(ros.First, out var segment))
161+
{
162+
return segmentReader.Deserialize<T>(segment, userState: _userState);
163+
}
164+
165+
// otherwise; linearize the data
73166
var oversized = ArrayPool<byte>.Shared.Rent(context.PayloadLength);
74167
try
75168
{
76169
ros.CopyTo(oversized);
77-
return Deserialize<T>(oversized, 0, context.PayloadLength);
170+
return segmentReader.Deserialize<T>(new ArraySegment<byte>(oversized, 0, context.PayloadLength), userState: _userState);
78171
}
79172
finally
80173
{
81174
ArrayPool<byte>.Shared.Return(oversized);
82175
}
83-
#else
84-
// create a reader directly on the ROS
85-
using (var reader = ProtoReader.Create(out var state, ros, _model))
86-
{
87-
return (T)_model.Deserialize(reader, ref state, null, typeof(T));
88-
}
89-
#endif
90176
}
91177

92178
/// <summary>
@@ -101,21 +187,9 @@ protected internal override bool CanSerialize(Type type)
101187
/// Deserializes an object from a payload
102188
/// </summary>
103189
protected override T Deserialize<T>(byte[] payload)
104-
=> Deserialize<T>(payload, 0, payload.Length);
105-
106-
private T Deserialize<T>(byte[] payload, int offset, int count)
107190
{
108-
#if PLAT_PBN_NOSPAN
109-
using var ms = new MemoryStream(payload, offset, count);
110-
using var reader = ProtoReader.Create(ms, _model);
111-
return (T)_model.Deserialize(reader, null, typeof(T));
112-
#else
113-
var range = new ReadOnlyMemory<byte>(payload, offset, count);
114-
using (var reader = ProtoReader.Create(out var state, range, _model))
115-
{
116-
return (T)_model.Deserialize(reader, ref state, null, typeof(T));
117-
}
118-
#endif
191+
IProtoInput<byte[]> segmentReader = _model;
192+
return segmentReader.Deserialize<T>(payload, userState: _userState);
119193
}
120194

121195
/// <summary>
@@ -127,5 +201,16 @@ protected override byte[] Serialize<T>(T value)
127201
_model.Serialize(ms, value, context: null);
128202
return ms.ToArray();
129203
}
204+
205+
private byte[] Serialize<T>(T value, int length)
206+
{
207+
if (length == 0) return Array.Empty<byte>();
208+
var arr = new byte[length];
209+
using var ms = new MemoryStream(arr);
210+
_model.Serialize(ms, value, context: null);
211+
if (length != ms.Length) throw new InvalidOperationException(
212+
$"Length miscalculated; expected {length}, got {ms.Length}");
213+
return arr;
214+
}
130215
}
131216
}

src/protobuf-net.Grpc/Internal/MarshallerCache.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
using Grpc.Core;
22
using ProtoBuf.Grpc.Configuration;
33
using System;
4-
using System.Collections.Generic;
5-
using System.Linq;
6-
using System.Runtime.CompilerServices;
74
using System.Collections.Concurrent;
85
using System.Reflection;
9-
using System.Diagnostics;
6+
using System.Runtime.CompilerServices;
107

118
namespace ProtoBuf.Grpc.Internal
129
{
@@ -42,6 +39,18 @@ internal Marshaller<T> GetMarshaller<T>()
4239
static Marshaller<T> Throw() => throw new InvalidOperationException("No marshaller available for " + typeof(T).FullName);
4340
}
4441

42+
internal void SetMarshaller<T>(Marshaller<T>? marshaller)
43+
{
44+
if (marshaller is null)
45+
{
46+
_marshallers.TryRemove(typeof(T), out _);
47+
}
48+
else
49+
{
50+
_marshallers[typeof(T)] = marshaller;
51+
}
52+
}
53+
4554
[MethodImpl(MethodImplOptions.NoInlining)]
4655
private Marshaller<T>? CreateAndAdd<T>()
4756
{
Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

3-
<PropertyGroup>
4-
<TargetFrameworks>net461;netstandard2.1;netstandard2.0</TargetFrameworks>
5-
<RootNamespace>ProtoBuf.Grpc</RootNamespace>
6-
<LangVersion>preview</LangVersion>
3+
<PropertyGroup>
4+
<TargetFrameworks>net461;netstandard2.1;netstandard2.0</TargetFrameworks>
5+
<RootNamespace>ProtoBuf.Grpc</RootNamespace>
6+
<LangVersion>preview</LangVersion>
77

8-
<!-- note: stick with RTM versions of protobuf-net for now; update all later-->
9-
<DefineConstants>$(DefineConstants);PLAT_PBN_NOSPAN;PLAT_NO_CHANNEL_READALLASYNC</DefineConstants>
10-
</PropertyGroup>
11-
<ItemGroup>
12-
<PackageReference Include="Grpc.Core.Api" Version="$(GrpcVersion)" />
13-
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
14-
<PackageReference Include="protobuf-net" Version="2.4.6" />
15-
</ItemGroup>
16-
<ItemGroup Condition="'$(TargetFramework)' != 'netstandard2.1'">
17-
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
18-
</ItemGroup>
8+
<DefineConstants>$(DefineConstants);PLAT_NO_CHANNEL_READALLASYNC</DefineConstants>
9+
</PropertyGroup>
10+
<ItemGroup>
11+
<PackageReference Include="Grpc.Core.Api" Version="$(GrpcVersion)" />
12+
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
13+
<PackageReference Include="protobuf-net" Version="2.4.6" />
14+
</ItemGroup>
15+
<ItemGroup Condition="'$(TargetFramework)' != 'netstandard2.1'">
16+
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
17+
</ItemGroup>
1918
</Project>

0 commit comments

Comments
 (0)