Skip to content

Commit 7ea6ca0

Browse files
authored
Fix call cancellation impacting other calls (#1688) (#1694)
1 parent 2cf2183 commit 7ea6ca0

File tree

3 files changed

+63
-56
lines changed

3 files changed

+63
-56
lines changed

src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs

Lines changed: 8 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ internal sealed class ConnectionManager : IDisposable, IChannelControlHelper
3636
{
3737
private static readonly ServiceConfig DefaultServiceConfig = new ServiceConfig();
3838

39-
private readonly SemaphoreSlim _nextPickerLock;
4039
private readonly object _lock;
4140
internal readonly Resolver _resolver;
4241
private readonly ISubchannelTransportFactory _subchannelTransportFactory;
@@ -47,6 +46,8 @@ internal sealed class ConnectionManager : IDisposable, IChannelControlHelper
4746
// Internal for testing
4847
internal LoadBalancer? _balancer;
4948
internal SubchannelPicker? _picker;
49+
// Cache picker wrapped in task once and reuse.
50+
private Task<SubchannelPicker>? _pickerTask;
5051
private bool _resolverStarted;
5152
private TaskCompletionSource<SubchannelPicker> _nextPickerTcs;
5253
private int _currentSubchannelId;
@@ -61,7 +62,6 @@ internal ConnectionManager(
6162
LoadBalancerFactory[] loadBalancerFactories)
6263
{
6364
_lock = new object();
64-
_nextPickerLock = new SemaphoreSlim(1);
6565
_nextPickerTcs = new TaskCompletionSource<SubchannelPicker>(TaskCreationOptions.RunContinuationsAsynchronously);
6666
_resolverStartedTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
6767

@@ -200,7 +200,6 @@ private void OnResolverResult(ResolverResult result)
200200
public void Dispose()
201201
{
202202
_resolver.Dispose();
203-
_nextPickerLock.Dispose();
204203
lock (_lock)
205204
{
206205
_balancer?.Dispose();
@@ -301,6 +300,7 @@ public void UpdateState(BalancerState state)
301300
{
302301
ConnectionManagerLog.ChannelPickerUpdated(Logger);
303302
_picker = state.Picker;
303+
_pickerTask = Task.FromResult(state.Picker);
304304
_nextPickerTcs.SetResult(state.Picker);
305305
_nextPickerTcs = new TaskCompletionSource<SubchannelPicker>(TaskCreationOptions.RunContinuationsAsynchronously);
306306
}
@@ -365,70 +365,22 @@ public void UpdateState(BalancerState state)
365365
}
366366
}
367367

368-
private
369-
#if !NETSTANDARD2_0
370-
ValueTask<SubchannelPicker>
371-
#else
372-
Task<SubchannelPicker>
373-
#endif
374-
GetPickerAsync(SubchannelPicker? currentPicker, CancellationToken cancellationToken)
368+
private Task<SubchannelPicker> GetPickerAsync(SubchannelPicker? currentPicker, CancellationToken cancellationToken)
375369
{
376370
lock (_lock)
377371
{
378372
if (_picker != null && _picker != currentPicker)
379373
{
380-
#if !NETSTANDARD2_0
381-
return new ValueTask<SubchannelPicker>(_picker);
382-
#else
383-
return Task.FromResult<SubchannelPicker>(_picker);
384-
#endif
374+
Debug.Assert(_pickerTask != null);
375+
return _pickerTask;
385376
}
386377
else
387378
{
388-
return GetNextPickerAsync(cancellationToken);
389-
}
390-
}
391-
}
392-
393-
private async
394-
#if !NETSTANDARD2_0
395-
ValueTask<SubchannelPicker>
396-
#else
397-
Task<SubchannelPicker>
398-
#endif
399-
GetNextPickerAsync(CancellationToken cancellationToken)
400-
{
401-
ConnectionManagerLog.PickWaiting(Logger);
402-
403-
Debug.Assert(Monitor.IsEntered(_lock));
404-
405-
var nextPickerTcs = _nextPickerTcs;
379+
ConnectionManagerLog.PickWaiting(Logger);
406380

407-
await _nextPickerLock.WaitAsync(cancellationToken).ConfigureAwait(false);
408-
try
409-
{
410-
using (cancellationToken.Register(
411-
static s => ((TaskCompletionSource<SubchannelPicker?>)s!).TrySetCanceled(),
412-
nextPickerTcs))
413-
{
414-
try
415-
{
416-
return await nextPickerTcs.Task.ConfigureAwait(false);
417-
}
418-
finally
419-
{
420-
// Picker can throw when canceled so reset picker in finally block.
421-
lock (_lock)
422-
{
423-
_nextPickerTcs = new TaskCompletionSource<SubchannelPicker>(TaskCreationOptions.RunContinuationsAsynchronously);
424-
}
425-
}
381+
return _nextPickerTcs.Task.WaitAsync(cancellationToken);
426382
}
427383
}
428-
finally
429-
{
430-
_nextPickerLock.Release();
431-
}
432384
}
433385

434386
internal Task WaitForStateChangedAsync(ConnectivityState lastObservedState, ConnectivityState? waitForState, CancellationToken cancellationToken)

src/Shared/CompatibilityHelpers.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,16 @@ public static int IndexOf(string s, char value, StringComparison comparisonType)
4444
return s.IndexOf(value, comparisonType);
4545
#endif
4646
}
47+
48+
#if !NET6_0_OR_GREATER
49+
public async static Task<T> WaitAsync<T>(this Task<T> task, CancellationToken cancellationToken)
50+
{
51+
var tcs = new TaskCompletionSource<T>();
52+
using (cancellationToken.Register(static s => ((TaskCompletionSource<T>)s!).TrySetCanceled(), tcs))
53+
{
54+
return await (await Task.WhenAny(task, tcs.Task).ConfigureAwait(false)).ConfigureAwait(false);
55+
}
56+
}
57+
#endif
4758
}
4859
}

