33
33
import org .elasticsearch .cluster .node .DiscoveryNode ;
34
34
import org .elasticsearch .cluster .node .DiscoveryNodeUtils ;
35
35
import org .elasticsearch .cluster .node .DiscoveryNodes ;
36
+ import org .elasticsearch .cluster .routing .RoutingTable ;
36
37
import org .elasticsearch .common .Priority ;
37
38
import org .elasticsearch .common .Randomness ;
38
39
import org .elasticsearch .common .component .Lifecycle ;
77
78
import java .util .concurrent .atomic .AtomicBoolean ;
78
79
import java .util .concurrent .atomic .AtomicInteger ;
79
80
import java .util .concurrent .atomic .AtomicReference ;
81
+ import java .util .function .UnaryOperator ;
80
82
import java .util .stream .Collectors ;
81
83
82
84
import static java .util .Collections .emptySet ;
92
94
import static org .hamcrest .Matchers .hasSize ;
93
95
import static org .hamcrest .Matchers .instanceOf ;
94
96
import static org .hamcrest .Matchers .lessThanOrEqualTo ;
97
+ import static org .hamcrest .Matchers .startsWith ;
95
98
96
99
public class MasterServiceTests extends ESTestCase {
97
100
@@ -497,7 +500,7 @@ public void onFailure(Exception e) {}
497
500
@ Override
498
501
public ClusterState execute (ClusterState currentState ) {
499
502
relativeTimeInMillis += TimeValue .timeValueSeconds (3 ).millis ();
500
- return ClusterState .builder (currentState ).incrementVersion (). build ();
503
+ return ClusterState .builder (currentState ).build ();
501
504
}
502
505
503
506
@ Override
@@ -1250,7 +1253,7 @@ public void onFailure(Exception e) {
1250
1253
public ClusterState execute (ClusterState currentState ) {
1251
1254
relativeTimeInMillis += MasterService .MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING .get (Settings .EMPTY ).millis ()
1252
1255
+ randomLongBetween (1 , 1000000 );
1253
- return ClusterState .builder (currentState ).incrementVersion (). build ();
1256
+ return ClusterState .builder (currentState ).build ();
1254
1257
}
1255
1258
1256
1259
@ Override
@@ -1284,7 +1287,7 @@ public void onFailure(Exception e) {
1284
1287
masterService .submitUnbatchedStateUpdateTask ("test5" , new ClusterStateUpdateTask () {
1285
1288
@ Override
1286
1289
public ClusterState execute (ClusterState currentState ) {
1287
- return ClusterState .builder (currentState ).incrementVersion (). build ();
1290
+ return ClusterState .builder (currentState ).build ();
1288
1291
}
1289
1292
1290
1293
@ Override
@@ -1300,7 +1303,7 @@ public void onFailure(Exception e) {
1300
1303
masterService .submitUnbatchedStateUpdateTask ("test6" , new ClusterStateUpdateTask () {
1301
1304
@ Override
1302
1305
public ClusterState execute (ClusterState currentState ) {
1303
- return ClusterState .builder (currentState ).incrementVersion (). build ();
1306
+ return ClusterState .builder (currentState ).build ();
1304
1307
}
1305
1308
1306
1309
@ Override
@@ -2545,6 +2548,69 @@ public void onFailure(Exception e) {
2545
2548
}
2546
2549
}
2547
2550
2551
+ public void testVersionNumberProtection () {
2552
+ runVersionNumberProtectionTest (
2553
+ currentState -> ClusterState .builder (currentState )
2554
+ .version (randomFrom (currentState .version () - 1 , currentState .version () + 1 ))
2555
+ .build ()
2556
+ );
2557
+
2558
+ runVersionNumberProtectionTest (
2559
+ currentState -> currentState .copyAndUpdateMetadata (
2560
+ b -> b .version (randomFrom (currentState .metadata ().version () - 1 , currentState .metadata ().version () + 1 ))
2561
+ )
2562
+ );
2563
+
2564
+ runVersionNumberProtectionTest (
2565
+ currentState -> ClusterState .builder (currentState )
2566
+ .routingTable (
2567
+ RoutingTable .builder (currentState .routingTable ())
2568
+ .version (randomFrom (currentState .routingTable ().version () - 1 , currentState .routingTable ().version () + 1 ))
2569
+ .build ()
2570
+ )
2571
+ .build ()
2572
+ );
2573
+ }
2574
+
2575
+ private void runVersionNumberProtectionTest (UnaryOperator <ClusterState > updateOperator ) {
2576
+ final var deterministicTaskQueue = new DeterministicTaskQueue ();
2577
+ final var threadPool = deterministicTaskQueue .getThreadPool ();
2578
+ final var threadContext = threadPool .getThreadContext ();
2579
+ final var failureCaught = new AtomicBoolean ();
2580
+
2581
+ try (
2582
+ var masterService = createMasterService (true , null , threadPool , deterministicTaskQueue .getPrioritizedEsThreadPoolExecutor ());
2583
+ var ignored = threadContext .stashContext ()
2584
+ ) {
2585
+ final var taskId = randomIdentifier ();
2586
+
2587
+ masterService .submitUnbatchedStateUpdateTask (taskId , new ClusterStateUpdateTask () {
2588
+ @ Override
2589
+ public ClusterState execute (ClusterState currentState ) {
2590
+ return updateOperator .apply (currentState );
2591
+ }
2592
+
2593
+ @ Override
2594
+ public void onFailure (Exception e ) {
2595
+ assertThat (
2596
+ asInstanceOf (IllegalStateException .class , e ).getMessage (),
2597
+ allOf (startsWith ("cluster state update executor did not preserve version numbers" ), containsString (taskId ))
2598
+ );
2599
+ assertTrue (failureCaught .compareAndSet (false , true ));
2600
+ }
2601
+ });
2602
+
2603
+ // suppress assertion errors to check production behaviour
2604
+ threadContext .putTransient (MasterService .TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME , new Object ());
2605
+ threadContext .markAsSystemContext ();
2606
+ deterministicTaskQueue .runAllRunnableTasks ();
2607
+ assertFalse (deterministicTaskQueue .hasRunnableTasks ());
2608
+ assertFalse (deterministicTaskQueue .hasDeferredTasks ());
2609
+
2610
+ assertTrue (failureCaught .get ());
2611
+ }
2612
+ }
2613
+
2548
2614
/**
2549
2615
* Returns the cluster state that the master service uses (and that is provided by the discovery layer)
2550
2616
*/
0 commit comments