28
28
import org .elasticsearch .test .junit .annotations .TestLogging ;
29
29
import org .elasticsearch .threadpool .ThreadPool ;
30
30
31
+ import java .io .IOException ;
31
32
import java .util .HashMap ;
32
33
import java .util .Map ;
33
34
import java .util .OptionalLong ;
34
35
import java .util .Set ;
36
+ import java .util .concurrent .atomic .AtomicLong ;
37
+ import java .util .concurrent .atomic .AtomicReference ;
35
38
import java .util .function .BooleanSupplier ;
36
39
import java .util .function .Function ;
37
40
import java .util .function .LongSupplier ;
@@ -48,31 +51,12 @@ public void testLeaderDisconnectionWithDisconnectEventDetectedQuickly() {
48
51
// In this test the leader still has access to the register, therefore it is still considered as a leader.
49
52
}
50
53
51
- @ Override
52
- @ AwaitsFix (bugUrl = "ES-5645" )
53
- public void testLeaderDisconnectionWithoutDisconnectEventDetectedQuickly () {
54
- // In this test the leader still has access to the register, therefore it is still considered as a leader.
55
- }
56
-
57
- @ Override
58
- @ AwaitsFix (bugUrl = "ES-5645" )
59
- public void testMasterStatsOnFailedUpdate () {
60
- // In this test the leader still has access to the register, therefore it is still considered as a leader, and it can perform
61
- // updates.
62
- }
63
-
64
54
@ Override
65
55
@ AwaitsFix (bugUrl = "ES-5645" )
66
56
public void testUnhealthyLeaderIsReplaced () {
67
57
// In this test the leader still has access to the register, therefore it is still considered as a leader.
68
58
}
69
59
70
- @ Override
71
- @ AwaitsFix (bugUrl = "ES-5645" )
72
- public void testUnresponsiveLeaderDetectedEventually () {
73
- // In this test the leader still has access to the register, therefore it is still considered as a leader.
74
- }
75
-
76
60
@ Override
77
61
@ AwaitsFix (bugUrl = "ES-5645" )
78
62
public void testLogsWarningPeriodicallyIfClusterNotFormed () {
@@ -91,47 +75,12 @@ public void testAckListenerReceivesNacksIfPublicationTimesOut() {
91
75
// The leader still has access to the register, therefore it acknowledges the state update
92
76
}
93
77
94
- @ Override
95
- @ AwaitsFix (bugUrl = "ES-5645" )
96
- public void testAppliesNoMasterBlockWritesByDefault () {
97
- // If the disconnected node is the leader it will continue to have connectivity
98
- // into the register and therefore the no master block won't be applied
99
- }
100
-
101
- @ Override
102
- @ AwaitsFix (bugUrl = "ES-5645" )
103
- public void testAppliesNoMasterBlockWritesIfConfigured () {
104
- // If the disconnected node is the leader it will continue to have connectivity
105
- // into the register and therefore the no master block won't be applied
106
- }
107
-
108
- @ Override
109
- @ AwaitsFix (bugUrl = "ES-5645" )
110
- public void testAppliesNoMasterBlockAllIfConfigured () {
111
- // If the disconnected node is the leader it will continue to have connectivity
112
- // into the register and therefore the no master block won't be applied
113
- }
114
-
115
- @ Override
116
- @ AwaitsFix (bugUrl = "ES-5645" )
117
- public void testAppliesNoMasterBlockMetadataWritesIfConfigured () {
118
- // If the disconnected node is the leader it will continue to have connectivity
119
- // into the register and therefore the no master block won't be applied
120
- }
121
-
122
78
@ Override
123
79
@ AwaitsFix (bugUrl = "ES-5645" )
124
80
public void testClusterCannotFormWithFailingJoinValidation () {
125
81
// A single node can form a cluster in this case
126
82
}
127
83
128
- @ Override
129
- @ AwaitsFix (bugUrl = "ES-5645" )
130
- public void testReportsConnectBackProblemsDuringJoining () {
131
- // If the partitioned node is the leader, it still has access
132
- // to the store, therefore the test fail
133
- }
134
-
135
84
@ Override
136
85
@ AwaitsFix (bugUrl = "ES-5645" )
137
86
public void testCannotJoinClusterWithDifferentUUID () {
@@ -165,34 +114,30 @@ public void testJoiningNodeReceivesFullState() {
165
114
166
115
@ Override
167
116
protected CoordinatorStrategy getCoordinatorStrategy () {
168
- var atomicRegister = new AtomicRegister ();
169
- var sharedStore = new SharedStore ();
170
- return new AtomicRegisterCoordinatorStrategy (atomicRegister , sharedStore );
117
+ return new AtomicRegisterCoordinatorStrategy ();
171
118
}
172
119
173
120
class AtomicRegisterCoordinatorStrategy implements CoordinatorStrategy {
174
- private final AtomicRegister atomicRegister ;
175
- private final SharedStore sharedStore ;
176
-
177
- AtomicRegisterCoordinatorStrategy (AtomicRegister atomicRegister , SharedStore sharedStore ) {
178
- this .atomicRegister = atomicRegister ;
179
- this .sharedStore = sharedStore ;
180
- }
121
+ private final AtomicLong currentTermRef = new AtomicLong ();
122
+ private final AtomicReference <Heartbeat > heartBeatRef = new AtomicReference <>();
123
+ private final SharedStore sharedStore = new SharedStore ();
181
124
182
125
@ Override
183
126
public CoordinationServices getCoordinationServices (
184
127
ThreadPool threadPool ,
185
128
Settings settings ,
186
129
ClusterSettings clusterSettings ,
187
- CoordinationState .PersistedState persistedState
130
+ CoordinationState .PersistedState persistedState ,
131
+ BooleanSupplier isDisruptedSupplier
188
132
) {
189
133
final TimeValue heartbeatFrequency = HEARTBEAT_FREQUENCY .get (settings );
190
- var atomicHeartbeat = new StoreHeartbeatService (
191
- sharedStore ,
134
+ final var atomicRegister = new AtomicRegister (currentTermRef , isDisruptedSupplier );
135
+ final var atomicHeartbeat = new StoreHeartbeatService (
136
+ new SharedHeartbeatStore (heartBeatRef , isDisruptedSupplier ),
192
137
threadPool ,
193
138
heartbeatFrequency ,
194
139
TimeValue .timeValueMillis (heartbeatFrequency .millis () * MAX_MISSED_HEARTBEATS .get (settings )),
195
- listener -> listener . onResponse ( OptionalLong .of (atomicRegister .readCurrentTerm ()))
140
+ listener -> ActionListener . completeWith ( listener , () -> OptionalLong .of (atomicRegister .readCurrentTerm ()))
196
141
);
197
142
var reconfigurator = new SingleNodeReconfigurator (settings , clusterSettings );
198
143
var electionStrategy = new AtomicRegisterElectionStrategy (atomicRegister );
@@ -324,30 +269,29 @@ public boolean isInvalidReconfiguration(
324
269
@ Override
325
270
public void beforeCommit (long term , long version , ActionListener <Void > listener ) {
326
271
// TODO: add a test to ensure that this gets called
327
- final var currentTerm = register . readCurrentTerm ();
328
- if ( currentTerm == term ) {
329
- listener . onResponse ( null );
330
- } else {
331
- assert term < currentTerm : term + " vs " + currentTerm ;
332
- listener . onFailure (
333
- new CoordinationStateRejectedException (
272
+ ActionListener . completeWith ( listener , () -> {
273
+ final var currentTerm = register . readCurrentTerm ();
274
+ if ( currentTerm == term ) {
275
+ return null ;
276
+ } else {
277
+ assert term < currentTerm : term + " vs " + currentTerm ;
278
+ throw new CoordinationStateRejectedException (
334
279
Strings .format (
335
280
"could not commit cluster state version %d in term %d, current term is now %d" ,
336
281
version ,
337
282
term ,
338
283
currentTerm
339
284
)
340
- )
341
- );
342
- }
285
+ );
286
+ }
287
+ });
343
288
}
344
289
}
345
290
346
291
record PersistentClusterState (long term , long version , Metadata state ) {}
347
292
348
- private static class SharedStore implements HeartbeatStore {
293
+ private static class SharedStore {
349
294
private final Map <Long , PersistentClusterState > clusterStateByTerm = new HashMap <>();
350
- private Heartbeat heartbeat ;
351
295
352
296
private void writeClusterState (ClusterState clusterState ) {
353
297
clusterStateByTerm .put (
@@ -367,32 +311,57 @@ void getClusterStateForTerm(long termGoal, ActionListener<PersistentClusterState
367
311
return null ;
368
312
});
369
313
}
314
+ }
315
+
316
+ private static class SharedHeartbeatStore implements HeartbeatStore {
317
+
318
+ private final AtomicReference <Heartbeat > hearbeatRef ;
319
+ private final BooleanSupplier isDisruptedSupplier ;
320
+
321
+ SharedHeartbeatStore (AtomicReference <Heartbeat > hearbeatRef , BooleanSupplier isDisruptedSupplier ) {
322
+ this .hearbeatRef = hearbeatRef ;
323
+ this .isDisruptedSupplier = isDisruptedSupplier ;
324
+ }
370
325
371
326
@ Override
372
327
public void writeHeartbeat (Heartbeat newHeartbeat , ActionListener <Void > listener ) {
373
- this .heartbeat = newHeartbeat ;
328
+ if (isDisruptedSupplier .getAsBoolean ()) {
329
+ listener .onFailure (new IOException ("simulating disrupted access to shared store" ));
330
+ }
331
+ hearbeatRef .set (newHeartbeat );
374
332
listener .onResponse (null );
375
333
}
376
334
377
335
@ Override
378
336
public void readLatestHeartbeat (ActionListener <Heartbeat > listener ) {
379
- listener .onResponse (heartbeat );
337
+ if (isDisruptedSupplier .getAsBoolean ()) {
338
+ listener .onFailure (new IOException ("simulating disrupted access to shared store" ));
339
+ }
340
+ listener .onResponse (hearbeatRef .get ());
380
341
}
381
342
}
382
343
383
344
private static class AtomicRegister {
384
- private long currentTerm ;
345
+ private final AtomicLong currentTermRef ;
346
+ private final BooleanSupplier isDisruptedSupplier ;
385
347
386
- long readCurrentTerm () {
387
- return currentTerm ;
348
+ AtomicRegister (AtomicLong currentTermRef , BooleanSupplier isDisruptedSupplier ) {
349
+ this .currentTermRef = currentTermRef ;
350
+ this .isDisruptedSupplier = isDisruptedSupplier ;
351
+ }
352
+
353
+ long readCurrentTerm () throws IOException {
354
+ if (isDisruptedSupplier .getAsBoolean ()) {
355
+ throw new IOException ("simulating disrupted access to shared store" );
356
+ }
357
+ return currentTermRef .get ();
388
358
}
389
359
390
- long compareAndExchange (long expected , long updated ) {
391
- final var witness = currentTerm ;
392
- if (currentTerm == expected ) {
393
- currentTerm = updated ;
360
+ long compareAndExchange (long expected , long updated ) throws IOException {
361
+ if (isDisruptedSupplier .getAsBoolean ()) {
362
+ throw new IOException ("simulating disrupted access to shared store" );
394
363
}
395
- return witness ;
364
+ return currentTermRef . compareAndExchange ( expected , updated ) ;
396
365
}
397
366
}
398
367
0 commit comments