Skip to content

Commit fd7168f

Browse files
authored
Proof-of-concept serialization of advanced JSON types, records (#1073)
* Add system text json support for actor serialization Signed-off-by: Erik O'Leary <[email protected]> * Remove unnecessary stream; directly use serializetobytes Signed-off-by: Erik O'Leary <[email protected]> * Disable parallel test execution to make test results more repeatable/predictable Signed-off-by: Erik O'Leary <[email protected]> --------- Signed-off-by: Erik O'Leary <[email protected]>
1 parent 17f849b commit fd7168f

20 files changed

+494
-112
lines changed

src/Dapr.Actors/Client/ActorProxyFactory.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ namespace Dapr.Actors.Client
1616
using System;
1717
using System.Net.Http;
1818
using Dapr.Actors.Builder;
19+
using Dapr.Actors.Communication;
1920
using Dapr.Actors.Communication.Client;
2021

2122
/// <summary>
@@ -79,7 +80,15 @@ public object CreateActorProxy(ActorId actorId, Type actorInterfaceType, string
7980
options ??= this.DefaultOptions;
8081

8182
var daprInteractor = new DaprHttpInteractor(this.handler, options.HttpEndpoint, options.DaprApiToken, options.RequestTimeout);
82-
var remotingClient = new ActorRemotingClient(daprInteractor);
83+
84+
// provide a serializer if 'useJsonSerialization' is true and no serialization provider is provided.
85+
IActorMessageBodySerializationProvider serializationProvider = null;
86+
if (options.UseJsonSerialization)
87+
{
88+
serializationProvider = new ActorMessageBodyJsonSerializationProvider(options.JsonSerializerOptions);
89+
}
90+
91+
var remotingClient = new ActorRemotingClient(daprInteractor, serializationProvider);
8392
var proxyGenerator = ActorCodeBuilder.GetOrCreateProxyGenerator(actorInterfaceType);
8493
var actorProxy = proxyGenerator.CreateActorProxy();
8594
actorProxy.Initialize(remotingClient, actorId, actorType, options);

src/Dapr.Actors/Client/ActorProxyOptions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,10 @@ public JsonSerializerOptions JsonSerializerOptions
6262
/// The timeout allowed for an actor request. Can be set to System.Threading.Timeout.InfiniteTimeSpan to disable any timeouts.
6363
/// </summary>
6464
public TimeSpan? RequestTimeout { get; set; } = null;
65+
66+
/// <summary>
67+
/// Enable JSON serialization for actor proxy message serialization in both remoting and non-remoting invocations.
68+
/// </summary>
69+
public bool UseJsonSerialization { get; set; }
6570
}
6671
}

src/Dapr.Actors/Communication/ActorMessageBodyDataContractSerializationProvider.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ namespace Dapr.Actors.Communication
1717
using System.Collections.Generic;
1818
using System.IO;
1919
using System.Runtime.Serialization;
20+
using System.Threading.Tasks;
2021
using System.Xml;
2122

2223
/// <summary>
@@ -185,21 +186,21 @@ byte[] IActorRequestMessageBodySerializer.Serialize(IActorRequestMessageBody act
185186
return stream.ToArray();
186187
}
187188

188-
IActorRequestMessageBody IActorRequestMessageBodySerializer.Deserialize(Stream stream)
189+
ValueTask<IActorRequestMessageBody> IActorRequestMessageBodySerializer.DeserializeAsync(Stream stream)
189190
{
190191
if (stream == null)
191192
{
192-
return null;
193+
return default;
193194
}
194195

195196
if (stream.Length == 0)
196197
{
197-
return null;
198+
return default;
198199
}
199200

200201
stream.Position = 0;
201202
using var reader = this.CreateXmlDictionaryReader(stream);
202-
return (TRequest)this.serializer.ReadObject(reader);
203+
return new ValueTask<IActorRequestMessageBody>((TRequest)this.serializer.ReadObject(reader));
203204
}
204205

