|
50 | 50 | import java.util.Map;
|
51 | 51 | import java.util.SortedMap;
|
52 | 52 | import java.util.TreeMap;
|
| 53 | +import java.util.concurrent.CountDownLatch; |
| 54 | +import java.util.concurrent.atomic.AtomicBoolean; |
53 | 55 |
|
54 | 56 | import static org.elasticsearch.cluster.metadata.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
|
55 | 57 | import static org.hamcrest.Matchers.containsString;
|
@@ -465,4 +467,76 @@ public void testUpdatePolicyButNoPhaseChangeIndexStepsDontChange() throws Except
|
465 | 467 | assertThat(((ShrinkStep) shrinkStep).getNumberOfShards(), equalTo(2));
|
466 | 468 | assertThat(((ShrinkStep) gotStep).getNumberOfShards(), equalTo(1));
|
467 | 469 | }
|
| 470 | + |
| 471 | + public void testGetStepMultithreaded() throws Exception { |
| 472 | + Client client = mock(Client.class); |
| 473 | + Mockito.when(client.settings()).thenReturn(Settings.EMPTY); |
| 474 | + |
| 475 | + LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases("policy"); |
| 476 | + String phaseName = randomFrom(policy.getPhases().keySet()); |
| 477 | + Phase phase = policy.getPhases().get(phaseName); |
| 478 | + |
| 479 | + LifecycleExecutionState lifecycleState = LifecycleExecutionState.builder() |
| 480 | + .setPhaseDefinition(Strings.toString(new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong()))) |
| 481 | + .build(); |
| 482 | + IndexMetadata indexMetadata = IndexMetadata.builder("test") |
| 483 | + .settings( |
| 484 | + Settings.builder() |
| 485 | + .put("index.number_of_shards", 1) |
| 486 | + .put("index.number_of_replicas", 0) |
| 487 | + .put("index.version.created", Version.CURRENT) |
| 488 | + .put(LifecycleSettings.LIFECYCLE_NAME, "policy") |
| 489 | + .build() |
| 490 | + ) |
| 491 | + .putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.asMap()) |
| 492 | + .build(); |
| 493 | + |
| 494 | + SortedMap<String, LifecyclePolicyMetadata> metas = new TreeMap<>(); |
| 495 | + metas.put("policy", new LifecyclePolicyMetadata(policy, Collections.emptyMap(), 1, randomNonNegativeLong())); |
| 496 | + IndexLifecycleMetadata meta = new IndexLifecycleMetadata(metas, OperationMode.RUNNING); |
| 497 | + |
| 498 | + PolicyStepsRegistry registry = new PolicyStepsRegistry(REGISTRY, client, null); |
| 499 | + registry.update(meta); |
| 500 | + |
| 501 | + // test a variety of getStep calls with random actions and steps |
| 502 | + for (int i = 0; i < scaledRandomIntBetween(100, 1000); i++) { |
| 503 | + LifecycleAction action = randomValueOtherThan(MigrateAction.DISABLED, () -> randomFrom(phase.getActions().values())); |
| 504 | + Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY, null)); |
| 505 | + // if the step's key is different from the previous iteration of the loop, then the cache will be updated, and we'll |
| 506 | + // get a non-cached response. if the step's key happens to be the same as the previous iteration of the loop, then |
| 507 | + // we'll get a cached response. so this loop randomly tests both cached and non-cached responses. |
| 508 | + Step actualStep = registry.getStep(indexMetadata, step.getKey()); |
| 509 | + assertThat(actualStep.getKey(), equalTo(step.getKey())); |
| 510 | + } |
| 511 | + |
| 512 | + final CountDownLatch latch = new CountDownLatch(1); |
| 513 | + final AtomicBoolean done = new AtomicBoolean(false); |
| 514 | + |
| 515 | + // now, in another thread, update the registry repeatedly as fast as possible. |
| 516 | + // updating the registry has the side effect of clearing the cache. |
| 517 | + Thread t = new Thread(() -> { |
| 518 | + latch.countDown(); // signal that we're starting |
| 519 | + while (done.get() == false) { |
| 520 | + registry.update(meta); |
| 521 | + } |
| 522 | + }); |
| 523 | + t.start(); |
| 524 | + |
| 525 | + try { |
| 526 | + latch.await(); // wait until the other thread started |
| 527 | + |
| 528 | + // and, while the cache is being repeatedly cleared, |
| 529 | + // test a variety of getStep calls with random actions and steps |
| 530 | + for (int i = 0; i < scaledRandomIntBetween(100, 1000); i++) { |
| 531 | + LifecycleAction action = randomValueOtherThan(MigrateAction.DISABLED, () -> randomFrom(phase.getActions().values())); |
| 532 | + Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY, null)); |
| 533 | + Step actualStep = registry.getStep(indexMetadata, step.getKey()); |
| 534 | + assertThat(actualStep.getKey(), equalTo(step.getKey())); |
| 535 | + } |
| 536 | + } finally { |
| 537 | + // tell the other thread we're finished and wait for it to die |
| 538 | + done.set(true); |
| 539 | + t.join(1000); |
| 540 | + } |
| 541 | + } |
468 | 542 | }
|
0 commit comments