77
88namespace StackExchange . Redis . Tests ;
99
10+ [ Collection ( NonParallelCollection . Name ) ]
1011public class FailoverTests : TestBase , IAsyncLifetime
1112{
1213 protected override string GetConfiguration ( ) => GetPrimaryReplicaConfig ( ) . ToString ( ) ;
@@ -196,6 +197,104 @@ public async Task DereplicateGoesToPrimary()
196197 }
197198
198199#if DEBUG
200+ [ Fact ]
201+ public async Task SubscriptionsSurviveConnectionFailureAsync ( )
202+ {
203+ using var conn = ( Create ( allowAdmin : true , shared : false , log : Writer , syncTimeout : 1000 ) as ConnectionMultiplexer ) ! ;
204+
205+ var profiler = conn . AddProfiler ( ) ;
206+ RedisChannel channel = Me ( ) ;
207+ var sub = conn . GetSubscriber ( ) ;
208+ int counter = 0 ;
209+ Assert . True ( sub . IsConnected ( ) ) ;
210+ await sub . SubscribeAsync ( channel , delegate
211+ {
212+ Interlocked . Increment ( ref counter ) ;
213+ } ) . ConfigureAwait ( false ) ;
214+
215+ var profile1 = Log ( profiler ) ;
216+
217+ Assert . Equal ( 1 , conn . GetSubscriptionsCount ( ) ) ;
218+
219+ await Task . Delay ( 200 ) . ConfigureAwait ( false ) ;
220+
221+ await sub . PublishAsync ( channel , "abc" ) . ConfigureAwait ( false ) ;
222+ sub . Ping ( ) ;
223+ await Task . Delay ( 200 ) . ConfigureAwait ( false ) ;
224+
225+ var counter1 = Thread . VolatileRead ( ref counter ) ;
226+ Log ( $ "Expecting 1 message, got { counter1 } ") ;
227+ Assert . Equal ( 1 , counter1 ) ;
228+
229+ var server = GetServer ( conn ) ;
230+ var socketCount = server . GetCounters ( ) . Subscription . SocketCount ;
231+ Log ( $ "Expecting 1 socket, got { socketCount } ") ;
232+ Assert . Equal ( 1 , socketCount ) ;
233+
234+ // We might fail both connections or just the primary in the time period
235+ SetExpectedAmbientFailureCount ( - 1 ) ;
236+
237+ // Make sure we fail all the way
238+ conn . AllowConnect = false ;
239+ Log ( "Failing connection" ) ;
240+ // Fail all connections
241+ server . SimulateConnectionFailure ( SimulatedFailureType . All ) ;
242+ // Trigger failure (RedisTimeoutException or RedisConnectionException because
243+ // of backlog behavior)
244+ var ex = Assert . ThrowsAny < Exception > ( ( ) => sub . Ping ( ) ) ;
245+ Assert . True ( ex is RedisTimeoutException or RedisConnectionException ) ;
246+ Assert . False ( sub . IsConnected ( channel ) ) ;
247+
248+ // Now reconnect...
249+ conn . AllowConnect = true ;
250+ Log ( "Waiting on reconnect" ) ;
251+ // Wait until we're reconnected
252+ await UntilConditionAsync ( TimeSpan . FromSeconds ( 10 ) , ( ) => sub . IsConnected ( channel ) ) ;
253+ Log ( "Reconnected" ) ;
254+ // Ensure we're reconnected
255+ Assert . True ( sub . IsConnected ( channel ) ) ;
256+
257+ // Ensure we've sent the subscribe command after reconnecting
258+ var profile2 = Log ( profiler ) ;
259+ //Assert.Equal(1, profile2.Count(p => p.Command == nameof(RedisCommand.SUBSCRIBE)));
260+
261+ Log ( "Issuing ping after reconnected" ) ;
262+ sub . Ping ( ) ;
263+
264+ var muxerSubCount = conn . GetSubscriptionsCount ( ) ;
265+ Log ( $ "Muxer thinks we have { muxerSubCount } subscriber(s).") ;
266+ Assert . Equal ( 1 , muxerSubCount ) ;
267+
268+ var muxerSubs = conn . GetSubscriptions ( ) ;
269+ foreach ( var pair in muxerSubs )
270+ {
271+ var muxerSub = pair . Value ;
272+ Log ( $ " Muxer Sub: { pair . Key } : (EndPoint: { muxerSub . GetCurrentServer ( ) } , Connected: { muxerSub . IsConnected } )") ;
273+ }
274+
275+ Log ( "Publishing" ) ;
276+ var published = await sub . PublishAsync ( channel , "abc" ) . ConfigureAwait ( false ) ;
277+
278+ Log ( $ "Published to { published } subscriber(s).") ;
279+ Assert . Equal ( 1 , published ) ;
280+
281+ // Give it a few seconds to get our messages
282+ Log ( "Waiting for 2 messages" ) ;
283+ await UntilConditionAsync ( TimeSpan . FromSeconds ( 5 ) , ( ) => Thread . VolatileRead ( ref counter ) == 2 ) ;
284+
285+ var counter2 = Thread . VolatileRead ( ref counter ) ;
286+ Log ( $ "Expecting 2 messages, got { counter2 } ") ;
287+ Assert . Equal ( 2 , counter2 ) ;
288+
289+ // Log all commands at the end
290+ Log ( "All commands since connecting:" ) ;
291+ var profile3 = profiler . FinishProfiling ( ) ;
292+ foreach ( var command in profile3 )
293+ {
294+ Log ( $ "{ command . EndPoint } : { command } ") ;
295+ }
296+ }
297+
199298 [ Fact ]
200299 public async Task SubscriptionsSurvivePrimarySwitchAsync ( )
201300 {
@@ -215,14 +314,8 @@ public async Task SubscriptionsSurvivePrimarySwitchAsync()
215314 var subB = bConn . GetSubscriber ( ) ;
216315
217316 long primaryChanged = 0 , aCount = 0 , bCount = 0 ;
218- aConn . ConfigurationChangedBroadcast += delegate
219- {
220- Log ( "A noticed config broadcast: " + Interlocked . Increment ( ref primaryChanged ) ) ;
221- } ;
222- bConn . ConfigurationChangedBroadcast += delegate
223- {
224- Log ( "B noticed config broadcast: " + Interlocked . Increment ( ref primaryChanged ) ) ;
225- } ;
317+ aConn . ConfigurationChangedBroadcast += ( s , args ) => Log ( "A noticed config broadcast: " + Interlocked . Increment ( ref primaryChanged ) + " (Endpoint:" + args . EndPoint + ")" ) ;
318+ bConn . ConfigurationChangedBroadcast += ( s , args ) => Log ( "B noticed config broadcast: " + Interlocked . Increment ( ref primaryChanged ) + " (Endpoint:" + args . EndPoint + ")" ) ;
226319 subA . Subscribe ( channel , ( _ , message ) =>
227320 {
228321 Log ( "A got message: " + message ) ;
@@ -333,8 +426,8 @@ public async Task SubscriptionsSurvivePrimarySwitchAsync()
333426
334427 Assert . Equal ( 2 , Interlocked . Read ( ref aCount ) ) ;
335428 Assert . Equal ( 2 , Interlocked . Read ( ref bCount ) ) ;
336- // Expect 12, because a sees a, but b sees a and b due to replication
337- Assert . Equal ( 12 , Interlocked . CompareExchange ( ref primaryChanged , 0 , 0 ) ) ;
429+ // Expect 12, because a sees a, but b sees a and b due to replication, but contenders may add their own
430+ Assert . True ( Interlocked . CompareExchange ( ref primaryChanged , 0 , 0 ) >= 12 ) ;
338431 }
339432 catch
340433 {
0 commit comments