205206
byte[] IActorResponseMessageBodySerializer.Serialize(IActorResponseMessageBody actorResponseMessageBody)
@@ -217,11 +218,11 @@ byte[] IActorResponseMessageBodySerializer.Serialize(IActorResponseMessageBody a
217218
return stream.ToArray();
218219
}
219220

220-
IActorResponseMessageBody IActorResponseMessageBodySerializer.Deserialize(Stream messageBody)
221+
ValueTask<IActorResponseMessageBody> IActorResponseMessageBodySerializer.DeserializeAsync(Stream messageBody)
221222
{
222223
if (messageBody == null)
223224
{
224-
return null;
225+
return default;
225226
}
226227

227228
// TODO check performance
@@ -231,11 +232,11 @@ IActorResponseMessageBody IActorResponseMessageBodySerializer.Deserialize(Stream
231232

232233
if (stream.Capacity == 0)
233234
{
234-
return null;
235+
return default;
235236
}
236237

237238
using var reader = this.CreateXmlDictionaryReader(stream);
238-
return (TResponse)this.serializer.ReadObject(reader);
239+
return new ValueTask<IActorResponseMessageBody>((TResponse)this.serializer.ReadObject(reader));
239240
}
240241

241242
/// <summary>
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2021 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using System;
15+
using System.Collections.Generic;
16+
using System.Text.Json;
17+
using System.Text.Json.Serialization;
18+
19+
namespace Dapr.Actors.Communication
20+
{
21+
internal class ActorMessageBodyJsonConverter<T> : JsonConverter<T>
22+
{
23+
private readonly List<Type> methodRequestParameterTypes;
24+
private readonly List<Type> wrappedRequestMessageTypes;
25+
private readonly Type wrapperMessageType;
26+
27+
public ActorMessageBodyJsonConverter(
28+
List<Type> methodRequestParameterTypes,
29+
List<Type> wrappedRequestMessageTypes = null
30+
)
31+
{
32+
this.methodRequestParameterTypes = methodRequestParameterTypes;
33+
this.wrappedRequestMessageTypes = wrappedRequestMessageTypes;
34+
35+
if (this.wrappedRequestMessageTypes != null && this.wrappedRequestMessageTypes.Count == 1)
36+
{
37+
this.wrapperMessageType = this.wrappedRequestMessageTypes[0];
38+
}
39+
}
40+
41+
public override T Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
42+
{
43+
// Ensure start-of-object, then advance
44+
if (reader.TokenType != JsonTokenType.StartObject) throw new JsonException();
45+
reader.Read();
46+
47+
// Ensure property name, then advance
48+
if (reader.TokenType != JsonTokenType.PropertyName || reader.GetString() != "value") throw new JsonException();
49+
reader.Read();
50+
51+
// If the value is null, return null.
52+
if (reader.TokenType == JsonTokenType.Null)
53+
{
54+
// Read the end object token.
55+
reader.Read();
56+
return default;
57+
}
58+
59+
// If the value is an object, deserialize it to wrapper message type
60+
if (this.wrapperMessageType != null)
61+
{
62+
var value = JsonSerializer.Deserialize(ref reader, this.wrapperMessageType, options);
63+
64+
// Construct a new WrappedMessageBody with the deserialized value.
65+
var wrapper = new WrappedMessageBody()
66+
{
67+
Value = value,
68+
};
69+
70+
// Read the end object token.
71+
reader.Read();
72+
73+
// Coerce the type to T; required because WrappedMessageBody inherits from two separate interfaces, which
74+
// cannot both be used as generic constraints
75+
return (T)(object)wrapper;
76+
}
77+
78+
return JsonSerializer.Deserialize<T>(ref reader, options);
79+
}
80+
81+
public override void Write(Utf8JsonWriter writer, T value, JsonSerializerOptions options)
82+
{
83+
writer.WriteStartObject();
84+
writer.WritePropertyName("value");
85+
86+
if (value is WrappedMessageBody body)
87+
{
88+
JsonSerializer.Serialize(writer, body.Value, body.Value.GetType(), options);
89+
}
90+
else
91+
writer.WriteNullValue();
92+
writer.WriteEndObject();
93+
}
94+
}
95+
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2021 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
namespace Dapr.Actors.Communication
15+
{
16+
using System;
17+
using System.Collections.Generic;
18+
using System.IO;
19+
using System.Text.Json;
20+
using System.Threading.Tasks;
21+
using System.Xml;
22+
23+
/// <summary>
24+
/// This is the implmentation for <see cref="IActorMessageBodySerializationProvider"/>used by remoting service and client during
25+
/// request/response serialization . It uses request Wrapping and data contract for serialization.
26+
/// </summary>
27+
internal class ActorMessageBodyJsonSerializationProvider : IActorMessageBodySerializationProvider
28+
{
29+
public JsonSerializerOptions Options { get; }
30+
31+
/// <summary>
32+
/// Initializes a new instance of the <see cref="ActorMessageBodyJsonSerializationProvider"/> class.
33+
/// </summary>
34+
public ActorMessageBodyJsonSerializationProvider(JsonSerializerOptions options)
35+
{
36+
Options = options;
37+
}
38+
39+
/// <summary>
40+
/// Creates a MessageFactory for Wrapped Message Json Remoting Types. This is used to create Remoting Request/Response objects.
41+
/// </summary>
42+
/// <returns>
43+
/// <see cref="IActorMessageBodyFactory" /> that provides an instance of the factory for creating
44+
/// remoting request and response message bodies.
45+
/// </returns>
46+
public IActorMessageBodyFactory CreateMessageBodyFactory()
47+
{
48+
return new WrappedRequestMessageFactory();
49+
}
50+
51+
/// <summary>
52+
/// Creates IActorRequestMessageBodySerializer for a serviceInterface using Wrapped Message Json implementation.
53+
/// </summary>
54+
/// <param name="serviceInterfaceType">The remoted service interface.</param>
55+
/// <param name="methodRequestParameterTypes">The union of parameter types of all of the methods of the specified interface.</param>
56+
/// <param name="wrappedRequestMessageTypes">Wrapped Request Types for all Methods.</param>
57+
/// <returns>
58+
/// An instance of the <see cref="IActorRequestMessageBodySerializer" /> that can serialize the service
59+
/// actor request message body to a messaging body for transferring over the transport.
60+
/// </returns>
61+
public IActorRequestMessageBodySerializer CreateRequestMessageBodySerializer(
62+
Type serviceInterfaceType,
63+
IEnumerable<Type> methodRequestParameterTypes,
64+
IEnumerable<Type> wrappedRequestMessageTypes = null)
65+
{
66+
return new MemoryStreamMessageBodySerializer<WrappedMessageBody, WrappedMessageBody>(Options, serviceInterfaceType, methodRequestParameterTypes, wrappedRequestMessageTypes);
67+
}
68+
69+
/// <summary>
70+
/// Creates IActorResponseMessageBodySerializer for a serviceInterface using Wrapped Message Json implementation.
71+
/// </summary>
72+
/// <param name="serviceInterfaceType">The remoted service interface.</param>
73+
/// <param name="methodReturnTypes">The return types of all of the methods of the specified interface.</param>
74+
/// <param name="wrappedResponseMessageTypes">Wrapped Response Types for all remoting methods.</param>
75+
/// <returns>
76+
/// An instance of the <see cref="IActorResponseMessageBodySerializer" /> that can serialize the service
77+
/// actor response message body to a messaging body for transferring over the transport.
78+
/// </returns>
79+
public IActorResponseMessageBodySerializer CreateResponseMessageBodySerializer(
80+
Type serviceInterfaceType,
81+
IEnumerable<Type> methodReturnTypes,
82+
IEnumerable<Type> wrappedResponseMessageTypes = null)
83+
{
84+
return new MemoryStreamMessageBodySerializer<WrappedMessageBody, WrappedMessageBody>(Options, serviceInterfaceType, methodReturnTypes, wrappedResponseMessageTypes);
85+
}
86+
87+
/// <summary>
88+
/// Default serializer for service remoting request and response message body that uses the
89+
/// memory stream to create outgoing message buffers.
90+
/// </summary>
91+
private class MemoryStreamMessageBodySerializer<TRequest, TResponse> :
92+
IActorRequestMessageBodySerializer,
93+
IActorResponseMessageBodySerializer
94+
where TRequest : IActorRequestMessageBody
95+
where TResponse : IActorResponseMessageBody
96+
{
97+
private readonly JsonSerializerOptions serializerOptions;
98+
99+
public MemoryStreamMessageBodySerializer(
100+
JsonSerializerOptions serializerOptions,
101+
Type serviceInterfaceType,
102+
IEnumerable<Type> methodRequestParameterTypes,
103+
IEnumerable<Type> wrappedRequestMessageTypes = null)
104+
{
105+
var _methodRequestParameterTypes = new List<Type>(methodRequestParameterTypes);
106+
var _wrappedRequestMessageTypes = new List<Type>(wrappedRequestMessageTypes);
107+
108+
this.serializerOptions = new(serializerOptions)
109+
{
110+
// Workaround since WrappedMessageBody creates an object
111+
// with parameters as fields
112+
IncludeFields = true,
113+
};
114+
115+
this.serializerOptions.Converters.Add(new ActorMessageBodyJsonConverter<TRequest>(_methodRequestParameterTypes, _wrappedRequestMessageTypes));
116+
this.serializerOptions.Converters.Add(new ActorMessageBodyJsonConverter<TResponse>(_methodRequestParameterTypes, _wrappedRequestMessageTypes));
117+
}
118+
119+
byte[] IActorRequestMessageBodySerializer.Serialize(IActorRequestMessageBody actorRequestMessageBody)
120+
{
121+
if (actorRequestMessageBody == null)
122+
{
123+
return null;
124+
}
125+
126+
return JsonSerializer.SerializeToUtf8Bytes<object>(actorRequestMessageBody, this.serializerOptions);
127+
}
128+
129+
async ValueTask<IActorRequestMessageBody> IActorRequestMessageBodySerializer.DeserializeAsync(Stream stream)
130+
{
131+
if (stream == null)
132+
{
133+
return default;
134+
}
135+
136+
if (stream.Length == 0)
137+
{
138+
return default;
139+
}
140+
141+
stream.Position = 0;
142+
return await JsonSerializer.DeserializeAsync<TRequest>(stream, this.serializerOptions);
143+
}
144+
145+
byte[] IActorResponseMessageBodySerializer.Serialize(IActorResponseMessageBody actorResponseMessageBody)
146+
{
147+
if (actorResponseMessageBody == null)
148+
{
149+
return null;
150+
}
151+
152+
return JsonSerializer.SerializeToUtf8Bytes<object>(actorResponseMessageBody, this.serializerOptions);
153+
}
154+
155+
async ValueTask<IActorResponseMessageBody> IActorResponseMessageBodySerializer.DeserializeAsync(Stream messageBody)
156+
{
157+
if (messageBody == null)
158+
{
159+
return null;
160+
}
161+
162+
using var stream = new MemoryStream();
163+
messageBody.CopyTo(stream);
164+
stream.Position = 0;
165+
166+
if (stream.Capacity == 0)
167+
{
168+
return null;
169+
}
170+
171+
return await JsonSerializer.DeserializeAsync<TResponse>(stream, this.serializerOptions);
172+
}
173+
}
174+
}
175+
}

src/Dapr.Actors/Communication/IActorRequestMessageBodySerializer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
namespace Dapr.Actors.Communication
1515
{
1616
using System.IO;
17+
using System.Threading.Tasks;
1718

1819
/// <summary>
1920
/// Defines the interface that must be implemented to provide a serializer/deserializer for remoting request message body.
@@ -32,6 +33,6 @@ internal interface IActorRequestMessageBodySerializer
3233
/// </summary>
3334
/// <param name="messageBody">Serialized message body.</param>
3435
/// <returns>Deserialized remoting request message body object.</returns>
35-
IActorRequestMessageBody Deserialize(Stream messageBody);
36+
ValueTask<IActorRequestMessageBody> DeserializeAsync(Stream messageBody);
3637
}
3738
}

0 commit comments

Comments
 (0)