1414import org .elasticsearch .cluster .metadata .Metadata ;
1515import org .elasticsearch .cluster .node .DiscoveryNode ;
1616import org .elasticsearch .cluster .node .DiscoveryNodes ;
17+ import org .elasticsearch .common .Strings ;
1718import org .elasticsearch .common .io .stream .NamedWriteableRegistry ;
1819import org .elasticsearch .common .settings .ClusterSettings ;
1920import org .elasticsearch .common .settings .Setting ;
2627
2728import java .util .HashMap ;
2829import java .util .Map ;
29- import java .util .Objects ;
3030import java .util .Optional ;
3131import java .util .Set ;
3232import java .util .function .BooleanSupplier ;
@@ -255,7 +255,7 @@ public void start(DiscoveryNode currentLeader, long term) {
255255 }
256256
257257 private void sendHeartBeatToStore () {
258- sharedStore .writeHearBeat (currentTerm , new HeartBeat (currentLeader , threadPool .absoluteTimeInMillis ()));
258+ sharedStore .writeHeartBeat (currentTerm , new HeartBeat (currentLeader , threadPool .absoluteTimeInMillis ()));
259259 }
260260
261261 @ Override
@@ -458,12 +458,8 @@ public boolean isPublishQuorum(
458458 @ Override
459459 public void onNewElection (DiscoveryNode localNode , long proposedTerm , ActionListener <Void > listener ) {
460460 ActionListener .completeWith (listener , () -> {
461- final var proposedNewTermOwner = new TermOwner (localNode , proposedTerm );
462- final var witness = register .claimTerm (proposedNewTermOwner );
463- maxTermSeen = Math .max (maxTermSeen , witness .term ());
464- if (proposedNewTermOwner != witness ) {
465- throw new CoordinationStateRejectedException ("Term " + proposedTerm + " already claimed by another node" );
466- }
461+ maxTermSeen = Math .max (maxTermSeen , proposedTerm );
462+ register .claimTerm (proposedTerm );
467463 lastWonTerm = proposedTerm ;
468464 return null ;
469465 });
@@ -482,21 +478,27 @@ public boolean isInvalidReconfiguration(
482478 @ Override
483479 public void beforeCommit (long term , long version , ActionListener <Void > listener ) {
484480 // TODO: add a test to ensure that this gets called
485- final var currentTermOwner = register .getTermOwner ();
486- if (currentTermOwner .term () > term ) {
487- listener .onFailure (new CoordinationStateRejectedException ("Term " + term + " already claimed by another node" ));
481+ final var currentTerm = register .readCurrentTerm ();
482+ if (currentTerm > term ) {
483+ listener .onFailure (
484+ new CoordinationStateRejectedException (
485+ Strings .format (
486+ "could not commit cluster state version %d in term %d, current term is now %d" ,
487+ version ,
488+ term ,
489+ currentTerm
490+ )
491+ )
492+ );
488493 } else {
494+ assert currentTerm == term : currentTerm + " vs " + term ;
489495 listener .onResponse (null );
490496 }
491497 }
492498 }
493499
494500 record PersistentClusterState (long term , long version , Metadata state ) {}
495501
496- record TermOwner (DiscoveryNode node , long term ) {
497- static TermOwner EMPTY = new TermOwner (null , 0 );
498- }
499-
500502 static class SharedStore {
501503 private final Map <Long , PersistentClusterState > clusterStateByTerm = new HashMap <>();
502504 private final Map <Long , HeartBeat > heartBeatsByTerm = new HashMap <>();
@@ -514,9 +516,7 @@ private void writeClusterState(ClusterState clusterState) {
514516 }
515517
516518 private PersistentClusterState getLatestClusterState () {
517- final var termOwner = register .getTermOwner ();
518-
519- return getClusterStateForTerm (termOwner .term ());
519+ return getClusterStateForTerm (register .readCurrentTerm ());
520520 }
521521
522522 private PersistentClusterState getClusterStateForTerm (long termGoal ) {
@@ -529,7 +529,7 @@ private PersistentClusterState getClusterStateForTerm(long termGoal) {
529529 return null ;
530530 }
531531
532- private void writeHearBeat (long term , HeartBeat heartBeat ) {
532+ private void writeHeartBeat (long term , HeartBeat heartBeat ) {
533533 HeartBeat previousHeartbeat = heartBeatsByTerm .put (term , heartBeat );
534534 assert previousHeartbeat == null || heartBeat .leader ().equals (previousHeartbeat .leader ());
535535 }
@@ -540,20 +540,17 @@ private HeartBeat getHearbeatForTerm(long term) {
540540 }
541541
542542 static class AtomicRegister {
543- private TermOwner currentTermOwner ;
543+ private long currentTerm ;
544544
545- private TermOwner getTermOwner () {
546- return Objects . requireNonNullElse ( currentTermOwner , TermOwner . EMPTY ) ;
545+ private long readCurrentTerm () {
546+ return currentTerm ;
547547 }
548548
549- TermOwner claimTerm (TermOwner proposedNewTermOwner ) {
550- final var currentTermOwner = getTermOwner ();
551-
552- if (currentTermOwner .term () >= proposedNewTermOwner .term ()) {
553- return currentTermOwner ;
549+ void claimTerm (long proposedTerm ) {
550+ if (currentTerm >= proposedTerm ) {
551+ throw new CoordinationStateRejectedException ("could not claim " + proposedTerm + ", current term is " + currentTerm );
554552 }
555-
556- return this .currentTermOwner = proposedNewTermOwner ;
553+ currentTerm = proposedTerm ;
557554 }
558555 }
559556
@@ -604,8 +601,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
604601 }
605602
606603 void writeClusterState (ClusterState state ) {
607- final var termOwner = atomicRegister .getTermOwner ();
608- if (termOwner .term () > state .term ()) {
604+ if (atomicRegister .readCurrentTerm () > state .term ()) {
609605 throw new RuntimeException ("Conflicting cluster state update" );
610606 }
611607 sharedStore .writeClusterState (state );
0 commit comments