|
9 | 9 | import org.apache.lucene.util.SetOnce; |
10 | 10 | import org.elasticsearch.action.admin.indices.get.GetIndexAction; |
11 | 11 | import org.elasticsearch.action.admin.indices.get.GetIndexRequest; |
| 12 | +import org.elasticsearch.action.admin.indices.refresh.RefreshAction; |
| 13 | +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; |
| 14 | +import org.elasticsearch.action.support.broadcast.BroadcastResponse; |
12 | 15 | import org.elasticsearch.cluster.ClusterState; |
13 | 16 | import org.elasticsearch.common.Strings; |
14 | 17 | import org.elasticsearch.common.settings.Settings; |
|
56 | 59 | import org.elasticsearch.xpack.core.ml.utils.QueryProvider; |
57 | 60 | import org.elasticsearch.xpack.ml.dataframe.StoredProgress; |
58 | 61 | import org.hamcrest.Matcher; |
59 | | -import org.hamcrest.Matchers; |
60 | 62 |
|
61 | 63 | import java.io.IOException; |
62 | 64 | import java.io.UncheckedIOException; |
63 | 65 | import java.util.ArrayList; |
64 | | -import java.util.Arrays; |
65 | 66 | import java.util.Collection; |
66 | 67 | import java.util.Collections; |
67 | 68 | import java.util.HashSet; |
68 | 69 | import java.util.List; |
| 70 | +import java.util.Locale; |
69 | 71 | import java.util.Map; |
70 | 72 | import java.util.Optional; |
71 | 73 | import java.util.Set; |
72 | 74 | import java.util.concurrent.TimeUnit; |
| 75 | +import java.util.stream.Collectors; |
73 | 76 |
|
74 | 77 | import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; |
75 | 78 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; |
76 | 79 | import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; |
| 80 | +import static org.hamcrest.Matchers.anyOf; |
77 | 81 | import static org.hamcrest.Matchers.arrayWithSize; |
78 | 82 | import static org.hamcrest.Matchers.equalTo; |
79 | 83 | import static org.hamcrest.Matchers.greaterThan; |
80 | 84 | import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
81 | | -import static org.hamcrest.Matchers.hasItems; |
82 | 85 | import static org.hamcrest.Matchers.hasSize; |
83 | 86 | import static org.hamcrest.Matchers.is; |
84 | 87 | import static org.hamcrest.Matchers.nullValue; |
@@ -359,23 +362,44 @@ protected TrainedModelDefinition getModelDefinition(String modelId) throws IOExc |
359 | 362 | /** |
360 | 363 | * Asserts whether the audit messages fetched from index match provided prefixes. |
361 | 364 | * More specifically, in order to pass: |
362 | | - * 1. the number of fetched messages must equal the number of provided prefixes |
| 365 | + * 1. ALL expected message prefixes must be found in the fetched messages |
363 | 366 | * AND |
364 | | - * 2. each fetched message must start with the corresponding prefix |
| 367 | + * 2. each fetched message that matches must start with the corresponding prefix |
365 | 368 | */ |
366 | 369 | protected static void assertThatAuditMessagesMatch(String configId, String... expectedAuditMessagePrefixes) throws Exception { |
367 | 370 | // Make sure we wrote to the audit |
368 | 371 | // Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start, |
369 | 372 | // finished the job (as this is a very short analytics job), all without the audit being fully written. |
370 | 373 | awaitIndexExists(NotificationsIndex.NOTIFICATIONS_INDEX); |
371 | 374 |
|
372 | | - @SuppressWarnings("unchecked") |
373 | | - Matcher<String>[] itemMatchers = Arrays.stream(expectedAuditMessagePrefixes).map(Matchers::startsWith).toArray(Matcher[]::new); |
| 375 | + Set<String> expectedPrefixes = Set.of(expectedAuditMessagePrefixes); |
374 | 376 | assertBusy(() -> { |
| 377 | + // Refresh the notifications index to ensure latest writes are visible |
| 378 | + RefreshRequest refreshRequest = new RefreshRequest(NotificationsIndex.NOTIFICATIONS_INDEX); |
| 379 | + BroadcastResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet(); |
| 380 | + assertThat(refreshResponse.getStatus().getStatus(), anyOf(equalTo(200), equalTo(201))); |
| 381 | + |
375 | 382 | List<String> allAuditMessages = fetchAllAuditMessages(configId); |
376 | | - assertThat(allAuditMessages, hasItems(itemMatchers)); |
377 | | - // TODO: Consider restoring this assertion when we are sure all the audit messages are available at this point. |
378 | | - // assertThat("Messages: " + allAuditMessages, allAuditMessages, hasSize(expectedAuditMessagePrefixes.length)); |
| 383 | + |
| 384 | + // Find which expected prefixes match any of the audit messages |
| 385 | + Set<String> foundPrefixes = expectedPrefixes.stream() |
| 386 | + .filter(prefix -> allAuditMessages.stream().anyMatch(msg -> msg.startsWith(prefix))) |
| 387 | + .collect(Collectors.toSet()); |
| 388 | + |
| 389 | + // Only calculate missing prefixes if not all were found |
| 390 | + if (foundPrefixes.size() != expectedPrefixes.size()) { |
| 391 | + Set<String> missingPrefixes = new HashSet<>(expectedPrefixes); |
| 392 | + missingPrefixes.removeAll(foundPrefixes); |
| 393 | + fail( |
| 394 | + String.format( |
| 395 | + Locale.ROOT, |
| 396 | + "Expected audit messages not found for config [%s]. Missing prefixes: %s. Found messages: %s", |
| 397 | + configId, |
| 398 | + missingPrefixes, |
| 399 | + allAuditMessages |
| 400 | + ) |
| 401 | + ); |
| 402 | + } |
379 | 403 | }); |
380 | 404 | } |
381 | 405 |
|
|
0 commit comments