Skip to content

Commit c637a4e

Browse files
authored
Improve streaming error messages when call is complete (#1121)
1 parent dfd2cb3 commit c637a4e

File tree

9 files changed

+347
-12
lines changed

9 files changed

+347
-12
lines changed

src/Grpc.AspNetCore.Server/Internal/CallHandlers/ClientStreamingServerCallHandler.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,17 @@ protected override async Task HandleCallAsyncCore(HttpContext httpContext, HttpC
4444
// Disable request body data rate for client streaming
4545
DisableMinRequestBodyDataRateAndMaxRequestBodySize(httpContext);
4646

47+
TResponse? response;
48+
4749
var streamReader = new HttpContextStreamReader<TRequest>(serverCallContext, MethodInvoker.Method.RequestMarshaller.ContextualDeserializer);
48-
var response = await _invoker.Invoke(httpContext, serverCallContext, streamReader);
50+
try
51+
{
52+
response = await _invoker.Invoke(httpContext, serverCallContext, streamReader);
53+
}
54+
finally
55+
{
56+
streamReader.Complete();
57+
}
4958

5059
if (response == null)
5160
{

src/Grpc.AspNetCore.Server/Internal/CallHandlers/DuplexStreamingServerCallHandler.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,22 @@ public DuplexStreamingServerCallHandler(
4242
_invoker = invoker;
4343
}
4444

45-
protected override Task HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
45+
protected override async Task HandleCallAsyncCore(HttpContext httpContext, HttpContextServerCallContext serverCallContext)
4646
{
4747
// Disable request body data rate for client streaming
4848
DisableMinRequestBodyDataRateAndMaxRequestBodySize(httpContext);
4949

5050
var streamReader = new HttpContextStreamReader<TRequest>(serverCallContext, MethodInvoker.Method.RequestMarshaller.ContextualDeserializer);
5151
var streamWriter = new HttpContextStreamWriter<TResponse>(serverCallContext, MethodInvoker.Method.ResponseMarshaller.ContextualSerializer);
52-
53-
return _invoker.Invoke(httpContext, serverCallContext, streamReader, streamWriter);
52+
try
53+
{
54+
await _invoker.Invoke(httpContext, serverCallContext, streamReader, streamWriter);
55+
}
56+
finally
57+
{
58+
streamReader.Complete();
59+
streamWriter.Complete();
60+
}
5461
}
5562
}
5663
}

src/Grpc.AspNetCore.Server/Internal/CallHandlers/ServerStreamingServerCallHandler.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,14 @@ protected override async Task HandleCallAsyncCore(HttpContext httpContext, HttpC
4848
var request = await httpContext.Request.BodyReader.ReadSingleMessageAsync<TRequest>(serverCallContext, MethodInvoker.Method.RequestMarshaller.ContextualDeserializer);
4949

5050
var streamWriter = new HttpContextStreamWriter<TResponse>(serverCallContext, MethodInvoker.Method.ResponseMarshaller.ContextualSerializer);
51-
await _invoker.Invoke(httpContext, serverCallContext, request, streamWriter);
51+
try
52+
{
53+
await _invoker.Invoke(httpContext, serverCallContext, request, streamWriter);
54+
}
55+
finally
56+
{
57+
streamWriter.Complete();
58+
}
5259
}
5360
}
5461
}

src/Grpc.AspNetCore.Server/Internal/HttpContextStreamReader.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ internal class HttpContextStreamReader<TRequest> : IAsyncStreamReader<TRequest>
3030

3131
private readonly HttpContextServerCallContext _serverCallContext;
3232
private readonly Func<DeserializationContext, TRequest> _deserializer;
33+
private bool _completed;
3334

3435
public HttpContextStreamReader(HttpContextServerCallContext serverCallContext, Func<DeserializationContext, TRequest> deserializer)
3536
{
@@ -57,6 +58,11 @@ async Task<bool> MoveNextAsync(ValueTask<TRequest?> readStreamTask)
5758
return Task.FromCanceled<bool>(cancellationToken);
5859
}
5960

61+
if (_completed || _serverCallContext.CancellationToken.IsCancellationRequested)
62+
{
63+
return Task.FromException<bool>(new InvalidOperationException("Can't read messages after the request is complete."));
64+
}
65+
6066
var request = _serverCallContext.HttpContext.Request.BodyReader.ReadStreamMessageAsync(_serverCallContext, _deserializer, cancellationToken);
6167
if (!request.IsCompletedSuccessfully)
6268
{
@@ -78,5 +84,10 @@ private bool ProcessPayload(TRequest? request)
7884
Current = request;
7985
return true;
8086
}
87+
88+
public void Complete()
89+
{
90+
_completed = true;
91+
}
8192
}
8293
}

src/Grpc.AspNetCore.Server/Internal/HttpContextStreamWriter.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ internal class HttpContextStreamWriter<TResponse> : IServerStreamWriter<TRespons
2929
private readonly Action<TResponse, SerializationContext> _serializer;
3030
private readonly object _writeLock;
3131
private Task? _writeTask;
32+
private bool _completed;
3233

3334
public HttpContextStreamWriter(HttpContextServerCallContext context, Action<TResponse, SerializationContext> serializer)
3435
{
@@ -50,9 +51,9 @@ public Task WriteAsync(TResponse message)
5051
return Task.FromException(new ArgumentNullException(nameof(message)));
5152
}
5253

53-
if (_context.CancellationToken.IsCancellationRequested)
54+
if (_completed || _context.CancellationToken.IsCancellationRequested)
5455
{
55-
return Task.FromException(new InvalidOperationException("Cannot write message after request is complete."));
56+
return Task.FromException(new InvalidOperationException("Can't write the message because the request is complete."));
5657
}
5758

5859
lock (_writeLock)
@@ -70,6 +71,11 @@ public Task WriteAsync(TResponse message)
7071
return _writeTask;
7172
}
7273

74+
public void Complete()
75+
{
76+
_completed = true;
77+
}
78+
7379
/// <summary>
7480
/// A value indicating whether there is an async write already in progress.
7581
/// Should only check this property when holding the write lock.

test/FunctionalTests/Client/CancellationTests.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,14 @@ private async Task CancelInParallel(int tasks, bool waitForHeaders, int interati
6565

6666
// Cancellation when service is receiving message
6767
if (writeContext.Exception is InvalidOperationException &&
68-
writeContext.Exception.Message == "Cannot write message after request is complete.")
68+
writeContext.Exception.Message == "Can't read messages after the request is complete.")
69+
{
70+
return true;
71+
}
72+
73+
// Cancellation when service is writing message
74+
if (writeContext.Exception is InvalidOperationException &&
75+
writeContext.Exception.Message == "Can't write the message because the request is complete.")
6976
{
7077
return true;
7178
}

0 commit comments

Comments
 (0)