|
48 | 48 | import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
|
49 | 49 | import org.junit.After;
|
50 | 50 | import org.junit.Before;
|
| 51 | +import org.mockito.verification.VerificationMode; |
51 | 52 |
|
52 | 53 | import java.time.Clock;
|
53 | 54 | import java.util.Collections;
|
|
58 | 59 | import static java.util.stream.Collectors.toList;
|
59 | 60 | import static org.hamcrest.Matchers.contains;
|
60 | 61 | import static org.hamcrest.Matchers.containsInAnyOrder;
|
| 62 | +import static org.hamcrest.Matchers.containsString; |
61 | 63 | import static org.hamcrest.Matchers.empty;
|
62 | 64 | import static org.hamcrest.Matchers.equalTo;
|
63 | 65 | import static org.hamcrest.Matchers.is;
|
64 | 66 | import static org.hamcrest.Matchers.nullValue;
|
65 | 67 | import static org.mockito.ArgumentMatchers.any;
|
| 68 | +import static org.mockito.ArgumentMatchers.anyLong; |
66 | 69 | import static org.mockito.Mockito.mock;
|
| 70 | +import static org.mockito.Mockito.never; |
67 | 71 | import static org.mockito.Mockito.times;
|
68 | 72 | import static org.mockito.Mockito.verify;
|
| 73 | +import static org.mockito.Mockito.verifyNoInteractions; |
69 | 74 | import static org.mockito.Mockito.when;
|
70 | 75 |
|
71 | 76 | public class TransformTaskTests extends ESTestCase {
|
@@ -449,6 +454,117 @@ public void testApplyNewAuthState() {
|
449 | 454 | assertThat(transformTask.getContext().getAuthState(), is(nullValue()));
|
450 | 455 | }
|
451 | 456 |
|
| 457 | + private TransformTask createTransformTask(TransformConfig transformConfig, MockTransformAuditor auditor) { |
| 458 | + var threadPool = mock(ThreadPool.class); |
| 459 | + |
| 460 | + var transformState = new TransformState( |
| 461 | + TransformTaskState.STARTED, |
| 462 | + IndexerState.STARTED, |
| 463 | + null, |
| 464 | + 0L, |
| 465 | + "because", |
| 466 | + null, |
| 467 | + null, |
| 468 | + false, |
| 469 | + null |
| 470 | + ); |
| 471 | + |
| 472 | + return new TransformTask( |
| 473 | + 42, |
| 474 | + "some_type", |
| 475 | + "some_action", |
| 476 | + TaskId.EMPTY_TASK_ID, |
| 477 | + createTransformTaskParams(transformConfig.getId()), |
| 478 | + transformState, |
| 479 | + new TransformScheduler(Clock.systemUTC(), threadPool, Settings.EMPTY), |
| 480 | + auditor, |
| 481 | + threadPool, |
| 482 | + Collections.emptyMap() |
| 483 | + ); |
| 484 | + } |
| 485 | + |
| 486 | + public void testInitializeIndexerWhenAlreadyInitialized() { |
| 487 | + var transformTask = createTransformTask( |
| 488 | + TransformConfigTests.randomTransformConfigWithoutHeaders(), |
| 489 | + MockTransformAuditor.createMockAuditor() |
| 490 | + ); |
| 491 | + transformTask.initializeIndexer(mock(ClientTransformIndexerBuilder.class)); |
| 492 | + IllegalStateException e = expectThrows( |
| 493 | + IllegalStateException.class, |
| 494 | + () -> transformTask.initializeIndexer(mock(ClientTransformIndexerBuilder.class)) |
| 495 | + ); |
| 496 | + assertThat(e.getMessage(), containsString("The object cannot be set twice!")); |
| 497 | + } |
| 498 | + |
| 499 | + public void testTriggeredIsNoOpWhenTransformIdMismatch() { |
| 500 | + var transformId = randomAlphaOfLengthBetween(1, 10); |
| 501 | + var transformTask = createTransformTask( |
| 502 | + TransformConfigTests.randomTransformConfigWithoutHeaders(transformId), |
| 503 | + MockTransformAuditor.createMockAuditor() |
| 504 | + ); |
| 505 | + var indexer = mock(ClientTransformIndexer.class); |
| 506 | + transformTask.initializeIndexer(indexer); |
| 507 | + transformTask.triggered(new TransformScheduler.Event("not-" + transformId, randomNonNegativeLong(), randomNonNegativeLong())); |
| 508 | + verifyNoInteractions(indexer); |
| 509 | + } |
| 510 | + |
| 511 | + public void testTriggeredIsNoOpWhenIndexerIsUninitialized() { |
| 512 | + var transformId = randomAlphaOfLengthBetween(1, 10); |
| 513 | + var transformTask = createTransformTask( |
| 514 | + TransformConfigTests.randomTransformConfigWithoutHeaders(transformId), |
| 515 | + MockTransformAuditor.createMockAuditor() |
| 516 | + ); |
| 517 | + transformTask.triggered(new TransformScheduler.Event(transformId, randomNonNegativeLong(), randomNonNegativeLong())); |
| 518 | + } |
| 519 | + |
| 520 | + public void testTriggeredIsNoOpWhenStateIsWrong() { |
| 521 | + testTriggered(TransformTaskState.STOPPED, IndexerState.INDEXING, never()); |
| 522 | + testTriggered(TransformTaskState.STOPPED, IndexerState.STOPPING, never()); |
| 523 | + testTriggered(TransformTaskState.STOPPED, IndexerState.STOPPED, never()); |
| 524 | + testTriggered(TransformTaskState.STOPPED, IndexerState.ABORTING, never()); |
| 525 | + testTriggered(TransformTaskState.STOPPED, IndexerState.STARTED, never()); |
| 526 | + testTriggered(TransformTaskState.FAILED, IndexerState.INDEXING, never()); |
| 527 | + testTriggered(TransformTaskState.FAILED, IndexerState.STOPPING, never()); |
| 528 | + testTriggered(TransformTaskState.FAILED, IndexerState.STOPPED, never()); |
| 529 | + testTriggered(TransformTaskState.FAILED, IndexerState.ABORTING, never()); |
| 530 | + testTriggered(TransformTaskState.FAILED, IndexerState.STARTED, never()); |
| 531 | + testTriggered(TransformTaskState.STARTED, IndexerState.INDEXING, never()); |
| 532 | + testTriggered(TransformTaskState.STARTED, IndexerState.STOPPING, never()); |
| 533 | + testTriggered(TransformTaskState.STARTED, IndexerState.STOPPED, never()); |
| 534 | + testTriggered(TransformTaskState.STARTED, IndexerState.ABORTING, never()); |
| 535 | + } |
| 536 | + |
| 537 | + public void testTriggeredActuallyTriggersIndexer() { |
| 538 | + testTriggered(TransformTaskState.STARTED, IndexerState.STARTED, times(1)); |
| 539 | + } |
| 540 | + |
| 541 | + private void testTriggered(TransformTaskState taskState, IndexerState indexerState, VerificationMode indexerVerificationMode) { |
| 542 | + String transformId = randomAlphaOfLengthBetween(1, 10); |
| 543 | + TransformState transformState = new TransformState(taskState, indexerState, null, 0L, "because", null, null, false, null); |
| 544 | + ThreadPool threadPool = mock(ThreadPool.class); |
| 545 | + TransformAuditor auditor = mock(TransformAuditor.class); |
| 546 | + TransformTask transformTask = new TransformTask( |
| 547 | + 42, |
| 548 | + "some_type", |
| 549 | + "some_action", |
| 550 | + TaskId.EMPTY_TASK_ID, |
| 551 | + createTransformTaskParams(transformId), |
| 552 | + transformState, |
| 553 | + new TransformScheduler(mock(Clock.class), threadPool, Settings.EMPTY), |
| 554 | + auditor, |
| 555 | + threadPool, |
| 556 | + Collections.emptyMap() |
| 557 | + ); |
| 558 | + |
| 559 | + ClientTransformIndexer indexer = mock(ClientTransformIndexer.class); |
| 560 | + when(indexer.getState()).thenReturn(indexerState); |
| 561 | + transformTask.initializeIndexer(indexer); |
| 562 | + transformTask.triggered(new TransformScheduler.Event(transformId, randomNonNegativeLong(), randomNonNegativeLong())); |
| 563 | + |
| 564 | + verify(indexer, indexerVerificationMode).maybeTriggerAsyncJob(anyLong()); |
| 565 | + verifyNoInteractions(auditor, threadPool); |
| 566 | + } |
| 567 | + |
452 | 568 | private static TransformTaskParams createTransformTaskParams(String transformId) {
|
453 | 569 | return new TransformTaskParams(transformId, TransformConfigVersion.CURRENT, TimeValue.timeValueSeconds(10), false);
|
454 | 570 | }
|
|
0 commit comments