Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* This class main focus is to resolve multi-syntax target expressions to resources or concrete indices. This resolution is influenced
Expand Down Expand Up @@ -2126,13 +2127,7 @@ private static <V> V splitSelectorExpression(String expression, BiFunction<Strin
int lastDoubleColon = expression.lastIndexOf(SELECTOR_SEPARATOR);
if (lastDoubleColon >= 0) {
String suffix = expression.substring(lastDoubleColon + SELECTOR_SEPARATOR.length());
IndexComponentSelector selector = IndexComponentSelector.getByKey(suffix);
if (selector == null) {
throw new InvalidIndexNameException(
expression,
"invalid usage of :: separator, [" + suffix + "] is not a recognized selector"
);
}
doValidateSelectorString(() -> expression, suffix);
String expressionBase = expression.substring(0, lastDoubleColon);
ensureNoMoreSelectorSeparators(expressionBase, expression);
return bindFunction.apply(expressionBase, suffix);
Expand All @@ -2141,6 +2136,20 @@ private static <V> V splitSelectorExpression(String expression, BiFunction<Strin
return bindFunction.apply(expression, null);
}

/**
 * Validates that {@code suffix} names a recognized index component selector.
 *
 * @param indexName the index name portion of the expression; only used to reconstruct the
 *                  full expression for the error message
 * @param suffix the selector text that followed the {@code ::} separator
 * @throws InvalidIndexNameException if the suffix is not a recognized selector
 */
public static void validateIndexSelectorString(String indexName, String suffix) {
    // Supply the full expression lazily so the concatenation only happens on failure.
    Supplier<String> fullExpression = () -> indexName + SELECTOR_SEPARATOR + suffix;
    doValidateSelectorString(fullExpression, suffix);
}

/**
 * Resolves {@code suffix} against the known {@link IndexComponentSelector} keys and throws if
 * it does not match any of them.
 *
 * @param expression supplies the original expression text; only invoked when building the
 *                   failure message
 * @param suffix the selector key to resolve
 * @throws InvalidIndexNameException if no selector is registered under the given key
 */
private static void doValidateSelectorString(Supplier<String> expression, String suffix) {
    if (IndexComponentSelector.getByKey(suffix) != null) {
        return;
    }
    String message = "invalid usage of :: separator, [" + suffix + "] is not a recognized selector";
    throw new InvalidIndexNameException(expression.get(), message);
}

/**
* Checks the selectors that have been returned from splitting an expression and throws an exception if any were present.
* @param expression Original expression
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugin/esql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ dependencies {
testImplementation project(path: ':modules:analysis-common')
testImplementation project(path: ':modules:ingest-common')
testImplementation project(path: ':modules:legacy-geo')
testImplementation project(path: ':modules:data-streams')
testImplementation project(path: ':modules:mapper-extras')
testImplementation project(xpackModule('esql:compute:test'))
testImplementation('net.nextencia:rrdiagram:0.9.4')
testImplementation('org.webjars.npm:fontsource__roboto-mono:4.5.7')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,43 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.Build;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.datastreams.DeleteDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.ClusterAdminClient;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamFailureStore;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ResettableValue;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.ListMatcher;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -38,11 +55,13 @@
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.junit.Assume;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -58,8 +77,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static java.util.Comparator.comparing;
import static java.util.Comparator.naturalOrder;
Expand Down Expand Up @@ -100,6 +121,11 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    // Extend the base plugin set with data streams (for the failure store) and mapper-extras.
    List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
    plugins.add(DataStreamsPlugin.class);
    plugins.add(MapperExtrasPlugin.class);
    return List.copyOf(plugins);
}

public void testProjectConstant() {
try (EsqlQueryResponse results = run("from test | eval x = 1 | keep x")) {
assertThat(results.columns(), equalTo(List.of(new ColumnInfoImpl("x", "integer"))));
Expand Down Expand Up @@ -970,6 +996,176 @@ public void testIndexPatterns() throws Exception {
}
}

public void testDataStreamPatterns() throws Exception {
    Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled());

    // Expected document counts per index pattern. runDataStreamTest indexes 5 well-formed docs
    // and 3 docs with an unparsable count per data stream, so the counts below follow from
    // which components (data vs failures) each pattern selects.
    Map<String, Long> expectedCounts = Map.ofEntries(
        // Concrete data stream with each selector
        Map.entry("test_ds_patterns_1", 5L),
        Map.entry("test_ds_patterns_1::data", 5L),
        Map.entry("test_ds_patterns_1::failures", 3L),
        Map.entry("test_ds_patterns_2", 5L),
        Map.entry("test_ds_patterns_2::data", 5L),
        Map.entry("test_ds_patterns_2::failures", 3L),

        // Wildcard pattern with each selector
        Map.entry("test_ds_patterns*", 15L),
        Map.entry("test_ds_patterns*::data", 15L),
        Map.entry("test_ds_patterns*::failures", 9L),

        // Match all pattern with each selector
        Map.entry("*", 15L),
        Map.entry("*::data", 15L),
        Map.entry("*::failures", 9L),

        // Concrete multi-pattern
        Map.entry("test_ds_patterns_1,test_ds_patterns_2", 10L),
        Map.entry("test_ds_patterns_1::data,test_ds_patterns_2::data", 10L),
        Map.entry("test_ds_patterns_1::failures,test_ds_patterns_2::failures", 6L),

        // Wildcard multi-pattern
        Map.entry("test_ds_patterns_1*,test_ds_patterns_2*", 10L),
        Map.entry("test_ds_patterns_1*::data,test_ds_patterns_2*::data", 10L),
        Map.entry("test_ds_patterns_1*::failures,test_ds_patterns_2*::failures", 6L),

        // Wildcard pattern with data stream exclusions for each selector combination
        // (data stream exclusions need * on the end to negate)
        // None (default)
        Map.entry("test_ds_patterns*,-test_ds_patterns_2*", 10L),
        Map.entry("test_ds_patterns*,-test_ds_patterns_2*::data", 10L),
        Map.entry("test_ds_patterns*,-test_ds_patterns_2*::failures", 15L),
        // Subtracting from ::data
        Map.entry("test_ds_patterns*::data,-test_ds_patterns_2*", 10L),
        Map.entry("test_ds_patterns*::data,-test_ds_patterns_2*::data", 10L),
        Map.entry("test_ds_patterns*::data,-test_ds_patterns_2*::failures", 15L),
        // Subtracting from ::failures
        Map.entry("test_ds_patterns*::failures,-test_ds_patterns_2*", 9L),
        Map.entry("test_ds_patterns*::failures,-test_ds_patterns_2*::data", 9L),
        Map.entry("test_ds_patterns*::failures,-test_ds_patterns_2*::failures", 6L),
        // Subtracting from ::*
        Map.entry("test_ds_patterns*::data,test_ds_patterns*::failures,-test_ds_patterns_2*", 19L),
        Map.entry("test_ds_patterns*::data,test_ds_patterns*::failures,-test_ds_patterns_2*::data", 19L),
        Map.entry("test_ds_patterns*::data,test_ds_patterns*::failures,-test_ds_patterns_2*::failures", 21L),

        // Quoted multi-pattern with a selector applied to the whole quoted expression
        Map.entry("\"test_ds_patterns_1,test_ds_patterns_2\"::failures", 8L)
    );

    String[] dataStreams = { "test_ds_patterns_1", "test_ds_patterns_2", "test_ds_patterns_3" };
    runDataStreamTest(expectedCounts, dataStreams, (pattern, expectedCount) -> {
        try (var results = run("from " + pattern + " | stats count(@timestamp)")) {
            var rows = getValuesList(results);
            assertEquals(pattern, 1, rows.size());
            assertEquals(pattern, expectedCount, rows.get(0).get(0));
        }
    });
}

public void testDataStreamInvalidPatterns() throws Exception {
    Assume.assumeTrue(DataStream.isFailureStoreFeatureFlagEnabled());

    // === Errors: invalid selector expressions mapped to the expected parse error fragment.
    Map<String, String> invalidPatterns = Map.ofEntries(
        // Only recognized components can be selected
        Map.entry("testXXX::custom", "invalid usage of :: separator, [custom] is not a recognized selector"),
        // Spelling is important
        Map.entry("testXXX::failres", "invalid usage of :: separator, [failres] is not a recognized selector"),
        // Only the match all wildcard is supported
        Map.entry("testXXX::d*ta", "invalid usage of :: separator, [d*ta] is not a recognized selector"),
        // The first instance of :: is split upon so that you cannot chain the selector
        Map.entry("test::XXX::data", "mismatched input '::' expecting {"),
        // Selectors must be outside of date math expressions or else they trip up the selector parsing
        Map.entry("<test-{now/d}::failures>", "Invalid index name [<test-{now/d}], must not contain the following characters ["),
        // Only one selector separator is allowed per expression
        Map.entry("::::data", "mismatched input '::' expecting {"),
        // Suffix case is not supported because there is no component named with the empty string
        Map.entry("index::", "missing {QUOTED_STRING, UNQUOTED_SOURCE} at '|'")
    );

    runDataStreamTest(invalidPatterns, new String[] { "test_ds_patterns_1" }, (pattern, expectedError) -> {
        logger.info(pattern);
        ParsingException exception = expectThrows(
            ParsingException.class,
            () -> run("from " + pattern + " | stats count(@timestamp)").close()
        );
        assertThat(exception.getMessage(), containsString(expectedError));
    });
}

/**
 * Shared fixture for the data stream selector tests. Installs an index template matching
 * {@code test_ds_patterns_*} with the failure store enabled, creates one data stream per name
 * in {@code dsNames} via bulk indexing (5 well-formed docs plus 3 docs whose {@code count}
 * value cannot be parsed as a long), runs {@code testMethod} for every entry in
 * {@code testCases}, and always tears the data streams and template down afterwards.
 *
 * @param testCases map from index pattern (test case key) to the expected value handed to
 *                  {@code testMethod}
 * @param dsNames data stream names to create; must match the {@code test_ds_patterns_*}
 *                template pattern
 * @param testMethod assertion callback invoked once per test case with its key and value
 * @throws IOException if building the template mappings fails
 */
private <V> void runDataStreamTest(Map<String, V> testCases, String[] dsNames, BiConsumer<String, V> testMethod) throws IOException {
    boolean deleteTemplate = false;
    List<String> deleteDataStreams = new ArrayList<>();
    try {
        assertAcked(
            client().execute(
                TransportPutComposableIndexTemplateAction.TYPE,
                new TransportPutComposableIndexTemplateAction.Request("test_ds_template").indexTemplate(
                    ComposableIndexTemplate.builder()
                        .indexPatterns(List.of("test_ds_patterns_*"))
                        .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
                        .template(
                            Template.builder()
                                // dynamic:false so that docs with a non-numeric "count" fail
                                // mapping instead of being indexed with extra fields
                                .mappings(new CompressedXContent("""
                                    {
                                      "dynamic": false,
                                      "properties": {
                                        "@timestamp": {
                                          "type": "date"
                                        },
                                        "count": {
                                          "type": "long"
                                        }
                                      }
                                    }"""))
                                // Explicitly enable the failure store for these data streams
                                .dataStreamOptions(
                                    ResettableValue.create(
                                        new DataStreamOptions.Template(
                                            ResettableValue.create(new DataStreamFailureStore.Template(ResettableValue.create(true)))
                                        )
                                    )
                                )
                        )
                        .build()
                )
            )
        );
        deleteTemplate = true;

        String time = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(System.currentTimeMillis());
        int i = 0;
        for (String dsName : dsNames) {
            BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            for (String id : Arrays.asList("1", "2", "3", "4", "5")) {
                bulk.add(createDoc(dsName, id, time, ++i * 1000));
            }
            for (String id : Arrays.asList("6", "7", "8")) {
                // "garbage" cannot be parsed into the long "count" field
                bulk.add(createDoc(dsName, id, time, "garbage"));
            }
            BulkResponse bulkItemResponses = bulk.get();
            // Register for cleanup before asserting: the data stream is auto-created by the
            // bulk request, so it must be deleted even if some documents failed to index.
            deleteDataStreams.add(dsName);
            assertThat(bulkItemResponses.hasFailures(), is(false));
            ensureYellow(dsName);
        }

        for (Map.Entry<String, V> testCase : testCases.entrySet()) {
            testMethod.accept(testCase.getKey(), testCase.getValue());
        }
    } finally {
        if (deleteDataStreams.isEmpty() == false) {
            assertAcked(
                client().execute(
                    DeleteDataStreamAction.INSTANCE,
                    new DeleteDataStreamAction.Request(new TimeValue(30, TimeUnit.SECONDS), deleteDataStreams.toArray(String[]::new))
                )
            );
        }
        if (deleteTemplate) {
            assertAcked(
                client().execute(
                    TransportDeleteComposableIndexTemplateAction.TYPE,
                    new TransportDeleteComposableIndexTemplateAction.Request("test_ds_template")
                )
            );
        }
    }
}

/**
 * Builds a create-only index request for the given data stream with a timestamp and count
 * field. {@code count} is typed {@code Object} so callers can pass either a valid long or an
 * unparsable value that will be rejected by the mapping.
 */
private static IndexRequest createDoc(String dsName, String id, String ts, Object count) {
    IndexRequest doc = new IndexRequest(dsName);
    doc.opType(DocWriteRequest.OpType.CREATE);
    doc.id(id);
    return doc.source("@timestamp", ts, "count", count);
}

public void testOverlappingIndexPatterns() throws Exception {
String[] indexNames = { "test_overlapping_index_patterns_1", "test_overlapping_index_patterns_2" };

Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugin/esql/src/main/antlr/EsqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ FROM_PIPE : PIPE -> type(PIPE), popMode;
FROM_OPENING_BRACKET : OPENING_BRACKET -> type(OPENING_BRACKET);
FROM_CLOSING_BRACKET : CLOSING_BRACKET -> type(CLOSING_BRACKET);
FROM_COLON : COLON -> type(COLON);
// Accept `::` (lexed as CAST_OP) inside FROM mode so selector suffixes like
// `index::failures` can be parsed; only enabled when isDevVersion() is true.
FROM_SELECTOR : {this.isDevVersion()}? CAST_OP -> type(CAST_OP);
FROM_COMMA : COMMA -> type(COMMA);
FROM_ASSIGN : ASSIGN -> type(ASSIGN);
METADATA : 'metadata';
Expand Down Expand Up @@ -622,6 +623,10 @@ CLOSING_METRICS_COLON
: COLON -> type(COLON), popMode, pushMode(METRICS_MODE)
;

// Accept `::` (re-emitted as CAST_OP) while closing a metrics expression, returning to
// METRICS_MODE — mirrors the neighboring CLOSING_METRICS_COLON rule.
CLOSING_METRICS_SELECTOR
 : CAST_OP -> type(CAST_OP), popMode, pushMode(METRICS_MODE)
 ;

CLOSING_METRICS_COMMA
: COMMA -> type(COMMA), popMode, pushMode(METRICS_MODE)
;
Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugin/esql/src/main/antlr/EsqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,19 @@ fromCommand

// An index pattern is either `cluster:index` (remote cluster syntax) or — gated on
// isDevVersion() — `index::selector`, where the selector names an index component
// (e.g. data or failures) separated by CAST_OP (`::`).
indexPattern
 : (clusterString COLON)? indexString
 | {this.isDevVersion()}? indexString (CAST_OP selectorString)?
 ;

// Cluster name portion of a remote index pattern; may be quoted or unquoted.
clusterString
 : UNQUOTED_SOURCE
 | QUOTED_STRING
 ;

// Selector suffix after `::` in an index pattern; may be quoted or unquoted. Validation of
// the selector value (data/failures) happens later, outside the grammar.
selectorString
 : UNQUOTED_SOURCE
 | QUOTED_STRING
 ;

indexString
: UNQUOTED_SOURCE
| QUOTED_STRING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.Build;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.util.FeatureFlag;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.rest.action.admin.cluster.RestNodesCapabilitiesAction;
Expand Down Expand Up @@ -761,7 +762,12 @@ public enum Cap {
/**
* Make numberOfChannels consistent with layout in DefaultLayout by removing duplicated ChannelSet.
*/
MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT;
MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT,

/**
* Index component selector syntax (my-data-stream-name::failures)
*/
INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled());

private final boolean enabled;

Expand Down

Large diffs are not rendered by default.

Loading