10
10
import org .apache .logging .log4j .LogManager ;
11
11
import org .apache .logging .log4j .Logger ;
12
12
import org .elasticsearch .action .DocWriteRequest ;
13
- import org .elasticsearch .action .admin .cluster .shards .ClusterSearchShardsRequest ;
14
- import org .elasticsearch .action .admin .cluster .shards .TransportClusterSearchShardsAction ;
15
13
import org .elasticsearch .action .admin .indices .get .GetIndexRequest ;
16
14
import org .elasticsearch .action .admin .indices .get .GetIndexResponse ;
17
15
import org .elasticsearch .action .admin .indices .settings .get .GetSettingsRequest ;
57
55
import java .util .List ;
58
56
import java .util .Locale ;
59
57
import java .util .Map ;
60
- import java .util .concurrent .CountDownLatch ;
61
58
import java .util .concurrent .TimeUnit ;
62
- import java .util .function .Consumer ;
63
59
64
60
import static org .elasticsearch .core .Strings .format ;
65
61
import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
@@ -144,7 +140,7 @@ public void setup(final String sourceIndex, int numOfShards, int numOfReplicas,
144
140
145
141
public void testILMDownsampleRollingRestart () throws Exception {
146
142
final InternalTestCluster cluster = internalCluster ();
147
- final List < String > masterNodes = cluster .startMasterOnlyNodes (1 );
143
+ cluster .startMasterOnlyNodes (1 );
148
144
cluster .startDataOnlyNodes (3 );
149
145
ensureStableCluster (cluster .size ());
150
146
ensureGreen ();
@@ -169,46 +165,16 @@ public void testILMDownsampleRollingRestart() throws Exception {
169
165
.endObject ();
170
166
};
171
167
int indexedDocs = bulkIndex (sourceIndex , sourceSupplier , DOC_COUNT );
172
- final CountDownLatch disruptionStart = new CountDownLatch (1 );
173
- final CountDownLatch disruptionEnd = new CountDownLatch (1 );
174
168
175
- new Thread (new Disruptor (cluster , sourceIndex , new DisruptionListener () {
176
- @ Override
177
- public void disruptionStart () {
178
- disruptionStart .countDown ();
179
- }
180
-
181
- @ Override
182
- public void disruptionEnd () {
183
- disruptionEnd .countDown ();
184
- }
185
- }, masterNodes .get (0 ), (ignored ) -> {
186
- try {
187
- cluster .rollingRestart (new InternalTestCluster .RestartCallback () {
188
- @ Override
189
- public boolean validateClusterForming () {
190
- return true ;
191
- }
192
- });
193
- } catch (Exception e ) {
194
- throw new RuntimeException (e );
195
- }
196
- })).start ();
169
+ cluster .rollingRestart (new InternalTestCluster .RestartCallback ());
197
170
198
171
final String targetIndex = "downsample-1h-" + sourceIndex ;
199
- startDownsampleTaskViaIlm (sourceIndex , targetIndex , disruptionStart , disruptionEnd );
200
- waitUntil (() -> getClusterPendingTasks (cluster .client ()).pendingTasks ().isEmpty (), 60 , TimeUnit .SECONDS );
201
- ensureStableCluster (cluster .numDataAndMasterNodes ());
202
- assertTargetIndex (cluster , targetIndex , indexedDocs );
172
+ startDownsampleTaskViaIlm (sourceIndex , targetIndex );
173
+ assertBusy (() -> assertTargetIndex (cluster , targetIndex , indexedDocs ));
174
+ ensureGreen (targetIndex );
203
175
}
204
176
205
- private void startDownsampleTaskViaIlm (
206
- String sourceIndex ,
207
- String targetIndex ,
208
- CountDownLatch disruptionStart ,
209
- CountDownLatch disruptionEnd
210
- ) throws Exception {
211
- disruptionStart .await ();
177
+ private void startDownsampleTaskViaIlm (String sourceIndex , String targetIndex ) throws Exception {
212
178
var request = new UpdateSettingsRequest (sourceIndex ).settings (
213
179
Settings .builder ().put (LifecycleSettings .LIFECYCLE_NAME , POLICY_NAME )
214
180
);
@@ -231,7 +197,6 @@ private void startDownsampleTaskViaIlm(
231
197
var getSettingsResponse = client ().admin ().indices ().getSettings (new GetSettingsRequest ().indices (targetIndex )).actionGet ();
232
198
assertThat (getSettingsResponse .getSetting (targetIndex , IndexMetadata .INDEX_DOWNSAMPLE_STATUS .getKey ()), equalTo ("success" ));
233
199
}, 60 , TimeUnit .SECONDS );
234
- disruptionEnd .await ();
235
200
}
236
201
237
202
private void assertTargetIndex (final InternalTestCluster cluster , final String targetIndex , int indexedDocs ) {
@@ -294,53 +259,4 @@ private String randomDateForRange(long start, long end) {
294
259
public interface SourceSupplier {
295
260
XContentBuilder get () throws IOException ;
296
261
}
297
-
298
- interface DisruptionListener {
299
- void disruptionStart ();
300
-
301
- void disruptionEnd ();
302
- }
303
-
304
- private class Disruptor implements Runnable {
305
- final InternalTestCluster cluster ;
306
- private final String sourceIndex ;
307
- private final DisruptionListener listener ;
308
- private final String clientNode ;
309
- private final Consumer <String > disruption ;
310
-
311
- private Disruptor (
312
- final InternalTestCluster cluster ,
313
- final String sourceIndex ,
314
- final DisruptionListener listener ,
315
- final String clientNode ,
316
- final Consumer <String > disruption
317
- ) {
318
- this .cluster = cluster ;
319
- this .sourceIndex = sourceIndex ;
320
- this .listener = listener ;
321
- this .clientNode = clientNode ;
322
- this .disruption = disruption ;
323
- }
324
-
325
- @ Override
326
- public void run () {
327
- listener .disruptionStart ();
328
- try {
329
- final String candidateNode = safeExecute (
330
- cluster .client (clientNode ),
331
- TransportClusterSearchShardsAction .TYPE ,
332
- new ClusterSearchShardsRequest (TEST_REQUEST_TIMEOUT , sourceIndex )
333
- ).getNodes ()[0 ].getName ();
334
- logger .info ("Candidate node [" + candidateNode + "]" );
335
- disruption .accept (candidateNode );
336
- ensureGreen (sourceIndex );
337
- ensureStableCluster (cluster .numDataAndMasterNodes (), clientNode );
338
-
339
- } catch (Exception e ) {
340
- logger .error ("Ignoring Error while injecting disruption [" + e .getMessage () + "]" );
341
- } finally {
342
- listener .disruptionEnd ();
343
- }
344
- }
345
- }
346
262
}
0 commit comments