Skip to content

Commit 2dc26fd

Browse files
authored
Functional tests of parallel streaming and cancellation (#473)
1 parent 39e68b3 commit 2dc26fd

File tree

5 files changed

+246
-4
lines changed

5 files changed

+246
-4
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#region Copyright notice and license
2+
3+
// Copyright 2019 The gRPC Authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#endregion
18+
19+
using System;
20+
using System.Diagnostics;
21+
using System.IO;
22+
using System.Threading;
23+
using System.Threading.Tasks;
24+
using Google.Protobuf;
25+
using Grpc.Core;
26+
using Grpc.Tests.Shared;
27+
using NUnit.Framework;
28+
using Streaming;
29+
30+
namespace Grpc.AspNetCore.FunctionalTests.Client
31+
{
32+
[TestFixture]
33+
public class CancellationTests : FunctionalTestBase
34+
{
35+
[TestCase(1)]
36+
[TestCase(5)]
37+
[TestCase(20)]
38+
public async Task DuplexStreaming_CancelAfterHeadersInParallel_Success(int tasks)
39+
{
40+
await CancelInParallel(tasks, waitForHeaders: true, interations: 10);
41+
}
42+
43+
[TestCase(1)]
44+
[TestCase(5)]
45+
[TestCase(20)]
46+
public async Task DuplexStreaming_CancelWithoutHeadersInParallel_Success(int tasks)
47+
{
48+
await CancelInParallel(tasks, waitForHeaders: false, interations: 10);
49+
}
50+
51+
private async Task CancelInParallel(int tasks, bool waitForHeaders, int interations)
52+
{
53+
SetExpectedErrorsFilter(writeContext =>
54+
{
55+
if (writeContext.LoggerName == "SERVER FunctionalTestsWebsite.Services.StreamService")
56+
{
57+
// Kestrel cancellation error message
58+
if (writeContext.Exception is IOException &&
59+
writeContext.Exception.Message == "The client reset the request stream.")
60+
{
61+
return true;
62+
}
63+
64+
// Cancellation when service is receiving message
65+
if (writeContext.Exception is InvalidOperationException &&
66+
writeContext.Exception.Message == "Cannot write message after request is complete.")
67+
{
68+
return true;
69+
}
70+
71+
// Cancellation before service writes message
72+
if (writeContext.Exception is TaskCanceledException &&
73+
writeContext.Exception.Message == "A task was canceled.")
74+
{
75+
return true;
76+
}
77+
}
78+
79+
if (writeContext.LoggerName == "Grpc.Net.Client.Internal.GrpcCall")
80+
{
81+
// Cancellation when call hasn't returned headers
82+
if (writeContext.EventId.Name == "ErrorStartingCall" &&
83+
writeContext.Exception is TaskCanceledException)
84+
{
85+
return true;
86+
}
87+
}
88+
89+
return false;
90+
});
91+
92+
// Arrange
93+
var data = new byte[1024 * 64];
94+
95+
var client = new StreamService.StreamServiceClient(Channel);
96+
97+
await TestHelpers.RunParallel(tasks, async () =>
98+
{
99+
for (int i = 0; i < interations; i++)
100+
{
101+
var cts = new CancellationTokenSource();
102+
var headers = new Metadata();
103+
if (waitForHeaders)
104+
{
105+
headers.Add("flush-headers", bool.TrueString);
106+
}
107+
var call = client.EchoAllData(cancellationToken: cts.Token, headers: headers);
108+
109+
if (waitForHeaders)
110+
{
111+
await call.ResponseHeadersAsync.DefaultTimeout();
112+
}
113+
114+
await call.RequestStream.WriteAsync(new DataMessage
115+
{
116+
Data = ByteString.CopyFrom(data)
117+
}).DefaultTimeout();
118+
119+
cts.Cancel();
120+
}
121+
});
122+
}
123+
}
124+
}

test/FunctionalTests/Client/StreamingTests.cs

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
using System.Threading;
2323
using System.Threading.Tasks;
2424
using Google.Protobuf;
25+
using Grpc.AspNetCore.FunctionalTests.Infrastructure;
2526
using Grpc.Core;
2627
using Grpc.Tests.Shared;
2728
using Microsoft.Extensions.Logging;
@@ -47,7 +48,7 @@ public async Task DuplexStream_SendLargeFileBatchedAndRecieveLargeFileBatched_Su
4748
var client = new StreamService.StreamServiceClient(Channel);
4849

4950
// Act
50-
var call = client.DuplexData();
51+
var call = client.BufferAllData();
5152

5253
var sent = 0;
5354
while (sent < data.Length)
@@ -81,7 +82,7 @@ await call.RequestStream.WriteAsync(new DataMessage
8182
}
8283

8384
[Test]
84-
public async Task ClientStream_SendLargeFileBatchedAndRecieveLargeFileBatched_Success()
85+
public async Task ClientStream_SendLargeFileBatched_Success()
8586
{
8687
// Arrange
8788
var total = 1024 * 1024 * 64; // 64 MB
@@ -238,5 +239,88 @@ public async Task DuplexStream_SendToUnimplementedMethodAfterResponseReceived_Mo
238239
Assert.AreEqual(StatusCode.Unimplemented, status.StatusCode);
239240
}
240241
}
242+
243+
const int Size64MB = 1024 * 1024 * 64;
244+
const int Size1MB = 1024 * 1024 * 1;
245+
const int Size64KB = 1024 * 64;
246+
247+
[TestCase(0, 0)]
248+
[TestCase(1, 1)]
249+
[TestCase(2, 1)]
250+
[TestCase(3, 2)]
251+
[TestCase(Size64MB, Size64KB)]
252+
[TestCase(Size64MB, Size1MB)]
253+
public async Task DuplexStreaming_SimultaniousSendAndReceive_Success(int total, int batchSize)
254+
{
255+
// Arrange
256+
var data = new byte[batchSize];
257+
258+
var client = new StreamService.StreamServiceClient(Channel);
259+
260+
var (sent, received) = await EchoData(total, data, client);
261+
262+
// Assert
263+
Assert.AreEqual(sent, total);
264+
Assert.AreEqual(received, total);
265+
}
266+
267+
private async Task<(int sent, int received)> EchoData(int total, byte[] data, StreamService.StreamServiceClient client)
268+
{
269+
var sent = 0;
270+
var received = 0;
271+
var call = client.EchoAllData();
272+
273+
var readTask = Task.Run(async () =>
274+
{
275+
await foreach (var message in call.ResponseStream.ReadAllAsync())
276+
{
277+
received += message.Data.Length;
278+
279+
Logger.LogInformation($"Received {sent} bytes");
280+
}
281+
});
282+
283+
while (sent < total)
284+
{
285+
var writeCount = Math.Min(total - sent, data.Length);
286+
287+
await call.RequestStream.WriteAsync(new DataMessage
288+
{
289+
Data = ByteString.CopyFrom(data, 0, writeCount)
290+
}).DefaultTimeout();
291+
292+
sent += writeCount;
293+
294+
Logger.LogInformation($"Sent {sent} bytes");
295+
}
296+
297+
await call.RequestStream.CompleteAsync().DefaultTimeout();
298+
await readTask;
299+
300+
return (sent, received);
301+
}
302+
303+
[TestCase(1)]
304+
[TestCase(5)]
305+
[TestCase(20)]
306+
public async Task DuplexStreaming_SimultaniousSendAndReceiveInParallel_Success(int tasks)
307+
{
308+
// Arrange
309+
const int total = 1024 * 1024 * 1;
310+
const int batchSize = 1024 * 64;
311+
312+
var data = new byte[batchSize];
313+
314+
var client = new StreamService.StreamServiceClient(Channel);
315+
316+
await TestHelpers.RunParallel(tasks, async () =>
317+
{
318+
var (sent, received) = await EchoData(total, data, client);
319+
320+
// Assert
321+
Assert.AreEqual(sent, total);
322+
Assert.AreEqual(received, total);
323+
});
324+
}
241325
}
242326
}

