Skip to content

Commit 6a15319

Browse files
authored
Merge pull request #1015 from AArnott/callScopedLifetime
Add support for call-scoped marshalable objects
2 parents 8c653af + c1740da commit 6a15319

17 files changed

+504
-166
lines changed

doc/rpc_marshalable_objects.md

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,45 @@ StreamJsonRpc allows transmitting marshalable objects (i.e., objects implementin
77

88
Marshalable interfaces must:
99

10-
1. Extend `IDisposable`.
10+
1. Extend `IDisposable` (unless interface is call-scoped).
1111
1. Not include any properties.
1212
1. Not include any events.
1313

1414
The object that implements a marshalable interface may include properties and events as well as other additional members but only the methods defined by the marshalable interface will be available on the proxy, and the data will not be serialized.
1515

16+
The `RpcMarshalableAttribute` must be applied directly to the interface used as the return type, parameter type, or member type within a return type or parameter type's object graph.
17+
The attribute is not inherited.
18+
In fact different interfaces in a type hierarchy can have this attribute applied with distinct settings, and only the settings on the attribute applied directly to the interface used will apply.
19+
20+
## Call-scoped vs. explicitly scoped
21+
22+
### Explicit lifetime
23+
24+
An RPC marshalable interface has an explicit lifetime by default.
25+
This means that the receiver of a marshaled object owns its lifetime, which may extend beyond an individual RPC call.
26+
Memory for the marshaled object and its proxy are not released until the receiver either disposes of its proxy or the JSON-RPC connection is closed.
27+
28+
### Call-scoped lifetime
29+
30+
A call-scoped interface produces a proxy that is valid only during the RPC call that delivered it.
31+
It may only be used as part of a method request as or within an argument.
32+
Using it as or within a return value or exception will result in an error.
33+
34+
This is the preferred model when an interface is expected to only be used within request arguments because it mitigates the risk of a memory leak due to the receiver failing to dispose of the proxy.
35+
This model also allows the sender to retain control over the lifetime of the marshaled object.
36+
37+
Special allowance is made for `IAsyncEnumerable<T>`-returning RPC methods so that the lifetime of the marshaled object is extended to the lifetime of the enumeration.
38+
An `IAsyncEnumerable<T>` in an exception thrown from the method will *not* have access to the call-scoped marshaled object because exceptions thrown by the server always cause termination of objects marshaled by the request.
39+
40+
Opt into call-scoped lifetimes by setting the `CallScopedLifetime` property to `true` on the attribute applied to the interface:
41+
42+
```css
43+
[RpcMarshalable(CallScopedLifetime = true)]
44+
```
45+
46+
It is not possible to customize the lifetime of an RPC marshaled object except on its own interface.
47+
For example, applying this attribute to the parameter that uses the interface is not allowed.
48+
1649
## Use cases
1750

1851
In all cases, the special handling of a marshalable object only occurs if the container of that value is typed as the corresponding marshalable interface.
@@ -104,6 +137,8 @@ class RpcServer : IRpcServer
104137
}
105138
```
106139

140+
Call-scoped marshalable interfaces may not be used as a return type or member of its object graph.
141+
107142
### Method argument
108143

109144
In this use case the RPC *client* provides the marshalable object to the server:
@@ -119,6 +154,8 @@ var counter = new Counter();
119154
await client.ProvideCounterAsync(counter);
120155
```
121156

157+
Call-scoped marshalable interfaces may only appear as a method parameter or a part of its object graph.
158+
122159
### Value within a single argument's object graph
123160

