1
1
package org .terracotta .voter ;
2
2
3
+ import org .junit .Rule ;
3
4
import org .junit .Test ;
5
+ import org .junit .contrib .java .lang .system .SystemOutRule ;
6
+ import org .mockito .stubbing .Answer ;
4
7
8
+ import java .time .Duration ;
9
+ import java .util .Collection ;
10
+ import java .util .HashMap ;
5
11
import java .util .HashSet ;
6
- import java .util .Optional ;
12
+ import java .util .Map ;
7
13
import java .util .Set ;
14
+ import java .util .UUID ;
8
15
import java .util .concurrent .CompletableFuture ;
9
16
import java .util .concurrent .TimeoutException ;
10
17
import java .util .function .Function ;
11
18
19
+ import static java .util .Optional .empty ;
20
+ import static java .util .function .Function .identity ;
21
+ import static java .util .stream .Collectors .toMap ;
22
+ import static org .hamcrest .CoreMatchers .containsString ;
12
23
import static org .hamcrest .CoreMatchers .is ;
13
24
import static org .junit .Assert .assertThat ;
25
+ import static org .junit .Assert .fail ;
26
+ import static org .mockito .Mockito .atLeastOnce ;
27
+ import static org .mockito .Mockito .clearInvocations ;
14
28
import static org .mockito .Mockito .mock ;
29
+ import static org .mockito .Mockito .verify ;
15
30
import static org .mockito .Mockito .when ;
31
+ import static org .terracotta .utilities .test .WaitForAssert .assertThatEventually ;
16
32
import static org .terracotta .voter .ActiveVoter .TOPOLOGY_FETCH_TIME_PROPERTY ;
17
33
18
34
public class ActiveVoterTest {
35
+ private static final String VOTER_ID = UUID .randomUUID ().toString ();
19
36
private static final long TOPOLOGY_FETCH_INTERVAL = 11000L ;
20
37
38
+ @ Rule
39
+ public final SystemOutRule systemOutRule = new SystemOutRule ().enableLog ();
40
+
21
41
static {
22
42
System .setProperty (TOPOLOGY_FETCH_TIME_PROPERTY , "9000" );
23
43
}
@@ -34,11 +54,11 @@ public void testTopologyUpdate() throws TimeoutException, InterruptedException {
34
54
when (otherClientVoterManager .isConnected ()).thenReturn (true );
35
55
when (firstClientVoterManager .getServerState ()).thenReturn ("ACTIVE-COORDINATOR" );
36
56
when (otherClientVoterManager .getServerState ()).thenReturn ("PASSIVE-STANDBY" );
37
- when (firstClientVoterManager .registerVoter ("mvoter" )).thenReturn (0L );
38
- when (otherClientVoterManager .registerVoter ("mvoter" )).thenReturn (0L );
57
+ when (firstClientVoterManager .registerVoter (VOTER_ID )).thenReturn (0L );
58
+ when (otherClientVoterManager .registerVoter (VOTER_ID )).thenReturn (0L );
39
59
when (firstClientVoterManager .getTopology ()).thenReturn (expectedTopology );
40
- when (firstClientVoterManager .heartbeat ("mvoter" )).thenReturn (0L );
41
- when (otherClientVoterManager .heartbeat ("mvoter" )).thenReturn (0L );
60
+ when (firstClientVoterManager .heartbeat (VOTER_ID )).thenReturn (0L );
61
+ when (otherClientVoterManager .heartbeat (VOTER_ID )).thenReturn (0L );
42
62
when (firstClientVoterManager .getTargetHostPort ()).thenReturn ("localhost:1234" );
43
63
when (otherClientVoterManager .getTargetHostPort ()).thenReturn ("localhost:1235" );
44
64
@@ -54,8 +74,8 @@ public void testTopologyUpdate() throws TimeoutException, InterruptedException {
54
74
return mockClientVoterManager ;
55
75
}
56
76
};
57
- ActiveVoter activeVoter = new ActiveVoter ("mvoter" ,
58
- new CompletableFuture <VoterStatus >(), Optional . empty (), factory , "localhost:1234" , "localhost:1235" );
77
+ ActiveVoter activeVoter = new ActiveVoter (VOTER_ID ,
78
+ new CompletableFuture <VoterStatus >(), empty (), factory , "localhost:1234" , "localhost:1235" );
59
79
activeVoter .start ();
60
80
61
81
Thread .sleep (TOPOLOGY_FETCH_INTERVAL );
@@ -85,9 +105,9 @@ public void testOverLappingHostPortsWhileAddingServers() throws TimeoutException
85
105
ClientVoterManager firstClientVoterManager = mock (ClientVoterManager .class );
86
106
when (firstClientVoterManager .isConnected ()).thenReturn (true );
87
107
when (firstClientVoterManager .getServerState ()).thenReturn ("ACTIVE-COORDINATOR" );
88
- when (firstClientVoterManager .registerVoter ("mvoter" )).thenReturn (0L );
108
+ when (firstClientVoterManager .registerVoter (VOTER_ID )).thenReturn (0L );
89
109
when (firstClientVoterManager .getTopology ()).thenReturn (expectedTopology );
90
- when (firstClientVoterManager .heartbeat ("mvoter" )).thenReturn (0L );
110
+ when (firstClientVoterManager .heartbeat (VOTER_ID )).thenReturn (0L );
91
111
when (firstClientVoterManager .getTargetHostPort ()).thenReturn ("localhost:12345" );
92
112
93
113
Function <String , ClientVoterManager > factory = hostPort -> {
@@ -100,8 +120,8 @@ public void testOverLappingHostPortsWhileAddingServers() throws TimeoutException
100
120
return mockClientVoterManager ;
101
121
}
102
122
};
103
- ActiveVoter activeVoter = new ActiveVoter ("mvoter" ,
104
- new CompletableFuture <VoterStatus >(), Optional . empty (), factory , "localhost:12345" );
123
+ ActiveVoter activeVoter = new ActiveVoter (VOTER_ID ,
124
+ new CompletableFuture <VoterStatus >(), empty (), factory , "localhost:12345" );
105
125
activeVoter .start ();
106
126
107
127
Thread .sleep (TOPOLOGY_FETCH_INTERVAL );
@@ -129,11 +149,11 @@ public void testOverLappingHostPortsWhileRemovingServers() throws TimeoutExcepti
129
149
when (otherClientVoterManager .isConnected ()).thenReturn (true );
130
150
when (firstClientVoterManager .getServerState ()).thenReturn ("ACTIVE-COORDINATOR" );
131
151
when (otherClientVoterManager .getServerState ()).thenReturn ("PASSIVE-STANDBY" );
132
- when (firstClientVoterManager .registerVoter ("mvoter" )).thenReturn (0L );
133
- when (otherClientVoterManager .registerVoter ("mvoter" )).thenReturn (0L );
152
+ when (firstClientVoterManager .registerVoter (VOTER_ID )).thenReturn (0L );
153
+ when (otherClientVoterManager .registerVoter (VOTER_ID )).thenReturn (0L );
134
154
when (firstClientVoterManager .getTopology ()).thenReturn (expectedTopology );
135
- when (firstClientVoterManager .heartbeat ("mvoter" )).thenReturn (0L );
136
- when (otherClientVoterManager .heartbeat ("mvoter" )).thenReturn (0L );
155
+ when (firstClientVoterManager .heartbeat (VOTER_ID )).thenReturn (0L );
156
+ when (otherClientVoterManager .heartbeat (VOTER_ID )).thenReturn (0L );
137
157
when (firstClientVoterManager .getTargetHostPort ()).thenReturn ("localhost:12345" );
138
158
when (otherClientVoterManager .getTargetHostPort ()).thenReturn ("localhost:1234" );
139
159
@@ -149,8 +169,8 @@ public void testOverLappingHostPortsWhileRemovingServers() throws TimeoutExcepti
149
169
return mockClientVoterManager ;
150
170
}
151
171
};
152
- ActiveVoter activeVoter = new ActiveVoter ("mvoter" ,
153
- new CompletableFuture <VoterStatus >(), Optional . empty (), factory , "localhost:12345" , "localhost:1234" );
172
+ ActiveVoter activeVoter = new ActiveVoter (VOTER_ID ,
173
+ new CompletableFuture <VoterStatus >(), empty (), factory , "localhost:12345" , "localhost:1234" );
154
174
activeVoter .start ();
155
175
156
176
Thread .sleep (TOPOLOGY_FETCH_INTERVAL );
@@ -179,11 +199,11 @@ public void testWhenStaticPassivePortsRemoved() throws TimeoutException, Interru
179
199
when (otherClientVoterManager .isConnected ()).thenReturn (true );
180
200
when (firstClientVoterManager .getServerState ()).thenReturn ("ACTIVE-COORDINATOR" );
181
201
when (otherClientVoterManager .getServerState ()).thenReturn ("PASSIVE-STANDBY" );
182
- when (firstClientVoterManager .registerVoter ("mvoter" )).thenReturn (0L );
183
- when (otherClientVoterManager .registerVoter ("mvoter" )).thenReturn (0L );
202
+ when (firstClientVoterManager .registerVoter (VOTER_ID )).thenReturn (0L );
203
+ when (otherClientVoterManager .registerVoter (VOTER_ID )).thenReturn (0L );
184
204
when (firstClientVoterManager .getTopology ()).thenReturn (expectedTopology );
185
- when (firstClientVoterManager .heartbeat ("mvoter" )).thenReturn (0L );
186
- when (otherClientVoterManager .heartbeat ("mvoter" )).thenReturn (0L );
205
+ when (firstClientVoterManager .heartbeat (VOTER_ID )).thenReturn (0L );
206
+ when (otherClientVoterManager .heartbeat (VOTER_ID )).thenReturn (0L );
187
207
when (firstClientVoterManager .getTargetHostPort ()).thenReturn ("localhost:1234" );
188
208
when (otherClientVoterManager .getTargetHostPort ()).thenReturn ("localhost:1235" );
189
209
@@ -199,12 +219,81 @@ public void testWhenStaticPassivePortsRemoved() throws TimeoutException, Interru
199
219
return mockClientVoterManager ;
200
220
}
201
221
};
202
- ActiveVoter activeVoter = new ActiveVoter ("mvoter" ,
203
- new CompletableFuture <VoterStatus >(), Optional . empty (), factory , "localhost:1234" , "localhost:1235" );
222
+ ActiveVoter activeVoter = new ActiveVoter (VOTER_ID ,
223
+ new CompletableFuture <VoterStatus >(), empty (), factory , "localhost:1234" , "localhost:1235" );
204
224
activeVoter .start ();
205
225
206
226
Thread .sleep (TOPOLOGY_FETCH_INTERVAL );
207
227
assertThat (activeVoter .getExistingTopology (), is (expectedTopology ));
208
228
assertThat (activeVoter .getHeartbeatFutures ().size (), is (3 ));
209
229
}
230
+
231
+ @ Test
232
+ public void testReregistrationWhenAllStaticHostPortsNotAvailable () throws TimeoutException {
233
+ Map <String , String > servers = new HashMap <String , String >() {{
234
+ put ("ACTIVE-COORDINATOR" , "localhost:1234" );
235
+ put ("PASSIVE-STANDBY" , "localhost:1235" );
236
+ }};
237
+
238
+ Map <String , ClientVoterManager > managers =
239
+ servers .entrySet ()
240
+ .stream ()
241
+ .map ((e ) -> manager (e .getKey (), e .getValue (), new HashSet <>(servers .values ())))
242
+ .collect (toMap (ClientVoterManager ::getTargetHostPort , identity ()));
243
+
244
+ new ActiveVoter (VOTER_ID , new CompletableFuture <>(), empty (), managers ::get , servers .get ("ACTIVE-COORDINATOR" )).start ();
245
+
246
+ waitForLogMessage ("New Topology detected" );
247
+
248
+ disconnectManagers (managers .values ());
249
+
250
+ waitForLogMessage ("Attempting to re-register" );
251
+
252
+ systemOutRule .clearLog ();
253
+
254
+ ClientVoterManager passiveManager = managers .get (servers .get ("PASSIVE-STANDBY" ));
255
+ promote (passiveManager );
256
+
257
+ waitForLogMessage ("Vote owner state: ACTIVE-COORDINATOR" );
258
+
259
+ verify (passiveManager , atLeastOnce ()).registerVoter (VOTER_ID );
260
+ }
261
+
262
+ private void promote (ClientVoterManager passiveManager ) throws TimeoutException {
263
+ clearInvocations (passiveManager );
264
+ when (passiveManager .getServerState ()).thenReturn ("ACTIVE-COORDINATOR" );
265
+ when (passiveManager .registerVoter (VOTER_ID )).thenReturn (0L );
266
+ when (passiveManager .heartbeat (VOTER_ID )).thenReturn (0L );
267
+ when (passiveManager .isConnected ()).thenReturn (true );
268
+ }
269
+
270
+ private void waitForLogMessage (String message ) throws TimeoutException {
271
+ assertThatEventually (systemOutRule ::getLog , containsString (message )).within (Duration .ofMillis (10000 ));
272
+ }
273
+
274
+ private void disconnectManagers (Collection <ClientVoterManager > managers ) throws TimeoutException {
275
+ for (ClientVoterManager manager : managers ) {
276
+ when (manager .heartbeat (VOTER_ID )).thenReturn (-1L );
277
+ when (manager .isConnected ()).thenAnswer ((Answer <Boolean >)invocationOnMock -> {
278
+ Thread .sleep (500 );
279
+ return false ;
280
+ });
281
+ when (manager .registerVoter (VOTER_ID )).thenReturn (-1L );
282
+ }
283
+ }
284
+
285
+ private ClientVoterManager manager (String state , String serverAddress , Set <String > topology ) {
286
+ try {
287
+ ClientVoterManager manager = mock (ClientVoterManager .class );
288
+ when (manager .isConnected ()).thenReturn (true );
289
+ when (manager .getServerState ()).thenReturn (state );
290
+ when (manager .registerVoter (VOTER_ID )).thenReturn (0L );
291
+ when (manager .getTopology ()).thenReturn (topology );
292
+ when (manager .heartbeat (VOTER_ID )).thenReturn (0L );
293
+ when (manager .getTargetHostPort ()).thenReturn (serverAddress );
294
+ return manager ;
295
+ } catch (TimeoutException e ) {
296
+ throw new RuntimeException (e );
297
+ }
298
+ }
210
299
}
0 commit comments