test/Shared/TestHelpers.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,16 @@ public static async Task AssertIsTrueRetryAsync(Func<bool> assert, string messag
5050

5151
throw new Exception($"Assert failed after {Retrys} retries: {message}");
5252
}
53+
54+
public static Task RunParallel(int count, Func<Task> action)
55+
{
56+
var actionTasks = new Task[count];
57+
for (int i = 0; i < actionTasks.Length; i++)
58+
{
59+
actionTasks[i] = action();
60+
}
61+
62+
return Task.WhenAll(actionTasks);
63+
}
5364
}
5465
}

testassets/FunctionalTestsWebsite/Services/StreamService.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
using System;
2020
using System.IO;
21+
using System.Linq;
2122
using System.Threading;
2223
using System.Threading.Tasks;
2324
using Google.Protobuf;
@@ -36,7 +37,7 @@ public StreamService(ILoggerFactory loggerFactory)
3637
_logger = loggerFactory.CreateLogger<StreamService>();
3738
}
3839

39-
public override async Task DuplexData(
40+
public override async Task BufferAllData(
4041
IAsyncStreamReader<DataMessage> requestStream,
4142
IServerStreamWriter<DataMessage> responseStream,
4243
ServerCallContext context)
@@ -67,6 +68,27 @@ await responseStream.WriteAsync(new DataMessage
6768
}
6869
}
6970

71+
72+
public override async Task EchoAllData(
73+
IAsyncStreamReader<DataMessage> requestStream,
74+
IServerStreamWriter<DataMessage> responseStream,
75+
ServerCallContext context)
76+
{
77+
var flushHeaders = context.RequestHeaders.Any(x => x.Key == "flush-headers");
78+
if (flushHeaders)
79+
{
80+
await context.WriteResponseHeadersAsync(new Metadata());
81+
}
82+
83+
await foreach (var message in requestStream.ReadAllAsync())
84+
{
85+
await responseStream.WriteAsync(new DataMessage
86+
{
87+
Data = message.Data
88+
});
89+
}
90+
}
91+
7092
public override async Task<DataComplete> ClientStreamedData(
7193
IAsyncStreamReader<DataMessage> requestStream,
7294
ServerCallContext context)

testassets/Proto/streaming.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ syntax = "proto3";
1717
package Streaming;
1818

1919
service StreamService {
20-
rpc DuplexData (stream DataMessage) returns (stream DataMessage);
20+
rpc BufferAllData (stream DataMessage) returns (stream DataMessage);
21+
rpc EchoAllData (stream DataMessage) returns (stream DataMessage);
2122
rpc ClientStreamedData (stream DataMessage) returns (DataComplete);
2223
}
2324

0 commit comments

Comments
 (0)