Skip to content

Commit 687fbb4

Browse files
authored
Split AsyncResolver out of Resolver (#1573)
1 parent 1215046 commit 687fbb4

File tree

8 files changed

+239
-164
lines changed

8 files changed

+239
-164
lines changed
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
#region Copyright notice and license
2+
3+
// Copyright 2019 The gRPC Authors
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the "License");
6+
// you may not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
#endregion
18+
19+
#if SUPPORT_LOAD_BALANCING
20+
using Grpc.Core;
21+
using Grpc.Net.Client.Internal;
22+
using Microsoft.Extensions.Logging;
23+
24+
namespace Grpc.Net.Client.Balancer
25+
{
26+
/// <summary>
27+
/// An abstract base type for <see cref="Resolver"/> implementations that use asynchronous logic to resolve the <see cref="Uri"/>.
28+
/// <para>
29+
/// <see cref="AsyncResolver"/> adds a virtual <see cref="ResolveAsync"/> method. The resolver runs one asynchronous
30+
/// resolve task at a time. Calling <see cref="Refresh()"/> on the resolver when a resolve task is already running has
31+
/// no effect.
32+
/// </para>
33+
/// </summary>
34+
public abstract class AsyncResolver : Resolver
35+
{
36+
// Internal for testing
37+
internal Task _resolveTask = Task.CompletedTask;
38+
private Action<ResolverResult>? _listener;
39+
private bool _disposed;
40+
41+
private readonly object _lock = new object();
42+
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
43+
private readonly ILogger _logger;
44+
45+
/// <summary>
46+
/// Gets the listener.
47+
/// </summary>
48+
protected Action<ResolverResult> Listener => _listener!;
49+
50+
/// <summary>
51+
/// Initializes a new instance of the <see cref="AsyncResolver"/>.
52+
/// </summary>
53+
/// <param name="loggerFactory">The logger factory.</param>
54+
protected AsyncResolver(ILoggerFactory loggerFactory)
55+
{
56+
if (loggerFactory == null)
57+
{
58+
throw new ArgumentNullException(nameof(loggerFactory));
59+
}
60+
61+
_logger = loggerFactory.CreateLogger<AsyncResolver>();
62+
}
63+
64+
/// <summary>
65+
/// Starts listening to resolver for results with the specified callback. Can only be called once.
66+
/// <para>
67+
/// The <see cref="ResolverResult"/> passed to the callback has addresses when successful,
68+
/// otherwise a <see cref="Status"/> details the resolution error.
69+
/// </para>
70+
/// </summary>
71+
/// <param name="listener">The callback used to receive updates on the target.</param>
72+
public override sealed void Start(Action<ResolverResult> listener)
73+
{
74+
if (listener == null)
75+
{
76+
throw new ArgumentNullException(nameof(listener));
77+
}
78+
79+
if (_listener != null)
80+
{
81+
throw new InvalidOperationException("Resolver has already been started.");
82+
}
83+
84+
_listener = (result) =>
85+
{
86+
Log.ResolverResult(_logger, GetType(), result.Status.StatusCode, result.Addresses?.Count ?? 0);
87+
listener(result);
88+
};
89+
90+
OnStarted();
91+
}
92+
93+
/// <summary>
94+
/// Executes after the resolver starts.
95+
/// </summary>
96+
protected virtual void OnStarted()
97+
{
98+
}
99+
100+
/// <summary>
101+
/// Refresh resolution. Can only be called after <see cref="Start(Action{ResolverResult})"/>.
102+
/// <para>
103+
/// This is only a hint. Implementation takes it as a signal but may not start resolution.
104+
/// </para>
105+
/// </summary>
106+
public sealed override void Refresh()
107+
{
108+
if (_disposed)
109+
{
110+
throw new ObjectDisposedException(nameof(DnsResolver));
111+
}
112+
if (_listener == null)
113+
{
114+
throw new InvalidOperationException("Resolver hasn't been started.");
115+
}
116+
117+
lock (_lock)
118+
{
119+
Log.ResolverRefreshRequested(_logger, GetType());
120+
121+
if (_resolveTask.IsCompleted)
122+
{
123+
_resolveTask = ResolveNowAsync(_cts.Token);
124+
}
125+
else
126+
{
127+
Log.ResolverRefreshIgnored(_logger, GetType());
128+
}
129+
}
130+
}
131+
132+
private async Task ResolveNowAsync(CancellationToken cancellationToken)
133+
{
134+
try
135+
{
136+
await ResolveAsync(cancellationToken).ConfigureAwait(false);
137+
}
138+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
139+
{
140+
// Ignore cancellation.
141+
}
142+
catch (Exception ex)
143+
{
144+
Log.ResolverRefreshError(_logger, GetType(), ex);
145+
146+
var status = GrpcProtocolHelpers.CreateStatusFromException("Error refreshing resolver.", ex);
147+
Listener(ResolverResult.ForFailure(status));
148+
}
149+
}
150+
151+
/// <summary>
152+
/// Resolve the target <see cref="Uri"/>. Updated results are passed to the callback
153+
/// registered by <see cref="Start(Action{ResolverResult})"/>. Can only be called
154+
/// after the resolver has started.
155+
/// <para>
156+
/// This is only a hint. Implementation takes it as a signal but may not start resolution.
157+
/// </para>
158+
/// </summary>
159+
/// <param name="cancellationToken">A cancellation token.</param>
160+
/// <returns>A task.</returns>
161+
protected abstract Task ResolveAsync(CancellationToken cancellationToken);
162+
163+
/// <summary>
164+
/// Releases the unmanaged resources used by the <see cref="LoadBalancer"/> and optionally releases
165+
/// the managed resources.
166+
/// </summary>
167+
/// <param name="disposing">
168+
/// <c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.
169+
/// </param>
170+
protected override void Dispose(bool disposing)
171+
{
172+
_cts.Cancel();
173+
_disposed = true;
174+
}
175+
176+
internal static class Log
177+
{
178+
private static readonly Action<ILogger, string, Exception?> _resolverRefreshRequested =
179+
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(1, "ResolverRefreshRequested"), "{ResolveType} refresh requested.");
180+
181+
private static readonly Action<ILogger, string, Exception?> _resolverRefreshIgnored =
182+
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(2, "ResolverRefreshIgnored"), "{ResolveType} refresh ignored because resolve is already in progress.");
183+
184+
private static readonly Action<ILogger, string, Exception?> _resolverRefreshError =
185+
LoggerMessage.Define<string>(LogLevel.Error, new EventId(3, "ResolverRefreshError"), "Error refreshing {ResolveType}.");
186+
187+
private static readonly Action<ILogger, string, StatusCode, int, Exception?> _resolverResult =
188+
LoggerMessage.Define<string, StatusCode, int>(LogLevel.Trace, new EventId(4, "ResolverResult"), "{ResolveType} result with status code '{StatusCode}' and {AddressCount} addresses.");
189+
190+
public static void ResolverRefreshRequested(ILogger logger, Type resolverType)
191+
{
192+
_resolverRefreshRequested(logger, resolverType.Name, null);
193+
}
194+
195+
public static void ResolverRefreshIgnored(ILogger logger, Type resolverType)
196+
{
197+
_resolverRefreshIgnored(logger, resolverType.Name, null);
198+
}
199+
200+
public static void ResolverRefreshError(ILogger logger, Type resolverType, Exception ex)
201+
{
202+
_resolverRefreshError(logger, resolverType.Name, ex);
203+
}
204+
205+
public static void ResolverResult(ILogger logger, Type resolverType, StatusCode statusCode, int addressCount)
206+
{
207+
_resolverResult(logger, resolverType.Name, statusCode, addressCount, null);
208+
}
209+
}
210+
}
211+
}
212+
#endif

