1414import org .elasticsearch .action .ActionRunnable ;
1515import org .elasticsearch .cluster .ClusterName ;
1616import org .elasticsearch .cluster .ClusterState ;
17+ import org .elasticsearch .cluster .coordination .stateless .SingleNodeReconfigurator ;
1718import org .elasticsearch .cluster .metadata .Metadata ;
1819import org .elasticsearch .cluster .node .DiscoveryNode ;
1920import org .elasticsearch .cluster .node .DiscoveryNodes ;
3637import java .util .function .Function ;
3738import java .util .function .LongSupplier ;
3839
40+ import static org .elasticsearch .cluster .coordination .AtomicRegisterCoordinatorTests .StoreHeartbeatService .HEARTBEAT_FREQUENCY ;
41+ import static org .elasticsearch .cluster .coordination .AtomicRegisterCoordinatorTests .StoreHeartbeatService .MAX_MISSED_HEARTBEATS ;
3942import static org .elasticsearch .cluster .coordination .CoordinationStateTests .clusterState ;
4043
4144@ TestLogging (reason = "these tests do a lot of log-worthy things but we usually don't care" , value = "org.elasticsearch:FATAL" )
@@ -238,36 +241,48 @@ protected CoordinatorStrategy getCoordinatorStrategy() {
238241 return new AtomicRegisterCoordinatorStrategy (atomicRegister , sharedStore );
239242 }
240243
241- record HeartBeat (DiscoveryNode leader , long term , long absoluteTimeInMillis ) {
244+ public record Heartbeat (DiscoveryNode leader , long term , long absoluteTimeInMillis ) {
242245 long timeSinceLastHeartbeatInMillis (long nowInMillis ) {
243246 return nowInMillis - absoluteTimeInMillis ;
244247 }
245248 }
246249
247- static class StoreHeartbeatService implements LeaderHeartbeatService {
250+ public static class StoreHeartbeatService implements LeaderHeartbeatService {
251+ public static final Setting <TimeValue > HEARTBEAT_FREQUENCY = Setting .timeSetting (
252+ "heartbeat_frequency" ,
253+ TimeValue .timeValueSeconds (15 ),
254+ Setting .Property .NodeScope
255+ );
256+
257+ public static final Setting <Integer > MAX_MISSED_HEARTBEATS = Setting .intSetting (
258+ "max_missed_heartbeats" ,
259+ 2 ,
260+ 1 ,
261+ Setting .Property .NodeScope
262+ );
248263
249264 private static final Logger logger = LogManager .getLogger (StoreHeartbeatService .class );
250265
251- private final SharedStore sharedStore ;
266+ private final HeartbeatStore heartbeatStore ;
252267 private final ThreadPool threadPool ;
253268 private final TimeValue heartbeatFrequency ;
254269 private final TimeValue maxTimeSinceLastHeartbeat ;
255- private final AtomicRegister register ;
270+ private final LongSupplier currentTermSupplier ;
256271
257272 private volatile HeartbeatTask heartbeatTask ;
258273
259- StoreHeartbeatService (
260- SharedStore sharedStore ,
274+ public StoreHeartbeatService (
275+ HeartbeatStore heartbeatStore ,
261276 ThreadPool threadPool ,
262277 TimeValue heartbeatFrequency ,
263278 TimeValue maxTimeSinceLastHeartbeat ,
264- AtomicRegister register
279+ LongSupplier currentTermSupplier
265280 ) {
266- this .sharedStore = sharedStore ;
281+ this .heartbeatStore = heartbeatStore ;
267282 this .threadPool = threadPool ;
268283 this .heartbeatFrequency = heartbeatFrequency ;
269284 this .maxTimeSinceLastHeartbeat = maxTimeSinceLastHeartbeat ;
270- this .register = register ;
285+ this .currentTermSupplier = currentTermSupplier ;
271286 }
272287
273288 @ Override
@@ -283,9 +298,9 @@ public void stop() {
283298 }
284299
285300 void runIfNoRecentLeader (Runnable runnable ) {
286- sharedStore .readLatestHeartbeat (new ActionListener <>() {
301+ heartbeatStore .readLatestHeartbeat (new ActionListener <>() {
287302 @ Override
288- public void onResponse (HeartBeat heartBeat ) {
303+ public void onResponse (Heartbeat heartBeat ) {
289304 if (heartBeat == null
290305 || maxTimeSinceLastHeartbeat .millis () <= heartBeat .timeSinceLastHeartbeatInMillis (
291306 threadPool .absoluteTimeInMillis ()
@@ -328,10 +343,10 @@ protected void doRun() throws Exception {
328343 return ;
329344 }
330345
331- final var registerTerm = register . readCurrentTerm ();
346+ final var registerTerm = currentTermSupplier . getAsLong ();
332347 if (registerTerm == heartbeatTerm ) {
333- sharedStore . writeHeartBeat (
334- new HeartBeat (currentLeader , heartbeatTerm , threadPool .absoluteTimeInMillis ()),
348+ heartbeatStore . writeHeartbeat (
349+ new Heartbeat (currentLeader , heartbeatTerm , threadPool .absoluteTimeInMillis ()),
335350 rerunListener
336351 );
337352 } else {
@@ -343,13 +358,6 @@ protected void doRun() throws Exception {
343358 }
344359
345360 class AtomicRegisterCoordinatorStrategy implements CoordinatorStrategy {
346- static final Setting <TimeValue > HEARTBEAT_FREQUENCY = Setting .timeSetting (
347- "heartbeat_frequency" ,
348- TimeValue .timeValueSeconds (15 ),
349- Setting .Property .NodeScope
350- );
351- static final Setting <Integer > MAX_MISSED_HEARTBEATS = Setting .intSetting ("max_missed_heartbeats" , 2 , 1 , Setting .Property .NodeScope );
352-
353361 private final AtomicRegister atomicRegister ;
354362 private final SharedStore sharedStore ;
355363
@@ -366,19 +374,19 @@ public CoordinationServices getCoordinationServices(
366374 CoordinationState .PersistedState persistedState
367375 ) {
368376 final TimeValue heartbeatFrequency = HEARTBEAT_FREQUENCY .get (settings );
369- var atomicHeartBeat = new StoreHeartbeatService (
377+ var atomicHeartbeat = new StoreHeartbeatService (
370378 sharedStore ,
371379 threadPool ,
372380 heartbeatFrequency ,
373381 TimeValue .timeValueMillis (heartbeatFrequency .millis () * MAX_MISSED_HEARTBEATS .get (settings )),
374- atomicRegister
382+ atomicRegister :: readCurrentTerm
375383 );
376384 var reconfigurator = new SingleNodeReconfigurator (settings , clusterSettings );
377- var quorumStrategy = new AtomicRegisterElectionStrategy (atomicRegister );
385+ var electionStrategy = new AtomicRegisterElectionStrategy (atomicRegister );
378386 return new CoordinationServices () {
379387 @ Override
380- public ElectionStrategy getQuorumStrategy () {
381- return quorumStrategy ;
388+ public ElectionStrategy getElectionStrategy () {
389+ return electionStrategy ;
382390 }
383391
384392 @ Override
@@ -388,7 +396,7 @@ public Reconfigurator getReconfigurator() {
388396
389397 @ Override
390398 public LeaderHeartbeatService getLeaderHeartbeatService () {
391- return atomicHeartBeat ;
399+ return atomicHeartbeat ;
392400 }
393401
394402 @ Override
@@ -398,7 +406,7 @@ public PreVoteCollector.Factory getPreVoteCollectorFactory() {
398406 startElection ,
399407 updateMaxTermSeen ,
400408 electionStrategy ,
401- nodeHealthService ) -> new AtomicRegisterPreVoteCollector (atomicHeartBeat , startElection );
409+ nodeHealthService ) -> new AtomicRegisterPreVoteCollector (atomicHeartbeat , startElection );
402410 }
403411 };
404412 }
@@ -422,39 +430,6 @@ public CoordinationState.PersistedState createPersistedStateFromExistingState(
422430 }
423431 }
424432
425- static class SingleNodeReconfigurator extends Reconfigurator {
426- SingleNodeReconfigurator (Settings settings , ClusterSettings clusterSettings ) {
427- super (settings , clusterSettings );
428- }
429-
430- @ Override
431- public CoordinationMetadata .VotingConfiguration reconfigure (
432- Set <DiscoveryNode > liveNodes ,
433- Set <String > retiredNodeIds ,
434- DiscoveryNode currentMaster ,
435- CoordinationMetadata .VotingConfiguration currentConfig
436- ) {
437- return currentConfig ;
438- }
439-
440- @ Override
441- public ClusterState maybeReconfigureAfterNewMasterIsElected (ClusterState clusterState ) {
442- return ClusterState .builder (clusterState )
443- .metadata (
444- Metadata .builder (clusterState .metadata ())
445- .coordinationMetadata (
446- CoordinationMetadata .builder (clusterState .coordinationMetadata ())
447- .lastAcceptedConfiguration (
448- new CoordinationMetadata .VotingConfiguration (Set .of (clusterState .nodes ().getMasterNodeId ()))
449- )
450- .build ()
451- )
452- .build ()
453- )
454- .build ();
455- }
456- }
457-
458433 static class AtomicRegisterElectionStrategy extends ElectionStrategy {
459434 private final AtomicRegister register ;
460435
@@ -551,9 +526,9 @@ public void beforeCommit(long term, long version, ActionListener<Void> listener)
551526
552527 record PersistentClusterState (long term , long version , Metadata state ) {}
553528
554- private static class SharedStore {
529+ private static class SharedStore implements HeartbeatStore {
555530 private final Map <Long , PersistentClusterState > clusterStateByTerm = new HashMap <>();
556- private HeartBeat heartBeat ;
531+ private Heartbeat heartbeat ;
557532
558533 private void writeClusterState (ClusterState clusterState ) {
559534 clusterStateByTerm .put (
@@ -574,16 +549,24 @@ void getClusterStateForTerm(long termGoal, ActionListener<PersistentClusterState
574549 });
575550 }
576551
577- void writeHeartBeat (HeartBeat newHeartBeat , ActionListener <Void > listener ) {
578- this .heartBeat = newHeartBeat ;
552+ @ Override
553+ public void writeHeartbeat (Heartbeat newHeartbeat , ActionListener <Void > listener ) {
554+ this .heartbeat = newHeartbeat ;
579555 listener .onResponse (null );
580556 }
581557
582- void readLatestHeartbeat (ActionListener <HeartBeat > listener ) {
583- listener .onResponse (heartBeat );
558+ @ Override
559+ public void readLatestHeartbeat (ActionListener <Heartbeat > listener ) {
560+ listener .onResponse (heartbeat );
584561 }
585562 }
586563
564+ public interface HeartbeatStore {
565+ void writeHeartbeat (Heartbeat newHeartbeat , ActionListener <Void > listener );
566+
567+ void readLatestHeartbeat (ActionListener <Heartbeat > listener );
568+ }
569+
587570 private static class AtomicRegister {
588571 private long currentTerm ;
589572
@@ -600,11 +583,11 @@ long compareAndExchange(long expected, long updated) {
600583 }
601584 }
602585
603- static class AtomicRegisterPreVoteCollector extends PreVoteCollector {
586+ public static class AtomicRegisterPreVoteCollector extends PreVoteCollector {
604587 private final StoreHeartbeatService heartbeatService ;
605588 private final Runnable startElection ;
606589
607- AtomicRegisterPreVoteCollector (StoreHeartbeatService heartbeatService , Runnable startElection ) {
590+ public AtomicRegisterPreVoteCollector (StoreHeartbeatService heartbeatService , Runnable startElection ) {
608591 this .heartbeatService = heartbeatService ;
609592 this .startElection = startElection ;
610593 }
0 commit comments