1010import org .apache .logging .log4j .LogManager ;
1111import org .apache .logging .log4j .Logger ;
1212import org .elasticsearch .action .DocWriteRequest ;
13- import org .elasticsearch .action .admin .cluster .shards .ClusterSearchShardsRequest ;
14- import org .elasticsearch .action .admin .cluster .shards .TransportClusterSearchShardsAction ;
1513import org .elasticsearch .action .admin .indices .get .GetIndexRequest ;
1614import org .elasticsearch .action .admin .indices .get .GetIndexResponse ;
1715import org .elasticsearch .action .admin .indices .settings .get .GetSettingsRequest ;
5755import java .util .List ;
5856import java .util .Locale ;
5957import java .util .Map ;
60- import java .util .concurrent .CountDownLatch ;
6158import java .util .concurrent .TimeUnit ;
62- import java .util .function .Consumer ;
6359
6460import static org .elasticsearch .core .Strings .format ;
6561import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
@@ -144,7 +140,7 @@ public void setup(final String sourceIndex, int numOfShards, int numOfReplicas,
144140
145141 public void testILMDownsampleRollingRestart () throws Exception {
146142 final InternalTestCluster cluster = internalCluster ();
147- final List < String > masterNodes = cluster .startMasterOnlyNodes (1 );
143+ cluster .startMasterOnlyNodes (1 );
148144 cluster .startDataOnlyNodes (3 );
149145 ensureStableCluster (cluster .size ());
150146 ensureGreen ();
@@ -169,46 +165,16 @@ public void testILMDownsampleRollingRestart() throws Exception {
169165 .endObject ();
170166 };
171167 int indexedDocs = bulkIndex (sourceIndex , sourceSupplier , DOC_COUNT );
172- final CountDownLatch disruptionStart = new CountDownLatch (1 );
173- final CountDownLatch disruptionEnd = new CountDownLatch (1 );
174168
175- new Thread (new Disruptor (cluster , sourceIndex , new DisruptionListener () {
176- @ Override
177- public void disruptionStart () {
178- disruptionStart .countDown ();
179- }
180-
181- @ Override
182- public void disruptionEnd () {
183- disruptionEnd .countDown ();
184- }
185- }, masterNodes .get (0 ), (ignored ) -> {
186- try {
187- cluster .rollingRestart (new InternalTestCluster .RestartCallback () {
188- @ Override
189- public boolean validateClusterForming () {
190- return true ;
191- }
192- });
193- } catch (Exception e ) {
194- throw new RuntimeException (e );
195- }
196- })).start ();
169+ cluster .rollingRestart (new InternalTestCluster .RestartCallback ());
197170
198171 final String targetIndex = "downsample-1h-" + sourceIndex ;
199- startDownsampleTaskViaIlm (sourceIndex , targetIndex , disruptionStart , disruptionEnd );
200- waitUntil (() -> getClusterPendingTasks (cluster .client ()).pendingTasks ().isEmpty (), 60 , TimeUnit .SECONDS );
201- ensureStableCluster (cluster .numDataAndMasterNodes ());
202- assertTargetIndex (cluster , targetIndex , indexedDocs );
172+ startDownsampleTaskViaIlm (sourceIndex , targetIndex );
173+ assertBusy (() -> assertTargetIndex (cluster , targetIndex , indexedDocs ));
174+ ensureGreen (targetIndex );
203175 }
204176
205- private void startDownsampleTaskViaIlm (
206- String sourceIndex ,
207- String targetIndex ,
208- CountDownLatch disruptionStart ,
209- CountDownLatch disruptionEnd
210- ) throws Exception {
211- disruptionStart .await ();
177+ private void startDownsampleTaskViaIlm (String sourceIndex , String targetIndex ) throws Exception {
212178 var request = new UpdateSettingsRequest (sourceIndex ).settings (
213179 Settings .builder ().put (LifecycleSettings .LIFECYCLE_NAME , POLICY_NAME )
214180 );
@@ -231,7 +197,6 @@ private void startDownsampleTaskViaIlm(
231197 var getSettingsResponse = client ().admin ().indices ().getSettings (new GetSettingsRequest ().indices (targetIndex )).actionGet ();
232198 assertThat (getSettingsResponse .getSetting (targetIndex , IndexMetadata .INDEX_DOWNSAMPLE_STATUS .getKey ()), equalTo ("success" ));
233199 }, 60 , TimeUnit .SECONDS );
234- disruptionEnd .await ();
235200 }
236201
237202 private void assertTargetIndex (final InternalTestCluster cluster , final String targetIndex , int indexedDocs ) {
@@ -294,53 +259,4 @@ private String randomDateForRange(long start, long end) {
294259 public interface SourceSupplier {
295260 XContentBuilder get () throws IOException ;
296261 }
297-
298- interface DisruptionListener {
299- void disruptionStart ();
300-
301- void disruptionEnd ();
302- }
303-
304- private class Disruptor implements Runnable {
305- final InternalTestCluster cluster ;
306- private final String sourceIndex ;
307- private final DisruptionListener listener ;
308- private final String clientNode ;
309- private final Consumer <String > disruption ;
310-
311- private Disruptor (
312- final InternalTestCluster cluster ,
313- final String sourceIndex ,
314- final DisruptionListener listener ,
315- final String clientNode ,
316- final Consumer <String > disruption
317- ) {
318- this .cluster = cluster ;
319- this .sourceIndex = sourceIndex ;
320- this .listener = listener ;
321- this .clientNode = clientNode ;
322- this .disruption = disruption ;
323- }
324-
325- @ Override
326- public void run () {
327- listener .disruptionStart ();
328- try {
329- final String candidateNode = safeExecute (
330- cluster .client (clientNode ),
331- TransportClusterSearchShardsAction .TYPE ,
332- new ClusterSearchShardsRequest (TEST_REQUEST_TIMEOUT , sourceIndex )
333- ).getNodes ()[0 ].getName ();
334- logger .info ("Candidate node [" + candidateNode + "]" );
335- disruption .accept (candidateNode );
336- ensureGreen (sourceIndex );
337- ensureStableCluster (cluster .numDataAndMasterNodes (), clientNode );
338-
339- } catch (Exception e ) {
340- logger .error ("Ignoring Error while injecting disruption [" + e .getMessage () + "]" );
341- } finally {
342- listener .disruptionEnd ();
343- }
344- }
345- }
346262}
0 commit comments