src/Grpc.Net.Client/Balancer/DnsResolver.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace Grpc.Net.Client.Balancer
3737
/// Note: Experimental API that can change or be removed without any prior notice.
3838
/// </para>
3939
/// </summary>
40-
internal sealed class DnsResolver : Resolver
40+
internal sealed class DnsResolver : AsyncResolver
4141
{
4242
// To prevent excessive re-resolution, we enforce a rate limit on DNS resolution requests.
4343
private static readonly TimeSpan MinimumDnsResolutionRate = TimeSpan.FromSeconds(15);

src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -358,13 +358,13 @@ internal static class SocketConnectivitySubchannelTransportLog
358358
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(2, "ConnectedSocket"), "Subchannel id '{SubchannelId}' connected to socket {Address}.");
359359

360360
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _errorConnectingSocket =
361-
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Error, new EventId(3, "ErrorConnectingSocket"), "Subchannel id '{SubchannelId}' error connecting to socket {Address}.");
361+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(3, "ErrorConnectingSocket"), "Subchannel id '{SubchannelId}' error connecting to socket {Address}.");
362362

363363
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _checkingSocket =
364364
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(4, "CheckingSocket"), "Subchannel id '{SubchannelId}' checking socket {Address}.");
365365

366366
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _errorCheckingSocket =
367-
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Error, new EventId(5, "ErrorCheckingSocket"), "Subchannel id '{SubchannelId}' error checking socket {Address}.");
367+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(5, "ErrorCheckingSocket"), "Subchannel id '{SubchannelId}' error checking socket {Address}.");
368368

369369
private static readonly Action<ILogger, int, Exception?> _errorSocketTimer =
370370
LoggerMessage.Define<int>(LogLevel.Error, new EventId(6, "ErrorSocketTimer"), "Subchannel id '{SubchannelId}' unexpected error in check socket timer.");

0 commit comments

Comments
 (0)