Skip to content

Commit 518a636

Browse files
authored
Fix load balancing subchannel compare and dispose error (#1554)
1 parent a04684a commit 518a636

File tree

12 files changed

+250
-142
lines changed

12 files changed

+250
-142
lines changed

build/dependencies.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<PropertyGroup>
33
<BenchmarkDotNetPackageVersion>0.12.1</BenchmarkDotNetPackageVersion>
44
<GoogleProtobufPackageVersion>3.18.1</GoogleProtobufPackageVersion>
5-
<GrpcDotNetPackageVersion>2.41.0-pre1</GrpcDotNetPackageVersion> <!-- Used by example projects -->
5+
<GrpcDotNetPackageVersion>2.42.0-pre1</GrpcDotNetPackageVersion> <!-- Used by example projects -->
66
<GrpcPackageVersion>2.42.0</GrpcPackageVersion>
77
<MicrosoftAspNetCoreAppPackageVersion>6.0.0</MicrosoftAspNetCoreAppPackageVersion>
88
<MicrosoftAspNetCoreApp5PackageVersion>5.0.3</MicrosoftAspNetCoreApp5PackageVersion>

examples/Container/Frontend/Balancer/ConfigurableResolverFactory.cs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
using System.Linq;
2222
using System.Threading;
2323
using System.Threading.Tasks;
24+
using Grpc.Core;
2425
using Grpc.Net.Client;
2526
using Grpc.Net.Client.Balancer;
2627
using Grpc.Net.Client.Configuration;
@@ -42,7 +43,7 @@ public ConfigurableResolverFactory(ResolverFactory innerResolverFactory, Balance
4243

4344
public override Resolver Create(ResolverOptions options)
4445
{
45-
return new ConfigurableResolver(_innerResolverFactory.Create(options), _balancerConfiguration);
46+
return new ConfigurableResolver(options.LoggerFactory, _innerResolverFactory.Create(options), _balancerConfiguration);
4647
}
4748

4849
private class ConfigurableResolver : Resolver
@@ -51,9 +52,9 @@ private class ConfigurableResolver : Resolver
5152
private readonly BalancerConfiguration _balancerConfiguration;
5253

5354
private ResolverResult? _lastResult;
54-
private Action<ResolverResult>? _listener;
5555

56-
public ConfigurableResolver(Resolver innerResolver, BalancerConfiguration balancerConfiguration)
56+
public ConfigurableResolver(ILoggerFactory loggerFactory, Resolver innerResolver, BalancerConfiguration balancerConfiguration)
57+
: base(loggerFactory)
5758
{
5859
_innerResolver = innerResolver;
5960
_balancerConfiguration = balancerConfiguration;
@@ -63,29 +64,29 @@ public ConfigurableResolver(Resolver innerResolver, BalancerConfiguration balanc
6364
private void OnConfigurationUpdated(object? sender, EventArgs e)
6465
{
6566
// Can't just call RefreshAsync and get new results because of rate limiting.
66-
if (_listener != null && _lastResult != null)
67+
if (Listener != null && _lastResult != null)
6768
{
68-
RaiseResult(_listener, _lastResult);
69+
RaiseResult(_lastResult);
6970
}
7071
}
7172

72-
public override Task RefreshAsync(CancellationToken cancellationToken)
73+
protected override Task ResolveAsync(CancellationToken cancellationToken)
7374
{
74-
return _innerResolver.RefreshAsync(cancellationToken);
75+
_innerResolver.Refresh();
76+
return Task.CompletedTask;
7577
}
7678

77-
public override void Start(Action<ResolverResult> listener)
79+
protected override void OnStarted()
7880
{
79-
_listener = listener;
8081
_innerResolver.Start(result =>
8182
{
8283
_lastResult = result;
8384

84-
RaiseResult(_listener, result);
85+
RaiseResult(result);
8586
});
8687
}
8788

88-
private void RaiseResult(Action<ResolverResult> listener, ResolverResult result)
89+
private void RaiseResult(ResolverResult result)
8990
{
9091
if (result.Addresses != null)
9192
{
@@ -103,12 +104,12 @@ private void RaiseResult(Action<ResolverResult> listener, ResolverResult result)
103104

104105
// DNS results change order between refreshes.
105106
// Explicitly order by host to keep result order consistent.
106-
var orderedAddresses = result.Addresses!.OrderBy(a => a.EndPoint.Host).ToList();
107-
listener(ResolverResult.ForResult(orderedAddresses, serviceConfig));
107+
var orderedAddresses = result.Addresses.OrderBy(a => a.EndPoint.Host).ToList();
108+
Listener(ResolverResult.ForResult(orderedAddresses, serviceConfig, Status.DefaultSuccess));
108109
}
109110
else
110111
{
111-
listener(ResolverResult.ForFailure(result.Status));
112+
Listener(ResolverResult.ForFailure(result.Status));
112113
}
113114
}
114115
}

src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs

Lines changed: 58 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ public void Disconnect()
9393
_initialSocket = null;
9494
_initialSocketAddress = null;
9595
_lastEndPointIndex = 0;
96-
_socketConnectedTimer.Change(TimeSpan.Zero, TimeSpan.Zero);
96+
if (!_disposed)
97+
{
98+
_socketConnectedTimer.Change(TimeSpan.Zero, TimeSpan.Zero);
99+
}
97100
_currentAddress = null;
98101
}
99102
_subchannel.UpdateConnectivityState(ConnectivityState.Idle, "Disconnected.");
@@ -121,9 +124,9 @@ public async ValueTask<bool> TryConnectAsync(CancellationToken cancellationToken
121124

122125
try
123126
{
124-
SocketConnectivitySubchannelTransportLog.ConnectingSocket(_logger, currentAddress);
127+
SocketConnectivitySubchannelTransportLog.ConnectingSocket(_logger, _subchannel.Id, currentAddress);
125128
await socket.ConnectAsync(currentAddress.EndPoint, cancellationToken).ConfigureAwait(false);
126-
SocketConnectivitySubchannelTransportLog.ConnectedSocket(_logger, currentAddress);
129+
SocketConnectivitySubchannelTransportLog.ConnectedSocket(_logger, _subchannel.Id, currentAddress);
127130

128131
lock (Lock)
129132
{
@@ -139,7 +142,7 @@ public async ValueTask<bool> TryConnectAsync(CancellationToken cancellationToken
139142
}
140143
catch (Exception ex)
141144
{
142-
SocketConnectivitySubchannelTransportLog.ErrorConnectingSocket(_logger, currentAddress, ex);
145+
SocketConnectivitySubchannelTransportLog.ErrorConnectingSocket(_logger, _subchannel.Id, currentAddress, ex);
143146

144147
if (firstConnectionError == null)
145148
{
@@ -176,7 +179,7 @@ private async void OnCheckSocketConnection(object? state)
176179
try
177180
{
178181
// Check the socket is still valid by doing a zero byte send.
179-
SocketConnectivitySubchannelTransportLog.CheckingSocket(_logger, _initialSocketAddress);
182+
SocketConnectivitySubchannelTransportLog.CheckingSocket(_logger, _subchannel.Id, _initialSocketAddress);
180183
await socket.SendAsync(Array.Empty<byte>(), SocketFlags.None).ConfigureAwait(false);
181184

182185
// Also poll socket to check if it can be read from.
@@ -186,7 +189,7 @@ private async void OnCheckSocketConnection(object? state)
186189
{
187190
closeSocket = true;
188191
sendException = ex;
189-
SocketConnectivitySubchannelTransportLog.ErrorCheckingSocket(_logger, _initialSocketAddress, ex);
192+
SocketConnectivitySubchannelTransportLog.ErrorCheckingSocket(_logger, _subchannel.Id, _initialSocketAddress, ex);
190193
}
191194

192195
if (closeSocket)
@@ -208,13 +211,13 @@ private async void OnCheckSocketConnection(object? state)
208211
}
209212
catch (Exception ex)
210213
{
211-
SocketConnectivitySubchannelTransportLog.ErrorSocketTimer(_logger, ex);
214+
SocketConnectivitySubchannelTransportLog.ErrorSocketTimer(_logger, _subchannel.Id, ex);
212215
}
213216
}
214217

215218
public async ValueTask<Stream> GetStreamAsync(BalancerAddress address, CancellationToken cancellationToken)
216219
{
217-
SocketConnectivitySubchannelTransportLog.CreatingStream(_logger, address);
220+
SocketConnectivitySubchannelTransportLog.CreatingStream(_logger, _subchannel.Id, address);
218221

219222
Socket? socket = null;
220223
lock (Lock)
@@ -284,7 +287,7 @@ private void OnStreamDisposed(Stream streamWrapper)
284287
if (t.Stream == streamWrapper)
285288
{
286289
_activeStreams.RemoveAt(i);
287-
SocketConnectivitySubchannelTransportLog.DisposingStream(_logger, t.Address);
290+
SocketConnectivitySubchannelTransportLog.DisposingStream(_logger, _subchannel.Id, t.Address);
288291

289292
// If the last active streams is removed then there is no active connection.
290293
disconnect = _activeStreams.Count == 0;
@@ -308,8 +311,12 @@ public void Dispose()
308311
{
309312
lock (Lock)
310313
{
311-
_socketConnectedTimer.Dispose();
312-
_disposed = true;
314+
if (!_disposed)
315+
{
316+
SocketConnectivitySubchannelTransportLog.DisposingTransport(_logger, _subchannel.Id);
317+
_socketConnectedTimer.Dispose();
318+
_disposed = true;
319+
}
313320
}
314321
}
315322

@@ -320,68 +327,76 @@ public void OnRequestComplete(CompletionContext context)
320327

321328
internal static class SocketConnectivitySubchannelTransportLog
322329
{
323-
private static readonly Action<ILogger, BalancerAddress, Exception?> _connectingSocket =
324-
LoggerMessage.Define<BalancerAddress>(LogLevel.Trace, new EventId(1, "ConnectingSocket"), "Connecting socket to {Address}.");
330+
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _connectingSocket =
331+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(1, "ConnectingSocket"), "Subchannel id '{SubchannelId}' connecting socket to {Address}.");
332+
333+
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _connectedSocket =
334+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Debug, new EventId(2, "ConnectedSocket"), "Subchannel id '{SubchannelId}' connected to socket {Address}.");
335+
336+
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _errorConnectingSocket =
337+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Error, new EventId(3, "ErrorConnectingSocket"), "Subchannel id '{SubchannelId}' error connecting to socket {Address}.");
325338

326-
private static readonly Action<ILogger, BalancerAddress, Exception?> _connectedSocket =
327-
LoggerMessage.Define<BalancerAddress>(LogLevel.Debug, new EventId(2, "ConnectedSocket"), "Connected to socket {Address}.");
339+
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _checkingSocket =
340+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(4, "CheckingSocket"), "Subchannel id '{SubchannelId}' checking socket {Address}.");
328341

329-
private static readonly Action<ILogger, BalancerAddress, Exception?> _errorConnectingSocket =
330-
LoggerMessage.Define<BalancerAddress>(LogLevel.Error, new EventId(3, "ErrorConnectingSocket"), "Error connecting to socket {Address}.");
342+
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _errorCheckingSocket =
343+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Error, new EventId(5, "ErrorCheckingSocket"), "Subchannel id '{SubchannelId}' error checking socket {Address}.");
331344

332-
private static readonly Action<ILogger, BalancerAddress, Exception?> _checkingSocket =
333-
LoggerMessage.Define<BalancerAddress>(LogLevel.Trace, new EventId(4, "CheckingSocket"), "Checking socket {Address}.");
345+
private static readonly Action<ILogger, int, Exception?> _errorSocketTimer =
346+
LoggerMessage.Define<int>(LogLevel.Error, new EventId(6, "ErrorSocketTimer"), "Subchannel id '{SubchannelId}' unexpected error in check socket timer.");
334347

335-
private static readonly Action<ILogger, BalancerAddress, Exception?> _errorCheckingSocket =
336-
LoggerMessage.Define<BalancerAddress>(LogLevel.Error, new EventId(5, "ErrorCheckingSocket"), "Error checking socket {Address}.");
348+
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _creatingStream =
349+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(7, "CreatingStream"), "Subchannel id '{SubchannelId}' creating stream for {Address}.");
337350

338-
private static readonly Action<ILogger, Exception?> _errorSocketTimer =
339-
LoggerMessage.Define(LogLevel.Error, new EventId(6, "ErrorSocketTimer"), "Unexpected error in check socket timer.");
351+
private static readonly Action<ILogger, int, BalancerAddress, Exception?> _disposingStream =
352+
LoggerMessage.Define<int, BalancerAddress>(LogLevel.Trace, new EventId(8, "DisposingStream"), "Subchannel id '{SubchannelId}' disposing stream for {Address}.");
340353

341-
private static readonly Action<ILogger, BalancerAddress, Exception?> _creatingStream =
342-
LoggerMessage.Define<BalancerAddress>(LogLevel.Trace, new EventId(7, "CreatingStream"), "Creating stream for {Address}.");
354+
private static readonly Action<ILogger, int, Exception?> _disposingTransport =
355+
LoggerMessage.Define<int>(LogLevel.Trace, new EventId(9, "DisposingTransport"), "Subchannel id '{SubchannelId}' disposing transport.");
343356

344-
private static readonly Action<ILogger, BalancerAddress, Exception?> _disposingStream =
345-
LoggerMessage.Define<BalancerAddress>(LogLevel.Trace, new EventId(8, "DisposingStream"), "Disposing stream for {Address}.");
357+
public static void ConnectingSocket(ILogger logger, int subchannelId, BalancerAddress address)
358+
{
359+
_connectingSocket(logger, subchannelId, address, null);
360+
}
346361

347-
public static void ConnectingSocket(ILogger logger, BalancerAddress address)
362+
public static void ConnectedSocket(ILogger logger, int subchannelId, BalancerAddress address)
348363
{
349-
_connectingSocket(logger, address, null);
364+
_connectedSocket(logger, subchannelId, address, null);
350365
}
351366

352-
public static void ConnectedSocket(ILogger logger, BalancerAddress address)
367+
public static void ErrorConnectingSocket(ILogger logger, int subchannelId, BalancerAddress address, Exception ex)
353368
{
354-
_connectedSocket(logger, address, null);
369+
_errorConnectingSocket(logger, subchannelId, address, ex);
355370
}
356371

357-
public static void ErrorConnectingSocket(ILogger logger, BalancerAddress address, Exception ex)
372+
public static void CheckingSocket(ILogger logger, int subchannelId, BalancerAddress address)
358373
{
359-
_errorConnectingSocket(logger, address, ex);
374+
_checkingSocket(logger, subchannelId, address, null);
360375
}
361376

362-
public static void CheckingSocket(ILogger logger, BalancerAddress address)
377+
public static void ErrorCheckingSocket(ILogger logger, int subchannelId, BalancerAddress address, Exception ex)
363378
{
364-
_checkingSocket(logger, address, null);
379+
_errorCheckingSocket(logger, subchannelId, address, ex);
365380
}
366381

367-
public static void ErrorCheckingSocket(ILogger logger, BalancerAddress address, Exception ex)
382+
public static void ErrorSocketTimer(ILogger logger, int subchannelId, Exception ex)
368383
{
369-
_errorCheckingSocket(logger, address, ex);
384+
_errorSocketTimer(logger, subchannelId, ex);
370385
}
371386

372-
public static void ErrorSocketTimer(ILogger logger, Exception ex)
387+
public static void CreatingStream(ILogger logger, int subchannelId, BalancerAddress address)
373388
{
374-
_errorSocketTimer(logger, ex);
389+
_creatingStream(logger, subchannelId, address, null);
375390
}
376391

377-
public static void CreatingStream(ILogger logger, BalancerAddress address)
392+
public static void DisposingStream(ILogger logger, int subchannelId, BalancerAddress address)
378393
{
379-
_creatingStream(logger, address, null);
394+
_disposingStream(logger, subchannelId, address, null);
380395
}
381396

382-
public static void DisposingStream(ILogger logger, BalancerAddress address)
397+
public static void DisposingTransport(ILogger logger, int subchannelId)
383398
{
384-
_disposingStream(logger, address, null);
399+
_disposingTransport(logger, subchannelId, null);
385400
}
386401
}
387402
#endif

0 commit comments

Comments
 (0)