|
8 | 8 | package org.elasticsearch.xpack.esql.action; |
9 | 9 |
|
10 | 10 | import org.elasticsearch.Build; |
| 11 | +import org.elasticsearch.action.DocWriteRequest; |
11 | 12 | import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; |
12 | 13 | import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; |
13 | 14 | import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; |
| 15 | +import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteComposableIndexTemplateAction; |
| 16 | +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; |
14 | 17 | import org.elasticsearch.action.bulk.BulkRequestBuilder; |
| 18 | +import org.elasticsearch.action.bulk.BulkResponse; |
| 19 | +import org.elasticsearch.action.datastreams.DeleteDataStreamAction; |
15 | 20 | import org.elasticsearch.action.index.IndexRequest; |
16 | 21 | import org.elasticsearch.action.index.IndexRequestBuilder; |
17 | 22 | import org.elasticsearch.action.support.WriteRequest; |
18 | 23 | import org.elasticsearch.client.internal.ClusterAdminClient; |
| 24 | +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; |
| 25 | +import org.elasticsearch.cluster.metadata.DataStream; |
| 26 | +import org.elasticsearch.cluster.metadata.DataStreamFailureStore; |
| 27 | +import org.elasticsearch.cluster.metadata.DataStreamOptions; |
19 | 28 | import org.elasticsearch.cluster.metadata.IndexMetadata; |
| 29 | +import org.elasticsearch.cluster.metadata.ResettableValue; |
| 30 | +import org.elasticsearch.cluster.metadata.Template; |
20 | 31 | import org.elasticsearch.cluster.node.DiscoveryNode; |
21 | 32 | import org.elasticsearch.common.collect.Iterators; |
| 33 | +import org.elasticsearch.common.compress.CompressedXContent; |
22 | 34 | import org.elasticsearch.common.settings.Setting; |
23 | 35 | import org.elasticsearch.common.settings.Settings; |
| 36 | +import org.elasticsearch.core.TimeValue; |
| 37 | +import org.elasticsearch.datastreams.DataStreamsPlugin; |
24 | 38 | import org.elasticsearch.index.Index; |
25 | 39 | import org.elasticsearch.index.IndexService; |
26 | 40 | import org.elasticsearch.index.IndexSettings; |
| 41 | +import org.elasticsearch.index.mapper.DateFieldMapper; |
| 42 | +import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin; |
27 | 43 | import org.elasticsearch.index.query.QueryBuilder; |
28 | 44 | import org.elasticsearch.index.query.RangeQueryBuilder; |
29 | 45 | import org.elasticsearch.index.shard.IndexShard; |
30 | 46 | import org.elasticsearch.indices.IndicesService; |
| 47 | +import org.elasticsearch.plugins.Plugin; |
31 | 48 | import org.elasticsearch.test.ESTestCase; |
32 | 49 | import org.elasticsearch.test.ListMatcher; |
33 | 50 | import org.elasticsearch.xcontent.XContentBuilder; |
|
38 | 55 | import org.elasticsearch.xpack.esql.parser.ParsingException; |
39 | 56 | import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; |
40 | 57 | import org.elasticsearch.xpack.esql.plugin.QueryPragmas; |
| 58 | +import org.junit.Assume; |
41 | 59 | import org.junit.Before; |
42 | 60 |
|
43 | 61 | import java.io.IOException; |
44 | 62 | import java.util.ArrayList; |
45 | 63 | import java.util.Arrays; |
| 64 | +import java.util.Collection; |
46 | 65 | import java.util.Collections; |
47 | 66 | import java.util.Comparator; |
48 | 67 | import java.util.HashMap; |
|
58 | 77 | import java.util.concurrent.TimeUnit; |
59 | 78 | import java.util.concurrent.atomic.AtomicBoolean; |
60 | 79 | import java.util.concurrent.atomic.AtomicLong; |
| 80 | +import java.util.function.BiConsumer; |
61 | 81 | import java.util.stream.IntStream; |
62 | 82 | import java.util.stream.LongStream; |
| 83 | +import java.util.stream.Stream; |
63 | 84 |
|
64 | 85 | import static java.util.Comparator.comparing; |
65 | 86 | import static java.util.Comparator.naturalOrder; |
@@ -100,6 +121,11 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { |
100 | 121 | .build(); |
101 | 122 | } |
102 | 123 |
|
| 124 | + @Override |
| 125 | + protected Collection<Class<? extends Plugin>> nodePlugins() { |
| 126 | + return Stream.concat(super.nodePlugins().stream(), Stream.of(DataStreamsPlugin.class, MapperExtrasPlugin.class)).toList(); |
| 127 | + } |
| 128 | + |
103 | 129 | public void testProjectConstant() { |
104 | 130 | try (EsqlQueryResponse results = run("from test | eval x = 1 | keep x")) { |
105 | 131 | assertThat(results.columns(), equalTo(List.of(new ColumnInfoImpl("x", "integer")))); |
@@ -992,6 +1018,176 @@ public void testIndexPatterns() throws Exception { |
992 | 1018 | } |
993 | 1019 | } |
994 | 1020 |
|
| 1021 | + public void testDataStreamPatterns() throws Exception { |
| 1022 | + Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled()); |
| 1023 | + |
| 1024 | + Map<String, Long> testCases = new HashMap<>(); |
| 1025 | + // Concrete data stream with each selector |
| 1026 | + testCases.put("test_ds_patterns_1", 5L); |
| 1027 | + testCases.put("test_ds_patterns_1::data", 5L); |
| 1028 | + testCases.put("test_ds_patterns_1::failures", 3L); |
| 1029 | + testCases.put("test_ds_patterns_2", 5L); |
| 1030 | + testCases.put("test_ds_patterns_2::data", 5L); |
| 1031 | + testCases.put("test_ds_patterns_2::failures", 3L); |
| 1032 | + |
| 1033 | + // Wildcard pattern with each selector |
| 1034 | + testCases.put("test_ds_patterns*", 15L); |
| 1035 | + testCases.put("test_ds_patterns*::data", 15L); |
| 1036 | + testCases.put("test_ds_patterns*::failures", 9L); |
| 1037 | + |
| 1038 | + // Match all pattern with each selector |
| 1039 | + testCases.put("*", 15L); |
| 1040 | + testCases.put("*::data", 15L); |
| 1041 | + testCases.put("*::failures", 9L); |
| 1042 | + |
| 1043 | + // Concrete multi-pattern |
| 1044 | + testCases.put("test_ds_patterns_1,test_ds_patterns_2", 10L); |
| 1045 | + testCases.put("test_ds_patterns_1::data,test_ds_patterns_2::data", 10L); |
| 1046 | + testCases.put("test_ds_patterns_1::failures,test_ds_patterns_2::failures", 6L); |
| 1047 | + |
| 1048 | + // Wildcard multi-pattern |
| 1049 | + testCases.put("test_ds_patterns_1*,test_ds_patterns_2*", 10L); |
| 1050 | + testCases.put("test_ds_patterns_1*::data,test_ds_patterns_2*::data", 10L); |
| 1051 | + testCases.put("test_ds_patterns_1*::failures,test_ds_patterns_2*::failures", 6L); |
| 1052 | + |
| 1053 | + // Wildcard pattern with data stream exclusions for each selector combination (data stream exclusions need * on the end to negate) |
| 1054 | + // None (default) |
| 1055 | + testCases.put("test_ds_patterns*,-test_ds_patterns_2*", 10L); |
| 1056 | + testCases.put("test_ds_patterns*,-test_ds_patterns_2*::data", 10L); |
| 1057 | + testCases.put("test_ds_patterns*,-test_ds_patterns_2*::failures", 15L); |
| 1058 | + // Subtracting from ::data |
| 1059 | + testCases.put("test_ds_patterns*::data,-test_ds_patterns_2*", 10L); |
| 1060 | + testCases.put("test_ds_patterns*::data,-test_ds_patterns_2*::data", 10L); |
| 1061 | + testCases.put("test_ds_patterns*::data,-test_ds_patterns_2*::failures", 15L); |
| 1062 | + // Subtracting from ::failures |
| 1063 | + testCases.put("test_ds_patterns*::failures,-test_ds_patterns_2*", 9L); |
| 1064 | + testCases.put("test_ds_patterns*::failures,-test_ds_patterns_2*::data", 9L); |
| 1065 | + testCases.put("test_ds_patterns*::failures,-test_ds_patterns_2*::failures", 6L); |
| 1066 | + // Subtracting from ::* |
| 1067 | + testCases.put("test_ds_patterns*::data,test_ds_patterns*::failures,-test_ds_patterns_2*", 19L); |
| 1068 | + testCases.put("test_ds_patterns*::data,test_ds_patterns*::failures,-test_ds_patterns_2*::data", 19L); |
| 1069 | + testCases.put("test_ds_patterns*::data,test_ds_patterns*::failures,-test_ds_patterns_2*::failures", 21L); |
| 1070 | + |
| 1071 | + testCases.put("\"test_ds_patterns_1,test_ds_patterns_2\"::failures", 8L); |
| 1072 | + |
| 1073 | + runDataStreamTest(testCases, new String[] { "test_ds_patterns_1", "test_ds_patterns_2", "test_ds_patterns_3" }, (key, value) -> { |
| 1074 | + try (var results = run("from " + key + " | stats count(@timestamp)")) { |
| 1075 | + assertEquals(key, 1, getValuesList(results).size()); |
| 1076 | + assertEquals(key, value, getValuesList(results).get(0).get(0)); |
| 1077 | + } |
| 1078 | + }); |
| 1079 | + } |
| 1080 | + |
| 1081 | + public void testDataStreamInvalidPatterns() throws Exception { |
| 1082 | + Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled()); |
| 1083 | + |
| 1084 | + Map<String, String> testCases = new HashMap<>(); |
| 1085 | + // === Errors |
| 1086 | + // Only recognized components can be selected |
| 1087 | + testCases.put("testXXX::custom", "invalid usage of :: separator, [custom] is not a recognized selector"); |
| 1088 | + // Spelling is important |
| 1089 | + testCases.put("testXXX::failres", "invalid usage of :: separator, [failres] is not a recognized selector"); |
| 1090 | + // Only the match all wildcard is supported |
| 1091 | + testCases.put("testXXX::d*ta", "invalid usage of :: separator, [d*ta] is not a recognized selector"); |
| 1092 | + // The first instance of :: is split upon so that you cannot chain the selector |
| 1093 | + testCases.put("test::XXX::data", "mismatched input '::' expecting {<EOF>, '|', ',', 'metadata'}"); |
| 1094 | + // Selectors must be outside of date math expressions or else they trip up the selector parsing |
| 1095 | + testCases.put("<test-{now/d}::failures>", "Invalid index name [<test-{now/d}], must not contain the following characters ["); |
| 1096 | + // Only one selector separator is allowed per expression |
| 1097 | + testCases.put("::::data", "mismatched input '::' expecting {QUOTED_STRING, UNQUOTED_SOURCE}"); |
| 1098 | + // Suffix case is not supported because there is no component named with the empty string |
| 1099 | + testCases.put("index::", "missing {QUOTED_STRING, UNQUOTED_SOURCE} at '|'"); |
| 1100 | + |
| 1101 | + runDataStreamTest(testCases, new String[] { "test_ds_patterns_1" }, (key, value) -> { |
| 1102 | + logger.info(key); |
| 1103 | + var exception = expectThrows(ParsingException.class, () -> { run("from " + key + " | stats count(@timestamp)").close(); }); |
| 1104 | + assertThat(exception.getMessage(), containsString(value)); |
| 1105 | + }); |
| 1106 | + } |
| 1107 | + |
| 1108 | + private <V> void runDataStreamTest(Map<String, V> testCases, String[] dsNames, BiConsumer<String, V> testMethod) throws IOException { |
| 1109 | + boolean deleteTemplate = false; |
| 1110 | + List<String> deleteDataStreams = new ArrayList<>(); |
| 1111 | + try { |
| 1112 | + assertAcked( |
| 1113 | + client().execute( |
| 1114 | + TransportPutComposableIndexTemplateAction.TYPE, |
| 1115 | + new TransportPutComposableIndexTemplateAction.Request("test_ds_template").indexTemplate( |
| 1116 | + ComposableIndexTemplate.builder() |
| 1117 | + .indexPatterns(List.of("test_ds_patterns_*")) |
| 1118 | + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate()) |
| 1119 | + .template( |
| 1120 | + Template.builder() |
| 1121 | + .mappings(new CompressedXContent(""" |
| 1122 | + { |
| 1123 | + "dynamic": false, |
| 1124 | + "properties": { |
| 1125 | + "@timestamp": { |
| 1126 | + "type": "date" |
| 1127 | + }, |
| 1128 | + "count": { |
| 1129 | + "type": "long" |
| 1130 | + } |
| 1131 | + } |
| 1132 | + }""")) |
| 1133 | + .dataStreamOptions( |
| 1134 | + ResettableValue.create( |
| 1135 | + new DataStreamOptions.Template( |
| 1136 | + ResettableValue.create(new DataStreamFailureStore.Template(ResettableValue.create(true))) |
| 1137 | + ) |
| 1138 | + ) |
| 1139 | + ) |
| 1140 | + .build() |
| 1141 | + ) |
| 1142 | + .build() |
| 1143 | + ) |
| 1144 | + ) |
| 1145 | + ); |
| 1146 | + deleteTemplate = true; |
| 1147 | + |
| 1148 | + String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis()); |
| 1149 | + int i = 0; |
| 1150 | + for (String dsName : dsNames) { |
| 1151 | + BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
| 1152 | + for (String id : Arrays.asList("1", "2", "3", "4", "5")) { |
| 1153 | + bulk.add(createDoc(dsName, id, time, ++i * 1000)); |
| 1154 | + } |
| 1155 | + for (String id : Arrays.asList("6", "7", "8")) { |
| 1156 | + bulk.add(createDoc(dsName, id, time, "garbage")); |
| 1157 | + } |
| 1158 | + BulkResponse bulkItemResponses = bulk.get(); |
| 1159 | + assertThat(bulkItemResponses.hasFailures(), is(false)); |
| 1160 | + deleteDataStreams.add(dsName); |
| 1161 | + ensureYellow(dsName); |
| 1162 | + } |
| 1163 | + |
| 1164 | + for (Map.Entry<String, V> testCase : testCases.entrySet()) { |
| 1165 | + testMethod.accept(testCase.getKey(), testCase.getValue()); |
| 1166 | + } |
| 1167 | + } finally { |
| 1168 | + if (deleteDataStreams.isEmpty() == false) { |
| 1169 | + assertAcked( |
| 1170 | + client().execute( |
| 1171 | + DeleteDataStreamAction.INSTANCE, |
| 1172 | + new DeleteDataStreamAction.Request(new TimeValue(30, TimeUnit.SECONDS), deleteDataStreams.toArray(String[]::new)) |
| 1173 | + ) |
| 1174 | + ); |
| 1175 | + } |
| 1176 | + if (deleteTemplate) { |
| 1177 | + assertAcked( |
| 1178 | + client().execute( |
| 1179 | + TransportDeleteComposableIndexTemplateAction.TYPE, |
| 1180 | + new TransportDeleteComposableIndexTemplateAction.Request("test_ds_template") |
| 1181 | + ) |
| 1182 | + ); |
| 1183 | + } |
| 1184 | + } |
| 1185 | + } |
| 1186 | + |
| 1187 | + private static IndexRequest createDoc(String dsName, String id, String ts, Object count) { |
| 1188 | + return new IndexRequest(dsName).opType(DocWriteRequest.OpType.CREATE).id(id).source("@timestamp", ts, "count", count); |
| 1189 | + } |
| 1190 | + |
995 | 1191 | public void testOverlappingIndexPatterns() throws Exception { |
996 | 1192 | String[] indexNames = { "test_overlapping_index_patterns_1", "test_overlapping_index_patterns_2" }; |
997 | 1193 |
|
|
0 commit comments