test/FunctionalTests/Balancer/PickFirstBalancerTests.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,50 @@ private GrpcChannel CreateGrpcWebChannel(TestServerEndpointName endpointName, Ve
6161
return channel;
6262
}
6363

64+
[Test]
65+
public async Task UnaryCall_CallAfterCancellation_Success()
66+
{
67+
// Ignore errors
68+
SetExpectedErrorsFilter(writeContext =>
69+
{
70+
return true;
71+
});
72+
73+
string? host = null;
74+
Task<HelloReply> UnaryMethod(HelloRequest request, ServerCallContext context)
75+
{
76+
host = context.Host;
77+
return Task.FromResult(new HelloReply { Message = request.Name });
78+
}
79+
80+
// Arrange
81+
using var endpoint = BalancerHelpers.CreateGrpcEndpoint<HelloRequest, HelloReply>(50051, UnaryMethod, nameof(UnaryMethod));
82+
83+
var channel = await BalancerHelpers.CreateChannel(LoggerFactory, new PickFirstConfig(), new[] { endpoint.Address }).DefaultTimeout();
84+
var client = TestClientFactory.Create(channel, endpoint.Method);
85+
86+
// Kill endpoint so client can't connect.
87+
endpoint.Dispose();
88+
89+
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(0.5));
90+
cts.Token.Register(() => Logger.LogInformation("Cancellation token raised"));
91+
92+
// Start call that is canceled while getting picker.
93+
await ExceptionAssert.ThrowsAsync<RpcException>(() => client.UnaryCall(
94+
new HelloRequest { Name = "Balancer" },
95+
new CallOptions(cancellationToken: cts.Token)).ResponseAsync).DefaultTimeout();
96+
97+
// Restart endpoint.
98+
using var endpoint1 = BalancerHelpers.CreateGrpcEndpoint<HelloRequest, HelloReply>(50051, UnaryMethod, nameof(UnaryMethod));
99+
100+
// Act
101+
var reply = await client.UnaryCall(new HelloRequest { Name = "Balancer" }, new CallOptions().WithWaitForReady(true)).ResponseAsync.DefaultTimeout();
102+
103+
// Assert
104+
Assert.AreEqual("Balancer", reply.Message);
105+
Assert.AreEqual("127.0.0.1:50051", host);
106+
}
107+
64108
[Test]
65109
public async Task UnaryCall_ReconnectBetweenCalls_Success()
66110
{

0 commit comments

Comments
 (0)