Skip to content

Commit 2ff1f4e

Browse files
authored
Subchannel sockets transport cleanup and tests (#1562)
1 parent eb813a7 commit 2ff1f4e

File tree

5 files changed

+204
-48
lines changed

5 files changed

+204
-48
lines changed

src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs

Lines changed: 66 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ internal record struct ActiveStream(BalancerAddress Address, Socket Socket, Stre
5858
private readonly Timer _socketConnectedTimer;
5959

6060
private int _lastEndPointIndex;
61-
private Socket? _initialSocket;
61+
internal Socket? _initialSocket;
6262
private BalancerAddress? _initialSocketAddress;
6363
private bool _disposed;
6464
private BalancerAddress? _currentAddress;
@@ -89,19 +89,29 @@ public void Disconnect()
8989
{
9090
lock (Lock)
9191
{
92-
_initialSocket?.Dispose();
93-
_initialSocket = null;
94-
_initialSocketAddress = null;
95-
_lastEndPointIndex = 0;
96-
if (!_disposed)
92+
if (_disposed)
9793
{
98-
_socketConnectedTimer.Change(TimeSpan.Zero, TimeSpan.Zero);
94+
return;
9995
}
100-
_currentAddress = null;
96+
97+
DisconnectUnsynchronized();
98+
_socketConnectedTimer.Change(TimeSpan.Zero, TimeSpan.Zero);
10199
}
102100
_subchannel.UpdateConnectivityState(ConnectivityState.Idle, "Disconnected.");
103101
}
104102

103+
private void DisconnectUnsynchronized()
104+
{
105+
Debug.Assert(Monitor.IsEntered(Lock));
106+
Debug.Assert(!_disposed);
107+
108+
_initialSocket?.Dispose();
109+
_initialSocket = null;
110+
_initialSocketAddress = null;
111+
_lastEndPointIndex = 0;
112+
_currentAddress = null;
113+
}
114+
105115
public async ValueTask<bool> TryConnectAsync(CancellationToken cancellationToken)
106116
{
107117
Debug.Assert(CurrentAddress == null);
@@ -196,13 +206,14 @@ private async void OnCheckSocketConnection(object? state)
196206
{
197207
lock (Lock)
198208
{
209+
if (_disposed)
210+
{
211+
return;
212+
}
213+
199214
if (_initialSocket == socket)
200215
{
201-
_initialSocket.Dispose();
202-
_initialSocket = null;
203-
_initialSocketAddress = null;
204-
_currentAddress = null;
205-
_lastEndPointIndex = 0;
216+
DisconnectUnsynchronized();
206217
}
207218
}
208219
_subchannel.UpdateConnectivityState(ConnectivityState.Idle, new Status(StatusCode.Unavailable, "Lost connection to socket.", sendException));
@@ -278,45 +289,58 @@ private static bool IsSocketInBadState(Socket socket)
278289

279290
private void OnStreamDisposed(Stream streamWrapper)
280291
{
281-
var disconnect = false;
282-
lock (Lock)
292+
try
283293
{
284-
for (var i = _activeStreams.Count - 1; i >= 0; i--)
294+
var disconnect = false;
295+
lock (Lock)
285296
{
286-
var t = _activeStreams[i];
287-
if (t.Stream == streamWrapper)
297+
for (var i = _activeStreams.Count - 1; i >= 0; i--)
288298
{
289-
_activeStreams.RemoveAt(i);
290-
SocketConnectivitySubchannelTransportLog.DisposingStream(_logger, _subchannel.Id, t.Address);
299+
var t = _activeStreams[i];
300+
if (t.Stream == streamWrapper)
301+
{
302+
_activeStreams.RemoveAt(i);
303+
SocketConnectivitySubchannelTransportLog.DisposingStream(_logger, _subchannel.Id, t.Address);
291304

292-
// If the last active streams is removed then there is no active connection.
293-
disconnect = _activeStreams.Count == 0;
305+
// If the last active streams is removed then there is no active connection.
306+
disconnect = _activeStreams.Count == 0;
294307

295-
break;
308+
break;
309+
}
296310
}
297311
}
298-
}
299312

300-
if (disconnect)
313+
if (disconnect)
314+
{
315+
// What happens after disconnect depends if the load balancer requests a new connection.
316+
// For example:
317+
// - Pick first will go into an idle state.
318+
// - Round-robin will reconnect to get back to a ready state.
319+
Disconnect();
320+
}
321+
}
322+
catch (Exception ex)
301323
{
302-
// What happens after disconnect depends if the load balancer requests a new connection.
303-
// For example:
304-
// - Pick first will go into an idle state.
305-
// - Round-robin will reconnect to get back to a ready state.
306-
Disconnect();
324+
// Don't throw error to Stream.Dispose() caller.
325+
SocketConnectivitySubchannelTransportLog.ErrorOnDisposingStream(_logger, _subchannel.Id, ex);
307326
}
308327
}
309328

310329
public void Dispose()
311330
{
312331
lock (Lock)
313332
{
314-
if (!_disposed)
333+
if (_disposed)
315334
{
316-
SocketConnectivitySubchannelTransportLog.DisposingTransport(_logger, _subchannel.Id);
317-
_socketConnectedTimer.Dispose();
318-
_disposed = true;
335+
return;
319336
}
337+
338+
SocketConnectivitySubchannelTransportLog.DisposingTransport(_logger, _subchannel.Id);
339+
340+
DisconnectUnsynchronized();
341+
342+
_socketConnectedTimer.Dispose();
343+
_disposed = true;
320344
}
321345
}
322346

@@ -354,6 +378,9 @@ internal static class SocketConnectivitySubchannelTransportLog
354378
private static readonly Action<ILogger, int, Exception?> _disposingTransport =
355379
LoggerMessage.Define<int>(LogLevel.Trace, new EventId(9, "DisposingTransport"), "Subchannel id '{SubchannelId}' disposing transport.");
356380

381+
private static readonly Action<ILogger, int, Exception> _errorOnDisposingStream =
382+
LoggerMessage.Define<int>(LogLevel.Error, new EventId(10, "ErrorOnDisposingStream"), "Subchannel id '{SubchannelId}' unexpected error when reacting to transport stream dispose.");
383+
357384
public static void ConnectingSocket(ILogger logger, int subchannelId, BalancerAddress address)
358385
{
359386
_connectingSocket(logger, subchannelId, address, null);
@@ -398,6 +425,11 @@ public static void DisposingTransport(ILogger logger, int subchannelId)
398425
{
399426
_disposingTransport(logger, subchannelId, null);
400427
}
428+
429+
public static void ErrorOnDisposingStream(ILogger logger, int subchannelId, Exception ex)
430+
{
431+
_errorOnDisposingStream(logger, subchannelId, ex);
432+
}
401433
}
402434
#endif
403435
}

test/FunctionalTests/Balancer/BalancerHelpers.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public EndpointContext(GrpcTestFixture<Startup> server, Method<TRequest, TRespon
7676
public Method<TRequest, TResponse> Method { get; }
7777
public Uri Address { get; }
7878
public ILoggerFactory LoggerFactory => _server.LoggerFactory;
79+
public EndPoint EndPoint => new DnsEndPoint(Address.Host, Address.Port);
7980

8081
public void Dispose()
8182
{
@@ -172,7 +173,13 @@ public static async Task WaitForChannelStatesAsync(ILogger logger, GrpcChannel c
172173
logger.LogInformation($"Channel id {channelId}: Current channel state '{currentState}' matches expected states {statesText}.");
173174
}
174175

175-
public static async Task<Subchannel[]> WaitForSubChannelsToBeReadyAsync(ILogger logger, GrpcChannel channel, int expectedCount, Func<SubchannelPicker?, Subchannel[]>? getPickerSubchannels = null)
176+
public static async Task<Subchannel> WaitForSubchannelToBeReadyAsync(ILogger logger, GrpcChannel channel, Func<SubchannelPicker?, Subchannel[]>? getPickerSubchannels = null)
177+
{
178+
var subChannel = (await WaitForSubchannelsToBeReadyAsync(logger, channel, 1)).Single();
179+
return subChannel;
180+
}
181+
182+
public static async Task<Subchannel[]> WaitForSubchannelsToBeReadyAsync(ILogger logger, GrpcChannel channel, int expectedCount, Func<SubchannelPicker?, Subchannel[]>? getPickerSubchannels = null)
176183
{
177184
if (getPickerSubchannels == null)
178185
{

test/FunctionalTests/Balancer/LeastUsedBalancerTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ async Task<HelloReply> UnaryMethod(HelloRequest request, ServerCallContext conte
6767

6868
var channel = await BalancerHelpers.CreateChannel(LoggerFactory, new LoadBalancingConfig("least_used"), new[] { endpoint1.Address, endpoint2.Address }, connect: true);
6969

70-
await BalancerHelpers.WaitForSubChannelsToBeReadyAsync(
70+
await BalancerHelpers.WaitForSubchannelsToBeReadyAsync(
7171
Logger,
7272
channel,
7373
expectedCount: 2,

test/FunctionalTests/Balancer/PickFirstBalancerTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ Task<HelloReply> UnaryMethod(HelloRequest request, ServerCallContext context)
194194
Logger.LogInformation("Ending " + endpoint1.Address);
195195
endpoint1.Dispose();
196196

197-
await BalancerHelpers.WaitForSubChannelsToBeReadyAsync(Logger, channel, expectedCount: 1,
197+
await BalancerHelpers.WaitForSubchannelsToBeReadyAsync(Logger, channel, expectedCount: 1,
198198
getPickerSubchannels: picker=>
199199
{
200200
// We want a subchannel that has no current address

0 commit comments

Comments
 (0)