@@ -49,15 +49,17 @@ namespace Grpc.Net.Client.Balancer.Internal
4949 /// </summary>
5050 internal class SocketConnectivitySubchannelTransport : ISubchannelTransport , IDisposable
5151 {
52+ internal record struct ActiveStream ( BalancerAddress Address , Socket Socket , Stream ? Stream ) ;
53+
5254 private readonly ILogger _logger ;
5355 private readonly Subchannel _subchannel ;
5456 private readonly TimeSpan _socketPingInterval ;
55- private readonly List < ( DnsEndPoint EndPoint , Socket Socket , Stream ? Stream ) > _activeStreams ;
57+ private readonly List < ActiveStream > _activeStreams ;
5658 private readonly Timer _socketConnectedTimer ;
5759
5860 private int _lastEndPointIndex ;
5961 private Socket ? _initialSocket ;
60- private DnsEndPoint ? _initialSocketEndPoint ;
62+ private BalancerAddress ? _initialSocketAddress ;
6163 private bool _disposed ;
6264 private BalancerAddress ? _currentAddress ;
6365
@@ -66,7 +68,7 @@ public SocketConnectivitySubchannelTransport(Subchannel subchannel, TimeSpan soc
6668 _logger = loggerFactory . CreateLogger < SocketConnectivitySubchannelTransport > ( ) ;
6769 _subchannel = subchannel ;
6870 _socketPingInterval = socketPingInterval ;
69- _activeStreams = new List < ( DnsEndPoint , Socket , Stream ? ) > ( ) ;
71+ _activeStreams = new List < ActiveStream > ( ) ;
7072 _socketConnectedTimer = new Timer ( OnCheckSocketConnection , state : null , Timeout . InfiniteTimeSpan , Timeout . InfiniteTimeSpan ) ;
7173 }
7274
@@ -75,7 +77,7 @@ public SocketConnectivitySubchannelTransport(Subchannel subchannel, TimeSpan soc
7577 public bool HasStream { get ; }
7678
7779 // For testing. Take a copy under lock for thread-safety.
78- internal IReadOnlyList < ( DnsEndPoint EndPoint , Socket Socket , Stream ? Stream ) > GetActiveStreams ( )
80+ internal IReadOnlyList < ActiveStream > GetActiveStreams ( )
7981 {
8082 lock ( Lock )
8183 {
@@ -89,7 +91,7 @@ public void Disconnect()
8991 {
9092 _initialSocket ? . Dispose ( ) ;
9193 _initialSocket = null ;
92- _initialSocketEndPoint = null ;
94+ _initialSocketAddress = null ;
9395 _lastEndPointIndex = 0 ;
9496 _socketConnectedTimer . Change ( TimeSpan . Zero , TimeSpan . Zero ) ;
9597 _currentAddress = null ;
@@ -119,16 +121,16 @@ public async ValueTask<bool> TryConnectAsync(CancellationToken cancellationToken
119121
120122 try
121123 {
122- SocketConnectivitySubchannelTransportLog . ConnectingSocket ( _logger , currentAddress . EndPoint ) ;
124+ SocketConnectivitySubchannelTransportLog . ConnectingSocket ( _logger , currentAddress ) ;
123125 await socket . ConnectAsync ( currentAddress . EndPoint , cancellationToken ) . ConfigureAwait ( false ) ;
124- SocketConnectivitySubchannelTransportLog . ConnectedSocket ( _logger , currentAddress . EndPoint ) ;
126+ SocketConnectivitySubchannelTransportLog . ConnectedSocket ( _logger , currentAddress ) ;
125127
126128 lock ( Lock )
127129 {
128130 _currentAddress = currentAddress ;
129131 _lastEndPointIndex = currentIndex ;
130132 _initialSocket = socket ;
131- _initialSocketEndPoint = currentAddress . EndPoint ;
133+ _initialSocketAddress = currentAddress ;
132134 _socketConnectedTimer . Change ( _socketPingInterval , _socketPingInterval ) ;
133135 }
134136
@@ -137,7 +139,7 @@ public async ValueTask<bool> TryConnectAsync(CancellationToken cancellationToken
137139 }
138140 catch ( Exception ex )
139141 {
140- SocketConnectivitySubchannelTransportLog . ErrorConnectingSocket ( _logger , currentAddress . EndPoint , ex ) ;
142+ SocketConnectivitySubchannelTransportLog . ErrorConnectingSocket ( _logger , currentAddress , ex ) ;
141143
142144 if ( firstConnectionError == null )
143145 {
@@ -167,14 +169,14 @@ private async void OnCheckSocketConnection(object? state)
167169 var socket = _initialSocket ;
168170 if ( socket != null )
169171 {
170- CompatibilityHelpers . Assert ( _initialSocketEndPoint != null ) ;
172+ CompatibilityHelpers . Assert ( _initialSocketAddress != null ) ;
171173
172174 var closeSocket = false ;
173175 Exception ? sendException = null ;
174176 try
175177 {
176178 // Check the socket is still valid by doing a zero byte send.
177- SocketConnectivitySubchannelTransportLog . CheckingSocket ( _logger , _initialSocketEndPoint ) ;
179+ SocketConnectivitySubchannelTransportLog . CheckingSocket ( _logger , _initialSocketAddress ) ;
178180 await socket . SendAsync ( Array . Empty < byte > ( ) , SocketFlags . None ) . ConfigureAwait ( false ) ;
179181
180182 // Also poll socket to check if it can be read from.
@@ -184,7 +186,7 @@ private async void OnCheckSocketConnection(object? state)
184186 {
185187 closeSocket = true ;
186188 sendException = ex ;
187- SocketConnectivitySubchannelTransportLog . ErrorCheckingSocket ( _logger , _initialSocketEndPoint , ex ) ;
189+ SocketConnectivitySubchannelTransportLog . ErrorCheckingSocket ( _logger , _initialSocketAddress , ex ) ;
188190 }
189191
190192 if ( closeSocket )
@@ -195,7 +197,7 @@ private async void OnCheckSocketConnection(object? state)
195197 {
196198 _initialSocket . Dispose ( ) ;
197199 _initialSocket = null ;
198- _initialSocketEndPoint = null ;
200+ _initialSocketAddress = null ;
199201 _currentAddress = null ;
200202 _lastEndPointIndex = 0 ;
201203 }
@@ -210,20 +212,20 @@ private async void OnCheckSocketConnection(object? state)
210212 }
211213 }
212214
213- public async ValueTask < Stream > GetStreamAsync ( DnsEndPoint endPoint , CancellationToken cancellationToken )
215+ public async ValueTask < Stream > GetStreamAsync ( BalancerAddress address , CancellationToken cancellationToken )
214216 {
215- SocketConnectivitySubchannelTransportLog . CreatingStream ( _logger , endPoint ) ;
217+ SocketConnectivitySubchannelTransportLog . CreatingStream ( _logger , address ) ;
216218
217219 Socket ? socket = null ;
218220 lock ( Lock )
219221 {
220222 if ( _initialSocket != null &&
221- _initialSocketEndPoint != null &&
222- Equals ( _initialSocketEndPoint , endPoint ) )
223+ _initialSocketAddress != null &&
224+ Equals ( _initialSocketAddress , address ) )
223225 {
224226 socket = _initialSocket ;
225227 _initialSocket = null ;
226- _initialSocketEndPoint = null ;
228+ _initialSocketAddress = null ;
227229 }
228230 }
229231
@@ -242,7 +244,7 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
242244 if ( socket == null )
243245 {
244246 socket = new Socket ( SocketType . Stream , ProtocolType . Tcp ) { NoDelay = true } ;
245- await socket . ConnectAsync ( endPoint , cancellationToken ) . ConfigureAwait ( false ) ;
247+ await socket . ConnectAsync ( address . EndPoint , cancellationToken ) . ConfigureAwait ( false ) ;
246248 }
247249
248250 var networkStream = new NetworkStream ( socket , ownsSocket : true ) ;
@@ -252,7 +254,7 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
252254
253255 lock ( Lock )
254256 {
255- _activeStreams . Add ( ( endPoint , socket , stream ) ) ;
257+ _activeStreams . Add ( new ActiveStream ( address , socket , stream ) ) ;
256258 }
257259
258260 return stream ;
@@ -282,7 +284,7 @@ private void OnStreamDisposed(Stream streamWrapper)
282284 if ( t . Stream == streamWrapper )
283285 {
284286 _activeStreams . RemoveAt ( i ) ;
285- SocketConnectivitySubchannelTransportLog . DisposingStream ( _logger , t . EndPoint ) ;
287+ SocketConnectivitySubchannelTransportLog . DisposingStream ( _logger , t . Address ) ;
286288
287289 // If the last active streams is removed then there is no active connection.
288290 disconnect = _activeStreams . Count == 0 ;
@@ -318,51 +320,51 @@ public void OnRequestComplete(CompletionContext context)
318320
319321 internal static class SocketConnectivitySubchannelTransportLog
320322 {
321- private static readonly Action < ILogger , DnsEndPoint , Exception ? > _connectingSocket =
322- LoggerMessage . Define < DnsEndPoint > ( LogLevel . Trace , new EventId ( 1 , "ConnectingSocket" ) , "Connecting socket to ' {Address}' ." ) ;
323+ private static readonly Action < ILogger , BalancerAddress , Exception ? > _connectingSocket =
324+ LoggerMessage . Define < BalancerAddress > ( LogLevel . Trace , new EventId ( 1 , "ConnectingSocket" ) , "Connecting socket to {Address}." ) ;
323325
324- private static readonly Action < ILogger , DnsEndPoint , Exception ? > _connectedSocket =
325- LoggerMessage . Define < DnsEndPoint > ( LogLevel . Debug , new EventId ( 2 , "ConnectedSocket" ) , "Connected to socket ' {Address}' ." ) ;
326+ private static readonly Action < ILogger , BalancerAddress , Exception ? > _connectedSocket =
327+ LoggerMessage . Define < BalancerAddress > ( LogLevel . Debug , new EventId ( 2 , "ConnectedSocket" ) , "Connected to socket {Address}." ) ;
326328
327- private static readonly Action < ILogger , DnsEndPoint , Exception ? > _errorConnectingSocket =
328- LoggerMessage . Define < DnsEndPoint > ( LogLevel . Error , new EventId ( 3 , "ErrorConnectingSocket" ) , "Error connecting to socket ' {Address}' ." ) ;
329+ private static readonly Action < ILogger , BalancerAddress , Exception ? > _errorConnectingSocket =
330+ LoggerMessage . Define < BalancerAddress > ( LogLevel . Error , new EventId ( 3 , "ErrorConnectingSocket" ) , "Error connecting to socket {Address}." ) ;
329331
330- private static readonly Action < ILogger , DnsEndPoint , Exception ? > _checkingSocket =
331- LoggerMessage . Define < DnsEndPoint > ( LogLevel . Trace , new EventId ( 4 , "CheckingSocket" ) , "Checking socket ' {Address}' ." ) ;
332+ private static readonly Action < ILogger , BalancerAddress , Exception ? > _checkingSocket =
333+ LoggerMessage . Define < BalancerAddress > ( LogLevel . Trace , new EventId ( 4 , "CheckingSocket" ) , "Checking socket {Address}." ) ;
332334
333- private static readonly Action < ILogger , DnsEndPoint , Exception ? > _errorCheckingSocket =
334- LoggerMessage . Define < DnsEndPoint > ( LogLevel . Error , new EventId ( 5 , "ErrorCheckingSocket" ) , "Error checking socket ' {Address}' ." ) ;
335+ private static readonly Action < ILogger , BalancerAddress , Exception ? > _errorCheckingSocket =
336+ LoggerMessage . Define < BalancerAddress > ( LogLevel . Error , new EventId ( 5 , "ErrorCheckingSocket" ) , "Error checking socket {Address}." ) ;
335337
336338 private static readonly Action < ILogger , Exception ? > _errorSocketTimer =
337- LoggerMessage . Define ( LogLevel . Error , new EventId ( 1 , "ErrorSocketTimer" ) , "Unexpected error in check socket timer." ) ;
339+ LoggerMessage . Define ( LogLevel . Error , new EventId ( 6 , "ErrorSocketTimer" ) , "Unexpected error in check socket timer." ) ;
338340
339- private static readonly Action < ILogger , DnsEndPoint , Exception ? > _creatingStream =
340- LoggerMessage . Define < DnsEndPoint > ( LogLevel . Trace , new EventId ( 6 , "CreatingStream" ) , "Creating stream for ' {Address}' ." ) ;
341+ private static readonly Action < ILogger , BalancerAddress , Exception ? > _creatingStream =
342+ LoggerMessage . Define < BalancerAddress > ( LogLevel . Trace , new EventId ( 7 , "CreatingStream" ) , "Creating stream for {Address}." ) ;
341343
342- private static readonly Action < ILogger , DnsEndPoint , Exception ? > _disposingStream =
343- LoggerMessage . Define < DnsEndPoint > ( LogLevel . Trace , new EventId ( 7 , "DisposingStream" ) , "Disposing stream for ' {Address}' ." ) ;
344+ private static readonly Action < ILogger , BalancerAddress , Exception ? > _disposingStream =
345+ LoggerMessage . Define < BalancerAddress > ( LogLevel . Trace , new EventId ( 8 , "DisposingStream" ) , "Disposing stream for {Address}." ) ;
344346
345- public static void ConnectingSocket ( ILogger logger , DnsEndPoint address )
347+ public static void ConnectingSocket ( ILogger logger , BalancerAddress address )
346348 {
347349 _connectingSocket ( logger , address , null ) ;
348350 }
349351
350- public static void ConnectedSocket ( ILogger logger , DnsEndPoint address )
352+ public static void ConnectedSocket ( ILogger logger , BalancerAddress address )
351353 {
352354 _connectedSocket ( logger , address , null ) ;
353355 }
354356
355- public static void ErrorConnectingSocket ( ILogger logger , DnsEndPoint address , Exception ex )
357+ public static void ErrorConnectingSocket ( ILogger logger , BalancerAddress address , Exception ex )
356358 {
357359 _errorConnectingSocket ( logger , address , ex ) ;
358360 }
359361
360- public static void CheckingSocket ( ILogger logger , DnsEndPoint address )
362+ public static void CheckingSocket ( ILogger logger , BalancerAddress address )
361363 {
362364 _checkingSocket ( logger , address , null ) ;
363365 }
364366
365- public static void ErrorCheckingSocket ( ILogger logger , DnsEndPoint address , Exception ex )
367+ public static void ErrorCheckingSocket ( ILogger logger , BalancerAddress address , Exception ex )
366368 {
367369 _errorCheckingSocket ( logger , address , ex ) ;
368370 }
@@ -372,12 +374,12 @@ public static void ErrorSocketTimer(ILogger logger, Exception ex)
372374 _errorSocketTimer ( logger , ex ) ;
373375 }
374376
375- public static void CreatingStream ( ILogger logger , DnsEndPoint address )
377+ public static void CreatingStream ( ILogger logger , BalancerAddress address )
376378 {
377379 _creatingStream ( logger , address , null ) ;
378380 }
379381
380- public static void DisposingStream ( ILogger logger , DnsEndPoint address )
382+ public static void DisposingStream ( ILogger logger , BalancerAddress address )
381383 {
382384 _disposingStream ( logger , address , null ) ;
383385 }
0 commit comments