@@ -103,6 +103,42 @@ public async Task ConnectToRabbitMqWithOAuth2TokenShouldDisconnectAfterTimeout()
103103 }
104104 }
105105
106+ [ SkippableFact ]
107+ public async Task ConnectionShouldReconnectWithTheNewToken ( )
108+ {
109+ var recoveryConfiguration = new RecoveryConfiguration ( ) ;
110+ recoveryConfiguration . Topology ( false ) ;
111+ recoveryConfiguration . BackOffDelayPolicy ( new FakeFastBackOffDelay ( ) ) ;
112+
113+ Skip . IfNot ( IsCluster ) ;
114+ IConnection connection = await AmqpConnection . CreateAsync (
115+ ConnectionSettingsBuilder . Create ( )
116+ . Host ( "localhost" )
117+ . Port ( 5672 )
118+ . RecoveryConfiguration ( recoveryConfiguration )
119+ . ContainerId ( _containerId )
120+ . OAuth2Options ( new OAuth2Options ( GenerateToken ( DateTime . UtcNow . AddMilliseconds ( 1_500 ) ) ) )
121+ . Build ( ) ) ;
122+ await connection . RefreshTokenAsync ( GenerateToken ( DateTime . UtcNow . AddMinutes ( 5 ) ) ) ;
123+ TaskCompletionSource < bool > twoRecoveryEventsSeenTcs = CreateTaskCompletionSource < bool > ( ) ;
124+ int recoveryEvents = 0 ;
125+ connection . ChangeState += ( sender , from , to , error ) =>
126+ {
127+ if ( Interlocked . Increment ( ref recoveryEvents ) == 2 )
128+ {
129+ twoRecoveryEventsSeenTcs . SetResult ( true ) ;
130+ }
131+ } ;
132+
133+
134+ Assert . Equal ( State . Open , connection . State ) ;
135+ Thread . Sleep ( TimeSpan . FromSeconds ( 1 ) ) ;
136+ await WaitUntilConnectionIsKilledAndOpen ( _containerId ) ;
137+ Assert . Equal ( State . Open , connection . State ) ;
138+ await WhenTcsCompletes ( twoRecoveryEventsSeenTcs ) ;
139+ await connection . CloseAsync ( ) ;
140+ }
141+
106142 private static string GenerateToken ( DateTime duration )
107143 {
108144 byte [ ] decodedKey = Convert . FromBase64String ( Base64Key ) ;
0 commit comments