Skip to content

Commit 530cd4c

Browse files
authored
Support IObservable input/output (#236)
1 parent 7e93f20 commit 530cd4c

File tree

10 files changed

+1023
-245
lines changed

10 files changed

+1023
-245
lines changed

src/protobuf-net.Grpc/Configuration/ServerBinder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public int Bind(object state, Type serviceType, BinderConfiguration? binderConfi
5252
var bindCtx = new ServiceBindContext(serviceContract, serviceType, state, binderConfiguration.Binder);
5353
foreach (var op in ContractOperation.FindOperations(binderConfiguration, serviceContract, this))
5454
{
55-
if (ServerInvokerLookup.TryGetValue(op.MethodType, op.Context, op.Result, op.Void, out var invoker)
55+
if (ServerInvokerLookup.TryGetValue(op.MethodType, op.Context, op.Arg, op.Result, op.Void, out var invoker)
5656
&& AddMethod(op.From, op.To, op.Name, op.Method, op.MethodType, invoker, bindCtx,
5757
serviceContractSimplifiedExceptions || op.Method.IsDefined(typeof(SimpleRpcExceptionsAttribute))
5858
))

src/protobuf-net.Grpc/Internal/ContractOperation.cs

Lines changed: 132 additions & 91 deletions
Large diffs are not rendered by default.

src/protobuf-net.Grpc/Internal/MetadataContext.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,10 @@ internal void SetTrailers(RpcException fault)
7373
}
7474
}
7575

76-
internal void SetTrailers<T>(T call, Func<T, Status> getStatus, Func<T, Metadata> getMetadata)
76+
internal void SetTrailers<T>(T? call, Func<T, Status> getStatus, Func<T, Metadata> getMetadata)
7777
where T : class
7878
{
79+
if (call is null) return;
7980
try
8081
{
8182
_trailers = getMetadata(call) ?? Metadata.Empty;
@@ -88,10 +89,15 @@ internal void SetTrailers<T>(T call, Func<T, Status> getStatus, Func<T, Metadata
8889
}
8990
}
9091

91-
internal ValueTask SetHeadersAsync(Task<Metadata> headers)
92+
internal ValueTask SetHeadersAsync(Task<Metadata>? headers)
9293
{
94+
if (headers is null) return default;
9395
var tcs = Interlocked.CompareExchange(ref _headersTaskOrSource, headers, null) as TaskCompletionSource<Metadata>;
94-
if (headers.RanToCompletion())
96+
if (tcs is null)
97+
{
98+
return new ValueTask(headers);
99+
}
100+
else if (headers.RanToCompletion())
95101
{
96102
// headers are sync; update TCS if one
97103
tcs?.TrySetResult(headers.Result);
@@ -103,11 +109,11 @@ internal ValueTask SetHeadersAsync(Task<Metadata> headers)
103109
return Awaited(this, tcs, headers);
104110
}
105111

106-
static async ValueTask Awaited(MetadataContext context, TaskCompletionSource<Metadata>? tcs, Task<Metadata> headers)
112+
static async ValueTask Awaited(MetadataContext context, TaskCompletionSource<Metadata> tcs, Task<Metadata> headers)
107113
{
108114
try
109115
{
110-
tcs?.TrySetResult(await headers.ConfigureAwait(false));
116+
tcs.TrySetResult(await headers.ConfigureAwait(false));
111117
}
112118
catch (RpcException fault)
113119
{

0 commit comments

Comments
 (0)