124161
In this use case the RPC client again provides the marshalable object to the server,
@@ -144,6 +181,7 @@ await client.ProvideClassAsync(arg);
144181
```
145182

146183
⚠️ While this use case is supported, be very wary of this pattern because it becomes less obvious to the receiver that an `IDisposable` value is tucked into the object tree of an argument somewhere that *must* be disposed to avoid a resource leak.
184+
This risk can be mitigated by using call-scoped marshalable interfaces.
147185

148186
### As an argument without a proxy for an RPC interface
149187

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

4-
using Microsoft;
4+
namespace StreamJsonRpc;
55

66
internal class DisposableAction : IDisposableObservable
77
{
8-
private readonly Action? disposeAction;
8+
private static readonly Action EmptyAction = () => { };
9+
private Action? disposeAction;
910

1011
internal DisposableAction(Action? disposeAction)
1112
{
12-
this.disposeAction = disposeAction;
13+
this.disposeAction = disposeAction ?? EmptyAction;
1314
}
1415

15-
public bool IsDisposed { get; private set; }
16+
public bool IsDisposed => this.disposeAction is null;
1617

1718
public void Dispose()
1819
{
19-
this.IsDisposed = true;
20-
this.disposeAction?.Invoke();
20+
Interlocked.Exchange(ref this.disposeAction, null)?.Invoke();
2121
}
2222
}

src/StreamJsonRpc/FormatterBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ JsonRpc IJsonRpcInstanceContainer.Rpc
8989
this.rpc = value;
9090

9191
this.formatterProgressTracker = new MessageFormatterProgressTracker(value, this);
92-
this.enumerableTracker = new MessageFormatterEnumerableTracker(value, this);
93-
this.duplexPipeTracker = new MessageFormatterDuplexPipeTracker(value, this) { MultiplexingStream = this.MultiplexingStream };
9492
this.rpcMarshaledContextTracker = new MessageFormatterRpcMarshaledContextTracker(value, this);
93+
this.enumerableTracker = new MessageFormatterEnumerableTracker(value, this, this.rpcMarshaledContextTracker);
94+
this.duplexPipeTracker = new MessageFormatterDuplexPipeTracker(value, this) { MultiplexingStream = this.MultiplexingStream };
9595
}
9696
}
9797
}

src/StreamJsonRpc/JsonMessageFormatter.cs

Lines changed: 58 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ public JToken Serialize(JsonRpcMessage message)
351351
Protocol.JsonRpcError IJsonRpcMessageFactory.CreateErrorMessage() => new JsonRpcError(this.JsonSerializer);
352352

353353
/// <inheritdoc/>
354-
Protocol.JsonRpcResult IJsonRpcMessageFactory.CreateResultMessage() => new JsonRpcResult(this.JsonSerializer);
354+
Protocol.JsonRpcResult IJsonRpcMessageFactory.CreateResultMessage() => new JsonRpcResult(this);
355355

356356
/// <inheritdoc />
357357
protected override void Dispose(bool disposing)
@@ -570,9 +570,9 @@ private JTokenWriter CreateJTokenWriter()
570570

571571
private bool TryGetMarshaledJsonConverter(Type type, [NotNullWhen(true)] out RpcMarshalableConverter? converter)
572572
{
573-
if (MessageFormatterRpcMarshaledContextTracker.TryGetMarshalOptionsForType(type, out JsonRpcProxyOptions? proxyOptions, out JsonRpcTargetOptions? targetOptions))
573+
if (MessageFormatterRpcMarshaledContextTracker.TryGetMarshalOptionsForType(type, out JsonRpcProxyOptions? proxyOptions, out JsonRpcTargetOptions? targetOptions, out RpcMarshalableAttribute? rpcMarshalableAttribute))
574574
{
575-
converter = new RpcMarshalableConverter(type, this, proxyOptions, targetOptions);
575+
converter = new RpcMarshalableConverter(type, this, proxyOptions, targetOptions, rpcMarshalableAttribute);
576576
return true;
577577
}
578578

@@ -616,7 +616,7 @@ private JsonRpcResult ReadResult(JToken json)
616616
RequestId id = this.ExtractRequestId(json);
617617
JToken? result = json["result"];
618618

619-
return new JsonRpcResult(this.JsonSerializer)
619+
return new JsonRpcResult(this)
620620
{
621621
RequestId = id,
622622
Result = result,
@@ -837,15 +837,27 @@ public override bool TryGetArgumentByNameOrIndex(string? name, int position, Typ
837837
[DataContract]
838838
private class JsonRpcResult : JsonRpcResultBase
839839
{
840-
private readonly JsonSerializer jsonSerializer;
840+
private readonly JsonMessageFormatter formatter;
841+
private bool resultDeserialized;
842+
private JsonSerializationException? resultDeserializationException;
841843

842-
internal JsonRpcResult(JsonSerializer jsonSerializer)
844+
internal JsonRpcResult(JsonMessageFormatter formatter)
843845
{
844-
this.jsonSerializer = jsonSerializer ?? throw new ArgumentNullException(nameof(jsonSerializer));
846+
this.formatter = formatter;
845847
}
846848

847849
public override T GetResult<T>()
848850
{
851+
if (this.resultDeserializationException is not null)
852+
{
853+
ExceptionDispatchInfo.Capture(this.resultDeserializationException).Throw();
854+
}
855+
856+
if (this.resultDeserialized)
857+
{
858+
return (T)this.Result!;
859+
}
860+
849861
Verify.Operation(this.Result is not null, "This instance hasn't been initialized with a result yet.");
850862
var result = (JToken)this.Result;
851863
if (result.Type == JTokenType.Null)
@@ -856,15 +868,45 @@ public override T GetResult<T>()
856868

857869
try
858870
{
859-
return result.ToObject<T>(this.jsonSerializer)!;
871+
using (this.formatter.TrackDeserialization(this))
872+
{
873+
return result.ToObject<T>(this.formatter.JsonSerializer)!;
874+
}
860875
}
861876
catch (Exception exception)
862877
{
863878
throw new JsonSerializationException(string.Format(CultureInfo.CurrentCulture, Resources.FailureDeserializingRpcResult, typeof(T).Name, exception.GetType().Name, exception.Message), exception);
864879
}
865880
}
866881

867-
protected override TopLevelPropertyBagBase? CreateTopLevelPropertyBag() => new TopLevelPropertyBag(this.jsonSerializer);
882+
protected internal override void SetExpectedResultType(Type resultType)
883+
{
884+
Verify.Operation(this.Result is not null, "This instance hasn't been initialized with a result yet.");
885+
Verify.Operation(!this.resultDeserialized, "Result is no longer available or has already been deserialized.");
886+
887+
var result = (JToken)this.Result;
888+
if (result.Type == JTokenType.Null)
889+
{
890+
Verify.Operation(!resultType.GetTypeInfo().IsValueType || Nullable.GetUnderlyingType(resultType) is not null, "null result is not assignable to a value type.");
891+
return;
892+
}
893+
894+
try
895+
{
896+
using (this.formatter.TrackDeserialization(this))
897+
{
898+
this.Result = result.ToObject(resultType, this.formatter.JsonSerializer)!;
899+
this.resultDeserialized = true;
900+
}
901+
}
902+
catch (Exception exception)
903+
{
904+
// This was a best effort anyway. We'll throw again later at a more convenient time for JsonRpc.
905+
this.resultDeserializationException = new JsonSerializationException(string.Format(CultureInfo.CurrentCulture, Resources.FailureDeserializingRpcResult, resultType.Name, exception.GetType().Name, exception.Message), exception);
906+
}
907+
}
908+
909+
protected override TopLevelPropertyBagBase? CreateTopLevelPropertyBag() => new TopLevelPropertyBag(this.formatter.JsonSerializer);
868910
}
869911

870912
private class JsonRpcError : JsonRpcErrorBase
@@ -1207,29 +1249,16 @@ public override void WriteJson(JsonWriter writer, Stream? value, JsonSerializer
12071249
}
12081250

12091251
[DebuggerDisplay("{" + nameof(DebuggerDisplay) + "}")]
1210-
private class RpcMarshalableConverter : JsonConverter
1252+
private class RpcMarshalableConverter(Type interfaceType, JsonMessageFormatter jsonMessageFormatter, JsonRpcProxyOptions proxyOptions, JsonRpcTargetOptions targetOptions, RpcMarshalableAttribute rpcMarshalableAttribute) : JsonConverter
12111253
{
1212-
private readonly Type interfaceType;
1213-
private readonly JsonMessageFormatter jsonMessageFormatter;
1214-
private readonly JsonRpcProxyOptions proxyOptions;
1215-
private readonly JsonRpcTargetOptions targetOptions;
1216-
1217-
public RpcMarshalableConverter(Type interfaceType, JsonMessageFormatter jsonMessageFormatter, JsonRpcProxyOptions proxyOptions, JsonRpcTargetOptions targetOptions)
1218-
{
1219-
this.interfaceType = interfaceType;
1220-
this.jsonMessageFormatter = jsonMessageFormatter;
1221-
this.proxyOptions = proxyOptions;
1222-
this.targetOptions = targetOptions;
1223-
}
1224-
1225-
private string DebuggerDisplay => $"Converter for marshalable objects of type {this.interfaceType.FullName}";
1254+
private string DebuggerDisplay => $"Converter for marshalable objects of type {interfaceType.FullName}";
12261255

1227-
public override bool CanConvert(Type objectType) => objectType == this.interfaceType;
1256+
public override bool CanConvert(Type objectType) => objectType == interfaceType;
12281257

12291258
public override object? ReadJson(JsonReader reader, Type objectType, object? existingValue, JsonSerializer serializer)
12301259
{
12311260
var token = (MessageFormatterRpcMarshaledContextTracker.MarshalToken?)JToken.Load(reader).ToObject(typeof(MessageFormatterRpcMarshaledContextTracker.MarshalToken), serializer);
1232-
return this.jsonMessageFormatter.RpcMarshaledContextTracker.GetObject(objectType, token, this.proxyOptions);
1261+
return jsonMessageFormatter.RpcMarshaledContextTracker.GetObject(objectType, token, proxyOptions);
12331262
}
12341263

12351264
public override void WriteJson(JsonWriter writer, object? value, JsonSerializer serializer)
@@ -1238,13 +1267,13 @@ public override void WriteJson(JsonWriter writer, object? value, JsonSerializer
12381267
{
12391268
writer.WriteNull();
12401269
}
1241-
else if (!this.interfaceType.IsAssignableFrom(value.GetType()))
1270+
else if (!interfaceType.IsAssignableFrom(value.GetType()))
12421271
{
1243-
throw new InvalidOperationException($"Type {value.GetType().FullName} doesn't implement {this.interfaceType.FullName}");
1272+
throw new InvalidOperationException($"Type {value.GetType().FullName} doesn't implement {interfaceType.FullName}");
12441273
}
12451274
else
12461275
{
1247-
MessageFormatterRpcMarshaledContextTracker.MarshalToken token = this.jsonMessageFormatter.RpcMarshaledContextTracker.GetToken(value, this.targetOptions, this.interfaceType);
1276+
MessageFormatterRpcMarshaledContextTracker.MarshalToken token = jsonMessageFormatter.RpcMarshaledContextTracker.GetToken(value, targetOptions, interfaceType, rpcMarshalableAttribute);
12481277
serializer.Serialize(writer, token);
12491278
}
12501279
}

src/StreamJsonRpc/JsonRpc.cs

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2569,56 +2569,66 @@ private async Task HandleRpcAsync(JsonRpcMessage rpc)
25692569
}
25702570
else if (rpc is IJsonRpcMessageWithId resultOrError)
25712571
{
2572-
this.OnResponseReceived(rpc);
2573-
JsonRpcResult? result = resultOrError as JsonRpcResult;
2574-
JsonRpcError? error = resultOrError as JsonRpcError;
2575-
2576-
lock (this.dispatcherMapLock)
2572+
try
25772573
{
2578-
if (this.resultDispatcherMap.TryGetValue(resultOrError.RequestId, out data))
2579-
{
2580-
this.resultDispatcherMap.Remove(resultOrError.RequestId);
2581-
}
2582-
}
2574+
JsonRpcResult? result = resultOrError as JsonRpcResult;
2575+
JsonRpcError? error = resultOrError as JsonRpcError;
25832576

2584-
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
2585-
{
2586-
if (result is not null)
2587-
{
2588-
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEvents.ReceivedResult, "Received result for request \"{0}\".", result.RequestId);
2589-
}
2590-
else if (error?.Error is object)
2577+
lock (this.dispatcherMapLock)
25912578
{
2592-
this.TraceSource.TraceEvent(TraceEventType.Warning, (int)TraceEvents.ReceivedError, "Received error response for request {0}: {1} \"{2}\": ", error.RequestId, error.Error.Code, error.Error.Message);
2579+
if (this.resultDispatcherMap.TryGetValue(resultOrError.RequestId, out data))
2580+
{
2581+
this.resultDispatcherMap.Remove(resultOrError.RequestId);
2582+
}
25932583
}
2594-
}
25952584

2596-
if (data is object)
2597-
{
2598-
if (data.ExpectedResultType is not null && rpc is JsonRpcResult resultMessage)
2585+
if (this.TraceSource.Switch.ShouldTrace(TraceEventType.Information))
25992586
{
2600-
resultMessage.SetExpectedResultType(data.ExpectedResultType);
2587+
if (result is not null)
2588+
{
2589+
this.TraceSource.TraceEvent(TraceEventType.Information, (int)TraceEvents.ReceivedResult, "Received result for request \"{0}\".", result.RequestId);
2590+
}
2591+
else if (error?.Error is object)
2592+
{
2593+
this.TraceSource.TraceEvent(TraceEventType.Warning, (int)TraceEvents.ReceivedError, "Received error response for request {0}: {1} \"{2}\": ", error.RequestId, error.Error.Code, error.Error.Message);
2594+
}
26012595
}
2602-
else if (rpc is JsonRpcError errorMessage && errorMessage.Error is not null)
2596+
2597+
if (data is object)
26032598
{
2604-
Type? errorType = this.GetErrorDetailsDataType(errorMessage);
2605-
if (errorType is not null)
2599+
if (data.ExpectedResultType is not null && rpc is JsonRpcResult resultMessage)
26062600
{
2607-
errorMessage.Error.SetExpectedDataType(errorType);
2601+
resultMessage.SetExpectedResultType(data.ExpectedResultType);
2602+
}
2603+
else if (rpc is JsonRpcError errorMessage && errorMessage.Error is not null)
2604+
{
2605+
Type? errorType = this.GetErrorDetailsDataType(errorMessage);
2606+
if (errorType is not null)
2607+
{
2608+
errorMessage.Error.SetExpectedDataType(errorType);
2609+
}
26082610
}
2611+
2612+
this.OnResponseReceived(rpc);
2613+
2614+
// Complete the caller's request with the response asynchronously so it doesn't delay handling of other JsonRpc messages.
2615+
await TaskScheduler.Default.SwitchTo(alwaysYield: true);
2616+
data.CompletionHandler(rpc);
2617+
data = null; // avoid invoking again if we throw later
26092618
}
2619+
else
2620+
{
2621+
this.OnResponseReceived(rpc);
26102622

2611-
// Complete the caller's request with the response asynchronously so it doesn't delay handling of other JsonRpc messages.
2612-
await TaskScheduler.Default.SwitchTo(alwaysYield: true);
2613-
data.CompletionHandler(rpc);
2614-
data = null; // avoid invoking again if we throw later
2623+
// Unexpected "response" to no request we have a record of. Raise disconnected event.
2624+
this.OnJsonRpcDisconnected(new JsonRpcDisconnectedEventArgs(
2625+
Resources.UnexpectedResponseWithNoMatchingRequest,
2626+
DisconnectedReason.RemoteProtocolViolation));
2627+
}
26152628
}
2616-
else
2629+
catch
26172630
{
2618-
// Unexpected "response" to no request we have a record of. Raise disconnected event.
2619-
this.OnJsonRpcDisconnected(new JsonRpcDisconnectedEventArgs(
2620-
Resources.UnexpectedResponseWithNoMatchingRequest,
2621-
DisconnectedReason.RemoteProtocolViolation));
2631+
this.OnResponseReceived(rpc);
26222632
}
26232633
}
26242634
else

0 commit comments

Comments
 (0)