Skip to content

Commit 2029852

Browse files
authored
Refactor client load balancing resolver (#1491)
1 parent e48a036 commit 2029852

File tree

13 files changed

+273
-253
lines changed

13 files changed

+273
-253
lines changed

src/Grpc.Net.Client/Balancer/DnsResolver.cs

Lines changed: 9 additions & 70 deletions
Original file line number | Diff line number | Diff line change
@@ -44,14 +44,9 @@ internal sealed class DnsResolver : Resolver
4444

4545
private readonly Uri _address;
4646
private readonly TimeSpan _refreshInterval;
47-
private readonly ILogger<DnsResolver> _logger;
48-
private readonly CancellationTokenSource _cts;
49-
private readonly object _lock = new object();
47+
private readonly ILogger _logger;
5048

5149
private Timer? _timer;
52-
private Action<ResolverResult>? _listener;
53-
private bool _disposed;
54-
private Task _refreshTask;
5550
private DateTime _lastResolveStart;
5651

5752
// Internal for testing.
@@ -63,28 +58,16 @@ internal sealed class DnsResolver : Resolver
6358
/// <param name="address">The target <see cref="Uri"/>.</param>
6459
/// <param name="loggerFactory">The logger factory.</param>
6560
/// <param name="refreshInterval">An interval for automatically refreshing the DNS hostname.</param>
66-
public DnsResolver(Uri address, ILoggerFactory loggerFactory, TimeSpan refreshInterval)
61+
public DnsResolver(Uri address, ILoggerFactory loggerFactory, TimeSpan refreshInterval) : base(loggerFactory)
6762
{
6863
_address = address;
6964
_refreshInterval = refreshInterval;
70-
_cts = new CancellationTokenSource();
7165
_logger = loggerFactory.CreateLogger<DnsResolver>();
72-
_refreshTask = Task.CompletedTask;
7366
}
7467

75-
/// <inheritdoc />
76-
public override void Start(Action<ResolverResult> listener)
68+
protected override void OnStarted()
7769
{
78-
if (_disposed)
79-
{
80-
throw new ObjectDisposedException(nameof(DnsResolver));
81-
}
82-
if (_listener != null)
83-
{
84-
throw new InvalidOperationException("Resolver has already been started.");
85-
}
86-
87-
_listener = listener;
70+
base.OnStarted();
8871

8972
if (_refreshInterval != Timeout.InfiniteTimeSpan)
9073
{
@@ -94,32 +77,8 @@ public override void Start(Action<ResolverResult> listener)
9477
}
9578

9679
/// <inheritdoc />
97-
public override Task RefreshAsync(CancellationToken cancellationToken)
80+
protected override async Task ResolveAsync(CancellationToken cancellationToken)
9881
{
99-
if (_disposed)
100-
{
101-
throw new ObjectDisposedException(nameof(DnsResolver));
102-
}
103-
if (_listener == null)
104-
{
105-
throw new InvalidOperationException("Resolver hasn't been started.");
106-
}
107-
108-
lock (_lock)
109-
{
110-
if (_refreshTask.IsCompleted)
111-
{
112-
_refreshTask = RefreshCoreAsync(cancellationToken);
113-
}
114-
}
115-
116-
return _refreshTask;
117-
}
118-
119-
private async Task RefreshCoreAsync(CancellationToken cancellationToken)
120-
{
121-
CompatibilityHelpers.Assert(_listener != null);
122-
12382
try
12483
{
12584
var elapsedTimeSinceLastRefresh = SystemClock.UtcNow - _lastResolveStart;
@@ -148,14 +107,14 @@ private async Task RefreshCoreAsync(CancellationToken cancellationToken)
148107
var resolvedPort = _address.Port == -1 ? 80 : _address.Port;
149108
var endpoints = addresses.Select(a => new BalancerAddress(a.ToString(), resolvedPort)).ToArray();
150109
var resolverResult = ResolverResult.ForResult(endpoints);
151-
_listener(resolverResult);
110+
Listener(resolverResult);
152111
}
153112
catch (Exception ex)
154113
{
155114
var message = $"Error getting DNS hosts for address '{_address}'.";
156115

157116
DnsResolverLog.ErrorQueryingDns(_logger, _address, ex);
158-
_listener(ResolverResult.ForFailure(GrpcProtocolHelpers.CreateStatusFromException(message, ex, StatusCode.Unavailable)));
117+
Listener(ResolverResult.ForFailure(GrpcProtocolHelpers.CreateStatusFromException(message, ex, StatusCode.Unavailable)));
159118
}
160119
}
161120

@@ -165,33 +124,13 @@ protected override void Dispose(bool disposing)
165124
base.Dispose(disposing);
166125

167126
_timer?.Dispose();
168-
_cts.Cancel();
169-
_listener = null;
170-
_disposed = true;
171127
}
172128

173-
private async void OnTimerCallback(object? state)
129+
private void OnTimerCallback(object? state)
174130
{
175131
try
176132
{
177-
var awaitRefresh = false;
178-
lock (_lock)
179-
{
180-
if (_refreshTask.IsCompleted)
181-
{
182-
_refreshTask = RefreshCoreAsync(_cts.Token);
183-
awaitRefresh = true;
184-
}
185-
}
186-
187-
if (awaitRefresh)
188-
{
189-
await _refreshTask.ConfigureAwait(false);
190-
}
191-
}
192-
catch (OperationCanceledException)
193-
{
194-
// Don't log cancellation.
133+
Refresh();
195134
}
196135
catch (Exception ex)
197136
{

src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs

Lines changed: 25 additions & 73 deletions
Original file line number | Diff line number | Diff line change
@@ -25,15 +25,14 @@
2525
using System.Threading;
2626
using System.Threading.Tasks;
2727
using Grpc.Core;
28-
using Grpc.Net.Client.Balancer.Internal;
2928
using Grpc.Net.Client.Configuration;
3029
using Grpc.Net.Client.Internal;
3130
using Grpc.Shared;
3231
using Microsoft.Extensions.Logging;
3332

3433
namespace Grpc.Net.Client.Balancer.Internal
3534
{
36-
internal class ConnectionManager : IDisposable, IChannelControlHelper
35+
internal sealed class ConnectionManager : IDisposable, IChannelControlHelper
3736
{
3837
private static readonly ServiceConfig DefaultServiceConfig = new ServiceConfig();
3938

@@ -43,13 +42,12 @@ internal class ConnectionManager : IDisposable, IChannelControlHelper
4342
private readonly ISubchannelTransportFactory _subchannelTransportFactory;
4443
private readonly List<Subchannel> _subchannels;
4544
private readonly List<StateWatcher> _stateWatchers;
46-
private readonly CancellationTokenSource _cts;
45+
private readonly TaskCompletionSource<object?> _resolverStartedTcs;
4746

4847
// Internal for testing
4948
internal LoadBalancer? _balancer;
5049
internal SubchannelPicker? _picker;
51-
private Task? _resolverRefreshTask;
52-
private Task? _resolveTask;
50+
private bool _resolverStarted;
5351
private TaskCompletionSource<SubchannelPicker> _nextPickerTcs;
5452
private int _currentSubchannelId;
5553
private ServiceConfig? _previousServiceConfig;
@@ -64,9 +62,9 @@ internal ConnectionManager(
6462
_lock = new object();
6563
_nextPickerLock = new SemaphoreSlim(1);
6664
_nextPickerTcs = new TaskCompletionSource<SubchannelPicker>(TaskCreationOptions.RunContinuationsAsynchronously);
67-
_cts = new CancellationTokenSource();
65+
_resolverStartedTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
6866

69-
Logger = loggerFactory.CreateLogger(GetType());
67+
Logger = loggerFactory.CreateLogger<ConnectionManager>();
7068
LoggerFactory = loggerFactory;
7169

7270
_subchannels = new List<Subchannel>();
@@ -117,31 +115,7 @@ Subchannel IChannelControlHelper.CreateSubchannel(SubchannelOptions options)
117115

118116
void IChannelControlHelper.RefreshResolver()
119117
{
120-
lock (_lock)
121-
{
122-
ConnectionManagerLog.ResolverRefreshRequested(Logger);
123-
124-
if (_resolveTask == null || !_resolveTask.IsCompleted)
125-
{
126-
_resolveTask = ResolveNowAsync(_cts.Token);
127-
}
128-
else
129-
{
130-
ConnectionManagerLog.ResolverRefreshIgnored(Logger);
131-
}
132-
}
133-
}
134-
135-
private async Task ResolveNowAsync(CancellationToken cancellationToken)
136-
{
137-
try
138-
{
139-
await _resolver.RefreshAsync(cancellationToken).ConfigureAwait(false);
140-
}
141-
catch (Exception ex)
142-
{
143-
ConnectionManagerLog.ResolverRefreshError(Logger, ex);
144-
}
118+
_resolver.Refresh();
145119
}
146120

147121
private void OnResolverResult(ResolverResult result)
@@ -217,12 +191,12 @@ private void OnResolverResult(ResolverResult result)
217191
lock (_lock)
218192
{
219193
_balancer.UpdateChannelState(state);
194+
_resolverStartedTcs.TrySetResult(null);
220195
}
221196
}
222197

223198
public void Dispose()
224199
{
225-
_cts.Cancel();
226200
_resolver.Dispose();
227201
_nextPickerLock.Dispose();
228202
lock (_lock)
@@ -279,19 +253,21 @@ private Task EnsureResolverStartedAsync()
279253
{
280254
// Ensure that the resolver has started and has resolved at least once.
281255
// This ensures an inner load balancer has been created and is running.
282-
if (_resolverRefreshTask == null)
256+
if (!_resolverStarted)
283257
{
284258
lock (_lock)
285259
{
286-
if (_resolverRefreshTask == null)
260+
if (!_resolverStarted)
287261
{
288262
_resolver.Start(OnResolverResult);
289-
_resolverRefreshTask = _resolver.RefreshAsync(_cts.Token);
263+
_resolver.Refresh();
264+
265+
_resolverStarted = true;
290266
}
291267
}
292268
}
293269

294-
return _resolverRefreshTask;
270+
return _resolverStartedTcs.Task;
295271
}
296272

297273
public void UpdateState(BalancerState state)
@@ -509,65 +485,41 @@ private record StateWatcher(CancellationToken CancellationToken, ConnectivitySta
509485

510486
internal static class ConnectionManagerLog
511487
{
512-
private static readonly Action<ILogger, Exception?> _resolverRefreshRequested =
513-
LoggerMessage.Define(LogLevel.Trace, new EventId(1, "ResolverRefreshRequested"), "Resolver refresh requested.");
514-
515-
private static readonly Action<ILogger, Exception?> _resolverRefreshIgnored =
516-
LoggerMessage.Define(LogLevel.Trace, new EventId(2, "ResolverRefreshIgnored"), "Resolver refresh ignored because resolve is already in progress.");
517-
518-
private static readonly Action<ILogger, Exception?> _resolverRefreshError =
519-
LoggerMessage.Define(LogLevel.Error, new EventId(3, "ResolverRefreshError"), "Error refreshing resolver.");
520-
521488
private static readonly Action<ILogger, string, Exception?> _resolverUnsupportedLoadBalancingConfig =
522-
LoggerMessage.Define<string>(LogLevel.Warning, new EventId(4, "ResolverUnsupportedLoadBalancingConfig"), "Service config returned by the resolver contains unsupported load balancer policies: {LoadBalancingConfigs}. Load balancer unchanged.");
489+
LoggerMessage.Define<string>(LogLevel.Warning, new EventId(1, "ResolverUnsupportedLoadBalancingConfig"), "Service config returned by the resolver contains unsupported load balancer policies: {LoadBalancingConfigs}. Load balancer unchanged.");
523490

524491
private static readonly Action<ILogger, Exception?> _resolverServiceConfigNotUsed =
525-
LoggerMessage.Define(LogLevel.Debug, new EventId(5, "ResolverServiceConfigNotUsed"), "Service config returned by the resolver not used.");
492+
LoggerMessage.Define(LogLevel.Debug, new EventId(2, "ResolverServiceConfigNotUsed"), "Service config returned by the resolver not used.");
526493

527494
private static readonly Action<ILogger, ConnectivityState, Exception?> _channelStateUpdated =
528-
LoggerMessage.Define<ConnectivityState>(LogLevel.Debug, new EventId(6, "ChannelStateUpdated"), "Channel state updated to {State}.");
495+
LoggerMessage.Define<ConnectivityState>(LogLevel.Debug, new EventId(3, "ChannelStateUpdated"), "Channel state updated to {State}.");
529496

530497
private static readonly Action<ILogger, Exception?> _channelPickerUpdated =
531-
LoggerMessage.Define(LogLevel.Debug, new EventId(7, "ChannelPickerUpdated"), "Channel picker updated.");
498+
LoggerMessage.Define(LogLevel.Debug, new EventId(4, "ChannelPickerUpdated"), "Channel picker updated.");
532499

533500
private static readonly Action<ILogger, Exception?> _pickStarted =
534-
LoggerMessage.Define(LogLevel.Trace, new EventId(8, "PickStarted"), "Pick started.");
501+
LoggerMessage.Define(LogLevel.Trace, new EventId(5, "PickStarted"), "Pick started.");
535502

536503
private static readonly Action<ILogger, int, DnsEndPoint, Exception?> _pickResultSuccessful =
537-
LoggerMessage.Define<int, DnsEndPoint>(LogLevel.Debug, new EventId(9, "PickResultSuccessful"), "Successfully picked subchannel id '{SubchannelId}' with address {CurrentAddress}.");
504+
LoggerMessage.Define<int, DnsEndPoint>(LogLevel.Debug, new EventId(6, "PickResultSuccessful"), "Successfully picked subchannel id '{SubchannelId}' with address {CurrentAddress}.");
538505

539506
private static readonly Action<ILogger, int, Exception?> _pickResultSubchannelNoCurrentAddress =
540-
LoggerMessage.Define<int>(LogLevel.Debug, new EventId(10, "PickResultSubchannelNoCurrentAddress"), "Picked subchannel id '{SubchannelId}' doesn't have a current address.");
507+
LoggerMessage.Define<int>(LogLevel.Debug, new EventId(7, "PickResultSubchannelNoCurrentAddress"), "Picked subchannel id '{SubchannelId}' doesn't have a current address.");
541508

542509
private static readonly Action<ILogger, Exception?> _pickResultQueued =
543-
LoggerMessage.Define(LogLevel.Debug, new EventId(11, "PickResultQueued"), "Picked queued.");
510+
LoggerMessage.Define(LogLevel.Debug, new EventId(8, "PickResultQueued"), "Picked queued.");
544511

545512
private static readonly Action<ILogger, Status, Exception?> _pickResultFailure =
546-
LoggerMessage.Define<Status>(LogLevel.Debug, new EventId(12, "PickResultFailure"), "Picked failure with status: {Status}");
513+
LoggerMessage.Define<Status>(LogLevel.Debug, new EventId(9, "PickResultFailure"), "Picked failure with status: {Status}");
547514

548515
private static readonly Action<ILogger, Status, Exception?> _pickResultFailureWithWaitForReady =
549-
LoggerMessage.Define<Status>(LogLevel.Debug, new EventId(13, "PickResultFailureWithWaitForReady"), "Picked failure with status: {Status}. Retrying because wait for ready is enabled.");
516+
LoggerMessage.Define<Status>(LogLevel.Debug, new EventId(10, "PickResultFailureWithWaitForReady"), "Picked failure with status: {Status}. Retrying because wait for ready is enabled.");
550517

551518
private static readonly Action<ILogger, Exception?> _pickWaiting =
552-
LoggerMessage.Define(LogLevel.Trace, new EventId(14, "PickWaiting"), "Waiting for a new picker.");
519+
LoggerMessage.Define(LogLevel.Trace, new EventId(11, "PickWaiting"), "Waiting for a new picker.");
553520

554521
private static readonly Action<ILogger, Status, Exception?> _resolverServiceConfigFallback =
555-
LoggerMessage.Define<Status>(LogLevel.Debug, new EventId(15, "ResolverServiceConfigFallback"), "Falling back to previously loaded service config. Resolver failure when retreiving or parsing service config with status: {Status}");
556-
557-
public static void ResolverRefreshRequested(ILogger logger)
558-
{
559-
_resolverRefreshRequested(logger, null);
560-
}
561-
562-
public static void ResolverRefreshIgnored(ILogger logger)
563-
{
564-
_resolverRefreshIgnored(logger, null);
565-
}
566-
567-
public static void ResolverRefreshError(ILogger logger, Exception ex)
568-
{
569-
_resolverRefreshError(logger, ex);
570-
}
522+
LoggerMessage.Define<Status>(LogLevel.Debug, new EventId(12, "ResolverServiceConfigFallback"), "Falling back to previously loaded service config. Resolver failure when retreiving or parsing service config with status: {Status}");
571523

572524
public static void ResolverUnsupportedLoadBalancingConfig(ILogger logger, IList<LoadBalancingConfig> loadBalancingConfigs)
573525
{

0 commit comments

Comments
 (0)