|
8 | 8 | package org.elasticsearch.xpack.esql.action; |
9 | 9 |
|
10 | 10 | import org.elasticsearch.Build; |
| 11 | +import org.elasticsearch.action.DocWriteRequest; |
11 | 12 | import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; |
12 | 13 | import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; |
13 | 14 | import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; |
| 15 | +import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteComposableIndexTemplateAction; |
| 16 | +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; |
14 | 17 | import org.elasticsearch.action.bulk.BulkRequestBuilder; |
| 18 | +import org.elasticsearch.action.bulk.BulkResponse; |
| 19 | +import org.elasticsearch.action.datastreams.DeleteDataStreamAction; |
15 | 20 | import org.elasticsearch.action.index.IndexRequest; |
16 | 21 | import org.elasticsearch.action.index.IndexRequestBuilder; |
17 | 22 | import org.elasticsearch.action.support.WriteRequest; |
18 | 23 | import org.elasticsearch.client.internal.ClusterAdminClient; |
| 24 | +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; |
| 25 | +import org.elasticsearch.cluster.metadata.DataStream; |
| 26 | +import org.elasticsearch.cluster.metadata.DataStreamFailureStore; |
| 27 | +import org.elasticsearch.cluster.metadata.DataStreamOptions; |
19 | 28 | import org.elasticsearch.cluster.metadata.IndexMetadata; |
| 29 | +import org.elasticsearch.cluster.metadata.ResettableValue; |
| 30 | +import org.elasticsearch.cluster.metadata.Template; |
20 | 31 | import org.elasticsearch.cluster.node.DiscoveryNode; |
21 | 32 | import org.elasticsearch.common.collect.Iterators; |
| 33 | +import org.elasticsearch.common.compress.CompressedXContent; |
22 | 34 | import org.elasticsearch.common.settings.Setting; |
23 | 35 | import org.elasticsearch.common.settings.Settings; |
| 36 | +import org.elasticsearch.core.TimeValue; |
| 37 | +import org.elasticsearch.datastreams.DataStreamsPlugin; |
24 | 38 | import org.elasticsearch.index.Index; |
25 | 39 | import org.elasticsearch.index.IndexService; |
26 | 40 | import org.elasticsearch.index.IndexSettings; |
| 41 | +import org.elasticsearch.index.mapper.DateFieldMapper; |
| 42 | +import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin; |
27 | 43 | import org.elasticsearch.index.query.QueryBuilder; |
28 | 44 | import org.elasticsearch.index.query.RangeQueryBuilder; |
29 | 45 | import org.elasticsearch.index.shard.IndexShard; |
30 | 46 | import org.elasticsearch.indices.IndicesService; |
| 47 | +import org.elasticsearch.plugins.Plugin; |
31 | 48 | import org.elasticsearch.test.ESTestCase; |
32 | 49 | import org.elasticsearch.test.ListMatcher; |
33 | 50 | import org.elasticsearch.xcontent.XContentBuilder; |
|
38 | 55 | import org.elasticsearch.xpack.esql.parser.ParsingException; |
39 | 56 | import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; |
40 | 57 | import org.elasticsearch.xpack.esql.plugin.QueryPragmas; |
| 58 | +import org.junit.Assume; |
41 | 59 | import org.junit.Before; |
42 | 60 |
|
43 | 61 | import java.io.IOException; |
44 | 62 | import java.util.ArrayList; |
45 | 63 | import java.util.Arrays; |
| 64 | +import java.util.Collection; |
46 | 65 | import java.util.Collections; |
47 | 66 | import java.util.Comparator; |
48 | 67 | import java.util.HashMap; |
|
58 | 77 | import java.util.concurrent.TimeUnit; |
59 | 78 | import java.util.concurrent.atomic.AtomicBoolean; |
60 | 79 | import java.util.concurrent.atomic.AtomicLong; |
| 80 | +import java.util.function.BiConsumer; |
61 | 81 | import java.util.stream.IntStream; |
62 | 82 | import java.util.stream.LongStream; |
| 83 | +import java.util.stream.Stream; |
63 | 84 |
|
64 | 85 | import static java.util.Comparator.comparing; |
65 | 86 | import static java.util.Comparator.naturalOrder; |
@@ -100,6 +121,11 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { |
100 | 121 | .build(); |
101 | 122 | } |
102 | 123 |
|
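| | + // The data stream tests below need DataStreamsPlugin for data stream (and failure store) support; MapperExtrasPlugin is presumably required by the failure store's default mappings. |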
| 124 | + @Override |
| 125 | + protected Collection<Class<? extends Plugin>> nodePlugins() { |
| 126 | + return Stream.concat(super.nodePlugins().stream(), Stream.of(DataStreamsPlugin.class, MapperExtrasPlugin.class)).toList(); |
| 127 | + } |
| 128 | + |
103 | 129 | public void testProjectConstant() { |
104 | 130 | try (EsqlQueryResponse results = run("from test | eval x = 1 | keep x")) { |
105 | 131 | assertThat(results.columns(), equalTo(List.of(new ColumnInfoImpl("x", "integer")))); |
@@ -970,6 +996,176 @@ public void testIndexPatterns() throws Exception { |
970 | 996 | } |
971 | 997 | } |
972 | 998 |
|
| 999 | + public void testDataStreamPatterns() throws Exception { |
| 1000 | + Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled()); |
| 1001 | + |
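| | + // runDataStreamTest seeds each data stream with 5 well-formed docs (backing indices) and 3 docs whose count fails to parse (failure store). |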
| 1002 | + Map<String, Long> testCases = new HashMap<>(); |
| 1003 | + // Concrete data stream with each selector |
| 1004 | + testCases.put("test_ds_patterns_1", 5L); |
| 1005 | + testCases.put("test_ds_patterns_1::data", 5L); |
| 1006 | + testCases.put("test_ds_patterns_1::failures", 3L); |
| 1007 | + testCases.put("test_ds_patterns_2", 5L); |
| 1008 | + testCases.put("test_ds_patterns_2::data", 5L); |
| 1009 | + testCases.put("test_ds_patterns_2::failures", 3L); |
| 1010 | + |
| 1011 | + // Wildcard pattern with each selector |
| 1012 | + testCases.put("test_ds_patterns*", 15L); |
| 1013 | + testCases.put("test_ds_patterns*::data", 15L); |
| 1014 | + testCases.put("test_ds_patterns*::failures", 9L); |
| 1015 | + |
| 1016 | + // Match all pattern with each selector |
| 1017 | + testCases.put("*", 15L); |
| 1018 | + testCases.put("*::data", 15L); |
| 1019 | + testCases.put("*::failures", 9L); |
| 1020 | + |
| 1021 | + // Concrete multi-pattern |
| 1022 | + testCases.put("test_ds_patterns_1,test_ds_patterns_2", 10L); |
| 1023 | + testCases.put("test_ds_patterns_1::data,test_ds_patterns_2::data", 10L); |
| 1024 | + testCases.put("test_ds_patterns_1::failures,test_ds_patterns_2::failures", 6L); |
| 1025 | + |
| 1026 | + // Wildcard multi-pattern |
| 1027 | + testCases.put("test_ds_patterns_1*,test_ds_patterns_2*", 10L); |
| 1028 | + testCases.put("test_ds_patterns_1*::data,test_ds_patterns_2*::data", 10L); |
| 1029 | + testCases.put("test_ds_patterns_1*::failures,test_ds_patterns_2*::failures", 6L); |
| 1030 | + |
| 1031 | + // Wildcard pattern with data stream exclusions for each selector combination. Data stream exclusions need a trailing * to take effect, and an exclusion only removes what its own selector (::data when omitted) matches. |
| 1032 | + // None (default) |
| 1033 | + testCases.put("test_ds_patterns*,-test_ds_patterns_2*", 10L); |
| 1034 | + testCases.put("test_ds_patterns*,-test_ds_patterns_2*::data", 10L); |
| 1035 | + testCases.put("test_ds_patterns*,-test_ds_patterns_2*::failures", 15L); |
| 1036 | + // Subtracting from ::data |
| 1037 | + testCases.put("test_ds_patterns*::data,-test_ds_patterns_2*", 10L); |
| 1038 | + testCases.put("test_ds_patterns*::data,-test_ds_patterns_2*::data", 10L); |
| 1039 | + testCases.put("test_ds_patterns*::data,-test_ds_patterns_2*::failures", 15L); |
| 1040 | + // Subtracting from ::failures |
| 1041 | + testCases.put("test_ds_patterns*::failures,-test_ds_patterns_2*", 9L); |
| 1042 | + testCases.put("test_ds_patterns*::failures,-test_ds_patterns_2*::data", 9L); |
| 1043 | + testCases.put("test_ds_patterns*::failures,-test_ds_patterns_2*::failures", 6L); |
| 1044 | + // Subtracting from ::* |
| 1045 | + testCases.put("test_ds_patterns*::data,test_ds_patterns*::failures,-test_ds_patterns_2*", 19L); |
| 1046 | + testCases.put("test_ds_patterns*::data,test_ds_patterns*::failures,-test_ds_patterns_2*::data", 19L); |
| 1047 | + testCases.put("test_ds_patterns*::data,test_ds_patterns*::failures,-test_ds_patterns_2*::failures", 21L); |
| 1048 | + |
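| | + // Quoted multi-pattern: the comma stays inside a single quoted string, so (per the expected count) the ::failures selector applies only to the last data stream: 5 data docs + 3 failure docs. |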
| 1049 | + testCases.put("\"test_ds_patterns_1,test_ds_patterns_2\"::failures", 8L); |
| 1050 | + |
| 1051 | + runDataStreamTest(testCases, new String[] { "test_ds_patterns_1", "test_ds_patterns_2", "test_ds_patterns_3" }, (key, value) -> { |
| 1052 | + try (var results = run("from " + key + " | stats count(@timestamp)")) { |
| 1053 | + assertEquals(key, 1, getValuesList(results).size()); |
| 1054 | + assertEquals(key, value, getValuesList(results).get(0).get(0)); |
| 1055 | + } |
| 1056 | + }); |
| 1057 | + } |
| 1058 | + |
| 1059 | + public void testDataStreamInvalidPatterns() throws Exception { |
| 1060 | + Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled()); |
| 1061 | + |
| 1062 | + Map<String, String> testCases = new HashMap<>(); |
| 1063 | + // === Errors |
| 1064 | + // Only recognized components can be selected |
| 1065 | + testCases.put("testXXX::custom", "invalid usage of :: separator, [custom] is not a recognized selector"); |
| 1066 | + // Spelling is important |
| 1067 | + testCases.put("testXXX::failres", "invalid usage of :: separator, [failres] is not a recognized selector"); |
| 1068 | + // Only the match all wildcard is supported |
| 1069 | + testCases.put("testXXX::d*ta", "invalid usage of :: separator, [d*ta] is not a recognized selector"); |
| 1070 | + // The first instance of :: is split upon so that you cannot chain the selector |
| 1071 | + testCases.put("test::XXX::data", "mismatched input '::' expecting {"); |
| 1072 | + // Selectors must be outside of date math expressions or else they trip up the selector parsing |
| 1073 | + testCases.put("<test-{now/d}::failures>", "Invalid index name [<test-{now/d}], must not contain the following characters ["); |
| 1074 | + // Only one selector separator is allowed per expression |
| 1075 | + testCases.put("::::data", "mismatched input '::' expecting {"); |
| 1076 | + // Suffix case is not supported because there is no component named with the empty string |
| 1077 | + testCases.put("index::", "missing {QUOTED_STRING, UNQUOTED_SOURCE} at '|'"); |
| 1078 | + |
| 1079 | + runDataStreamTest(testCases, new String[] { "test_ds_patterns_1" }, (key, value) -> { |
| 1080 | + logger.info(key); |
| 1081 | + var exception = expectThrows(ParsingException.class, () -> { run("from " + key + " | stats count(@timestamp)").close(); }); |
| 1082 | + assertThat(exception.getMessage(), containsString(value)); |
| 1083 | + }); |
| 1084 | + } |
| 1085 | + |
| 1086 | + private <V> void runDataStreamTest(Map<String, V> testCases, String[] dsNames, BiConsumer<String, V> testMethod) throws IOException { |
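| | + // Track what gets created so the finally block can clean it up even if setup fails part-way through. |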
| 1087 | + boolean deleteTemplate = false; |
| 1088 | + List<String> deleteDataStreams = new ArrayList<>(); |
| 1089 | + try { |
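| | + // Create an index template for test_ds_patterns_*: a data stream template with @timestamp and count mappings, and the failure store enabled through its data stream options. |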
| 1090 | + assertAcked( |
| 1091 | + client().execute( |
| 1092 | + TransportPutComposableIndexTemplateAction.TYPE, |
| 1093 | + new TransportPutComposableIndexTemplateAction.Request("test_ds_template").indexTemplate( |
| 1094 | + ComposableIndexTemplate.builder() |
| 1095 | + .indexPatterns(List.of("test_ds_patterns_*")) |
| 1096 | + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) |
| 1097 | + .template( |
| 1098 | + Template.builder() |
| 1099 | + .mappings(new CompressedXContent(""" |
| 1100 | + { |
| 1101 | + "dynamic": false, |
| 1102 | + "properties": { |
| 1103 | + "@timestamp": { |
| 1104 | + "type": "date" |
| 1105 | + }, |
| 1106 | + "count": { |
| 1107 | + "type": "long" |
| 1108 | + } |
| 1109 | + } |
| 1110 | + }""")) |
| 1111 | + .dataStreamOptions( |
| 1112 | + ResettableValue.create( |
| 1113 | + new DataStreamOptions.Template( |
| 1114 | + ResettableValue.create(new DataStreamFailureStore.Template(ResettableValue.create(true))) |
| 1115 | + ) |
| 1116 | + ) |
| 1117 | + ) |
| 1118 | + .build() |
| 1119 | + ) |
| 1120 | + .build() |
| 1121 | + ) |
| 1122 | + ) |
| 1123 | + ); |
| 1124 | + deleteTemplate = true; |
| 1125 | + |
| 1126 | + String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); |
| 1127 | + int i = 0; |
| 1128 | + for (String dsName : dsNames) { |
| 1129 | + BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
| 1130 | + for (String id : Arrays.asList("1", "2", "3", "4", "5")) { |
| 1131 | + bulk.add(createDoc(dsName, id, time, ++i * 1000)); |
| 1132 | + } |
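| | + // Docs with a non-numeric count fail mapping and are redirected to the data stream's failure store. |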
| 1133 | + for (String id : Arrays.asList("6", "7", "8")) { |
| 1134 | + bulk.add(createDoc(dsName, id, time, "garbage")); |
| 1135 | + } |
| 1136 | + BulkResponse bulkItemResponses = bulk.get(); |
| 1137 | + assertThat(bulkItemResponses.hasFailures(), is(false)); |
| 1138 | + deleteDataStreams.add(dsName); |
| 1139 | + ensureYellow(dsName); |
| 1140 | + } |
| 1141 | + |
| 1142 | + for (Map.Entry<String, V> testCase : testCases.entrySet()) { |
| 1143 | + testMethod.accept(testCase.getKey(), testCase.getValue()); |
| 1144 | + } |
| 1145 | + } finally { |
| 1146 | + if (deleteDataStreams.isEmpty() == false) { |
| 1147 | + assertAcked( |
| 1148 | + client().execute( |
| 1149 | + DeleteDataStreamAction.INSTANCE, |
| 1150 | + new DeleteDataStreamAction.Request(new TimeValue(30, TimeUnit.SECONDS), deleteDataStreams.toArray(String[]::new)) |
| 1151 | + ) |
| 1152 | + ); |
| 1153 | + } |
| 1154 | + if (deleteTemplate) { |
| 1155 | + assertAcked( |
| 1156 | + client().execute( |
| 1157 | + TransportDeleteComposableIndexTemplateAction.TYPE, |
| 1158 | + new TransportDeleteComposableIndexTemplateAction.Request("test_ds_template") |
| 1159 | + ) |
| 1160 | + ); |
| 1161 | + } |
| 1162 | + } |
| 1163 | + } |
| 1164 | + |
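| | + // Writes to a data stream must use the create op type, hence the explicit opType below. |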
| 1165 | + private static IndexRequest createDoc(String dsName, String id, String ts, Object count) { |
| 1166 | + return new IndexRequest(dsName).opType(DocWriteRequest.OpType.CREATE).id(id).source("@timestamp", ts, "count", count); |
| 1167 | + } |
| 1168 | + |
973 | 1169 | public void testOverlappingIndexPatterns() throws Exception { |
974 | 1170 | String[] indexNames = { "test_overlapping_index_patterns_1", "test_overlapping_index_patterns_2" }; |
975 | 1171 |
|
|