8
8
import org .slf4j .Logger ;
9
9
import org .slf4j .LoggerFactory ;
10
10
import redis .clients .jedis .*;
11
+ import redis .clients .jedis .MultiClusterClientConfig .ClusterConfig ;
11
12
import redis .clients .jedis .providers .MultiClusterPooledConnectionProvider ;
12
13
import redis .clients .jedis .exceptions .JedisConnectionException ;
13
14
14
15
import java .io .IOException ;
15
- import java .time .Duration ;
16
16
import java .time .Instant ;
17
17
import java .util .HashMap ;
18
18
import java .util .Map ;
25
25
import static org .junit .jupiter .api .Assertions .fail ;
26
26
import static org .junit .jupiter .api .Assumptions .assumeTrue ;
27
27
28
- @ Tags ({
29
- @ Tag ("failover" ),
30
- @ Tag ("scenario" )
31
- })
28
+ @ Tags ({ @ Tag ("failover" ), @ Tag ("scenario" ) })
32
29
public class ActiveActiveFailoverTest {
33
30
private static final Logger log = LoggerFactory .getLogger (ActiveActiveFailoverTest .class );
34
31
@@ -52,13 +49,13 @@ public void testFailover() {
52
49
MultiClusterClientConfig .ClusterConfig [] clusterConfig = new MultiClusterClientConfig .ClusterConfig [2 ];
53
50
54
51
JedisClientConfig config = endpoint .getClientConfigBuilder ()
55
- .socketTimeoutMillis (RecommendedSettings .DEFAULT_TIMEOUT_MS )
56
- .connectionTimeoutMillis (RecommendedSettings .DEFAULT_TIMEOUT_MS ).build ();
52
+ .socketTimeoutMillis (RecommendedSettings .DEFAULT_TIMEOUT_MS )
53
+ .connectionTimeoutMillis (RecommendedSettings .DEFAULT_TIMEOUT_MS ).build ();
57
54
58
- clusterConfig [0 ] = new MultiClusterClientConfig . ClusterConfig (endpoint .getHostAndPort (0 ),
59
- config , RecommendedSettings .poolConfig );
60
- clusterConfig [1 ] = new MultiClusterClientConfig . ClusterConfig (endpoint .getHostAndPort (1 ),
61
- config , RecommendedSettings .poolConfig );
55
+ clusterConfig [0 ] = ClusterConfig . builder (endpoint .getHostAndPort (0 ), config )
56
+ . connectionPoolConfig ( RecommendedSettings .poolConfig ). weight ( 1.0f ). build ( );
57
+ clusterConfig [1 ] = ClusterConfig . builder (endpoint .getHostAndPort (1 ), config )
58
+ . connectionPoolConfig ( RecommendedSettings .poolConfig ). weight ( 0.5f ). build ( );
62
59
63
60
MultiClusterClientConfig .Builder builder = new MultiClusterClientConfig .Builder (clusterConfig );
64
61
@@ -67,6 +64,10 @@ public void testFailover() {
67
64
builder .circuitBreakerSlidingWindowMinCalls (1 );
68
65
builder .circuitBreakerFailureRateThreshold (10.0f ); // percentage of failures to trigger circuit breaker
69
66
67
+ builder .failbackSupported (true );
68
+ builder .failbackCheckInterval (1000 );
69
+ builder .gracePeriod (10000 );
70
+
70
71
builder .retryWaitDuration (10 );
71
72
builder .retryMaxAttempts (1 );
72
73
builder .retryWaitDurationExponentialBackoffMultiplier (1 );
@@ -79,24 +80,30 @@ class FailoverReporter implements Consumer<String> {
79
80
80
81
Instant failoverAt = null ;
81
82
83
+ boolean failbackHappened = false ;
84
+
85
+ Instant failbackAt = null ;
86
+
82
87
public String getCurrentClusterName () {
83
88
return currentClusterName ;
84
89
}
85
90
86
91
@ Override
87
92
public void accept (String clusterName ) {
88
93
this .currentClusterName = clusterName ;
89
- log .info (
90
- "\n \n ====FailoverEvent=== \n Jedis failover to cluster: {}\n ====FailoverEvent===\n \n " ,
91
- clusterName );
92
-
93
- failoverHappened = true ;
94
- failoverAt = Instant .now ();
94
+ log .info ("\n \n ====FailoverEvent=== \n Jedis failover to cluster: {}\n ====FailoverEvent===\n \n " , clusterName );
95
+
96
+ if (failoverHappened ) {
97
+ failbackHappened = true ;
98
+ failbackAt = Instant .now ();
99
+ } else {
100
+ failoverHappened = true ;
101
+ failoverAt = Instant .now ();
102
+ }
95
103
}
96
104
}
97
105
98
- MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider (
99
- builder .build ());
106
+ MultiClusterPooledConnectionProvider provider = new MultiClusterPooledConnectionProvider (builder .build ());
100
107
FailoverReporter reporter = new FailoverReporter ();
101
108
provider .setClusterFailoverPostProcessor (reporter );
102
109
provider .setActiveCluster (endpoint .getHostAndPort (0 ));
@@ -117,15 +124,17 @@ public void accept(String clusterName) {
117
124
int retryingDelay = 5 ;
118
125
while (true ) {
119
126
try {
120
- Map <String , String > executionInfo = new HashMap <String , String >() {{
121
- put ("threadId" , String .valueOf (threadId ));
122
- put ("cluster" , reporter .getCurrentClusterName ());
123
- }};
127
+ Map <String , String > executionInfo = new HashMap <String , String >() {
128
+ {
129
+ put ("threadId" , String .valueOf (threadId ));
130
+ put ("cluster" , reporter .getCurrentClusterName ());
131
+ }
132
+ };
124
133
client .xadd ("execution_log" , StreamEntryID .NEW_ENTRY , executionInfo );
125
134
126
135
if (attempt > 0 ) {
127
136
log .info ("Thread {} recovered after {} ms. Threads still not recovered: {}" , threadId ,
128
- attempt * retryingDelay , retryingThreadsCounter .decrementAndGet ());
137
+ attempt * retryingDelay , retryingThreadsCounter .decrementAndGet ());
129
138
}
130
139
131
140
break ;
@@ -134,15 +143,13 @@ public void accept(String clusterName) {
134
143
if (reporter .failoverHappened ) {
135
144
long failedCommands = failedCommandsAfterFailover .incrementAndGet ();
136
145
lastFailedCommandAt .set (Instant .now ());
137
- log .warn (
138
- "Thread {} failed to execute command after failover. Failed commands after failover: {}" ,
139
- threadId , failedCommands );
146
+ log .warn ("Thread {} failed to execute command after failover. Failed commands after failover: {}" , threadId ,
147
+ failedCommands );
140
148
}
141
149
142
150
if (attempt == 0 ) {
143
151
long failedThreads = retryingThreadsCounter .incrementAndGet ();
144
- log .warn ("Thread {} failed to execute command. Failed threads: {}" , threadId ,
145
- failedThreads );
152
+ log .warn ("Thread {} failed to execute command. Failed threads: {}" , threadId , failedThreads );
146
153
}
147
154
try {
148
155
Thread .sleep (retryingDelay );
@@ -153,20 +160,21 @@ public void accept(String clusterName) {
153
160
}
154
161
}
155
162
return true ;
156
- }, 18 );
163
+ }, 4 );
157
164
fakeApp .setKeepExecutingForSeconds (30 );
158
165
Thread t = new Thread (fakeApp );
159
166
t .start ();
160
167
161
168
HashMap <String , Object > params = new HashMap <>();
162
169
params .put ("bdb_id" , endpoint .getBdbId ());
163
- params .put ("rlutil_command" , "pause_bdb" );
170
+ params .put ("actions" ,
171
+ "[{\" type\" :\" execute_rlutil_command\" ,\" params\" :{\" rlutil_command\" :\" pause_bdb\" }},{\" type\" :\" wait\" ,\" params\" :{\" wait_time\" :\" 15\" }},{\" type\" :\" execute_rlutil_command\" ,\" params\" :{\" rlutil_command\" :\" resume_bdb\" }}]" );
164
172
165
173
FaultInjectionClient .TriggerActionResponse actionResponse = null ;
166
174
167
175
try {
168
- log .info ("Triggering bdb_pause" );
169
- actionResponse = faultClient .triggerAction ("execute_rlutil_command " , params );
176
+ log .info ("Triggering bdb_pause + wait 15 seconds + bdb_resume " );
177
+ actionResponse = faultClient .triggerAction ("sequence_of_actions " , params );
170
178
} catch (IOException e ) {
171
179
fail ("Fault Injection Server error:" + e .getMessage ());
172
180
}
@@ -182,15 +190,17 @@ public void accept(String clusterName) {
182
190
183
191
ConnectionPool pool = provider .getCluster (endpoint .getHostAndPort (0 )).getConnectionPool ();
184
192
185
- log .info ("First connection pool state: active: {}, idle: {}" , pool .getNumActive (),
186
- pool . getNumIdle () );
187
- log .info ("Full failover time : {} s" ,
188
- Duration . between ( reporter . failoverAt , lastFailedCommandAt .get ()). getSeconds ());
193
+ log .info ("First connection pool state: active: {}, idle: {}" , pool .getNumActive (), pool . getNumIdle ());
194
+ log . info ( "Failover happened at: {}" , reporter . failoverAt );
195
+ log .info ("Failback happened at : {}" , reporter . failbackAt );
196
+ log . info ( "Last failed command at: {}" , lastFailedCommandAt .get ());
189
197
190
198
assertEquals (0 , pool .getNumActive ());
191
199
assertTrue (fakeApp .capturedExceptions ().isEmpty ());
200
+ assertTrue (reporter .failoverHappened );
201
+ assertTrue (reporter .failbackHappened );
192
202
193
203
client .close ();
194
204
}
195
205
196
- }
206
+ }
0 commit comments