1818import org .elasticsearch .action .datastreams .GetDataStreamAction ;
1919import org .elasticsearch .action .index .IndexRequest ;
2020import org .elasticsearch .action .support .master .AcknowledgedResponse ;
21+ import org .elasticsearch .cluster .ClusterState ;
22+ import org .elasticsearch .cluster .ClusterStateUpdateTask ;
2123import org .elasticsearch .cluster .metadata .ComposableIndexTemplate ;
2224import org .elasticsearch .cluster .metadata .DataStream ;
25+ import org .elasticsearch .cluster .service .ClusterService ;
2326import org .elasticsearch .plugins .Plugin ;
2427import org .elasticsearch .test .ESIntegTestCase ;
25- import org .elasticsearch .test .disruption .IntermittentLongGCDisruption ;
26- import org .elasticsearch .test .disruption .SingleNodeDisruption ;
2728import org .elasticsearch .xcontent .XContentType ;
2829
2930import java .util .Collection ;
3031import java .util .List ;
3132import java .util .concurrent .CountDownLatch ;
33+ import java .util .concurrent .CyclicBarrier ;
3234import java .util .concurrent .ExecutionException ;
3335
3436import static org .hamcrest .Matchers .equalTo ;
@@ -43,25 +45,38 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
4345 }
4446
4547 public void testRolloverIsExecutedOnce () throws ExecutionException , InterruptedException {
46- String masterNode = internalCluster ().startMasterOnlyNode ();
48+ internalCluster ().startMasterOnlyNode ();
4749 internalCluster ().startDataOnlyNodes (3 );
4850 ensureStableCluster (4 );
4951
5052 String dataStreamName = "my-data-stream" ;
5153 createDataStream (dataStreamName );
5254
5355 // Mark it to lazy rollover
54- new RolloverRequestBuilder (client ()).setRolloverTarget (dataStreamName ).lazy (true ).execute (). get ( );
56+ safeGet ( new RolloverRequestBuilder (client ()).setRolloverTarget (dataStreamName ).lazy (true ).execute ());
5557
5658 // Verify that the data stream is marked for rollover and that it has currently one index
5759 DataStream dataStream = getDataStream (dataStreamName );
5860 assertThat (dataStream .rolloverOnWrite (), equalTo (true ));
5961 assertThat (dataStream .getBackingIndices ().getIndices ().size (), equalTo (1 ));
6062
6163 // Introduce a disruption to the master node that should delay the rollover execution
62- SingleNodeDisruption masterNodeDisruption = new IntermittentLongGCDisruption (random (), masterNode , 100 , 200 , 30000 , 60000 );
63- internalCluster ().setDisruptionScheme (masterNodeDisruption );
64- masterNodeDisruption .startDisrupting ();
64+ final var barrier = new CyclicBarrier (2 );
65+ internalCluster ().getCurrentMasterNodeInstance (ClusterService .class )
66+ .submitUnbatchedStateUpdateTask ("block" , new ClusterStateUpdateTask () {
67+ @ Override
68+ public ClusterState execute (ClusterState currentState ) {
69+ safeAwait (barrier );
70+ safeAwait (barrier );
71+ return currentState ;
72+ }
73+
74+ @ Override
75+ public void onFailure (Exception e ) {
76+ fail (e );
77+ }
78+ });
79+ safeAwait (barrier );
6580
6681 // Start indexing operations
6782 int docs = randomIntBetween (5 , 10 );
@@ -84,10 +99,10 @@ public void onFailure(Exception e) {
8499 }
85100
86101 // End the disruption so that all pending tasks will complete
87- masterNodeDisruption . stopDisrupting ( );
102+ safeAwait ( barrier );
88103
89104 // Wait for all the indexing requests to be processed successfully
90- countDownLatch . await ( );
105+ safeAwait ( countDownLatch );
91106
92107 // Verify that the rollover has happened once
93108 dataStream = getDataStream (dataStreamName );
@@ -96,10 +111,12 @@ public void onFailure(Exception e) {
96111 }
97112
98113 private DataStream getDataStream (String dataStreamName ) {
99- return client ().execute (
100- GetDataStreamAction .INSTANCE ,
101- new GetDataStreamAction .Request (TEST_REQUEST_TIMEOUT , new String [] { dataStreamName })
102- ).actionGet ().getDataStreams ().get (0 ).getDataStream ();
114+ return safeGet (
115+ client ().execute (
116+ GetDataStreamAction .INSTANCE ,
117+ new GetDataStreamAction .Request (TEST_REQUEST_TIMEOUT , new String [] { dataStreamName })
118+ )
119+ ).getDataStreams ().get (0 ).getDataStream ();
103120 }
104121
105122 private void createDataStream (String dataStreamName ) throws InterruptedException , ExecutionException {
@@ -111,19 +128,19 @@ private void createDataStream(String dataStreamName) throws InterruptedException
111128 .dataStreamTemplate (new ComposableIndexTemplate .DataStreamTemplate (false , false ))
112129 .build ()
113130 );
114- final AcknowledgedResponse putComposableTemplateResponse = client ().execute (
115- TransportPutComposableIndexTemplateAction .TYPE ,
116- putComposableTemplateRequest
117- ).actionGet ();
131+ final AcknowledgedResponse putComposableTemplateResponse = safeGet (
132+ client ().execute (TransportPutComposableIndexTemplateAction .TYPE , putComposableTemplateRequest )
133+ );
118134 assertThat (putComposableTemplateResponse .isAcknowledged (), is (true ));
119135
120136 final CreateDataStreamAction .Request createDataStreamRequest = new CreateDataStreamAction .Request (
121137 TEST_REQUEST_TIMEOUT ,
122138 TEST_REQUEST_TIMEOUT ,
123139 dataStreamName
124140 );
125- final AcknowledgedResponse createDataStreamResponse = client ().execute (CreateDataStreamAction .INSTANCE , createDataStreamRequest )
126- .get ();
141+ final AcknowledgedResponse createDataStreamResponse = safeGet (
142+ client ().execute (CreateDataStreamAction .INSTANCE , createDataStreamRequest )
143+ );
127144 assertThat (createDataStreamResponse .isAcknowledged (), is (true ));
128145 }
129146}
0 commit comments