Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.TestFeatureService;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase;
import org.junit.After;
import org.junit.Before;
Expand All @@ -29,6 +30,7 @@
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -39,9 +41,12 @@
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.xpack.esql.ccq.Clusters.REMOTE_CLUSTER_NAME;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class MultiClustersIT extends ESRestTestCase {
Expand Down Expand Up @@ -129,7 +134,7 @@ void indexDocs(RestClient client, String index, List<Doc> docs) throws IOExcepti
}

private Map<String, Object> run(String query, boolean includeCCSMetadata) throws IOException {
var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query);
var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query).profile(true);
if (includeCCSMetadata) {
queryBuilder.includeCCSMetadata(true);
}
Expand Down Expand Up @@ -158,12 +163,42 @@ private Map<String, Object> runEsql(RestEsqlTestCase.RequestObjectBuilder reques
}
}

/**
 * Verifies the response of a LIKE/RLIKE query on {@code _index}, taking into account
 * whether both clusters advertise the required {@code /_query} capabilities.
 * <p>
 * When the local and the remote cluster both report the capabilities, the full
 * {@link #assertResultMap} check is applied. Otherwise only the absence of a partial
 * result is verified.
 * </p>
 *
 * @param includeCCSMetadata        forwarded to {@code assertResultMap}
 * @param result                    the parsed query response
 * @param columns                   expected columns, forwarded to {@code assertResultMap}
 * @param values                    expected values, forwarded to {@code assertResultMap}
 * @param remoteOnly                forwarded to {@code assertResultMap}
 * @param requireLikeListCapability if true, {@code like_list_on_index_fields} is required
 *                                  in addition to {@code like_on_index_fields}
 * @throws IOException if the capability lookup or closing the remote client fails
 */
private <C, V> void assertResultMapForLike(
    boolean includeCCSMetadata,
    Map<String, Object> result,
    C columns,
    V values,
    boolean remoteOnly,
    boolean requireLikeListCapability
) throws IOException {
    List<String> capabilities = new ArrayList<>();
    capabilities.add("like_on_index_fields");
    if (requireLikeListCapability) {
        capabilities.add("like_list_on_index_fields");
    }

    // the feature only counts as supported when the local AND the remote cluster report it
    final boolean localSupports = clusterHasCapability("POST", "/_query", List.of(), capabilities).orElse(false);
    final boolean supportedEverywhere;
    try (RestClient remoteClient = remoteClusterClient()) {
        supportedEverywhere = localSupports
            && clusterHasCapability(remoteClient, "POST", "/_query", List.of(), capabilities).orElse(false);
    }

    if (supportedEverywhere == false) {
        logger.info("--> skipping data check for like index test, cluster does not support like index feature");
        // without the capability the data cannot be checked; only make sure the result is not partial
        Object clusters = result.get("_clusters");
        String reason = "unexpected partial results";
        if (clusters != null) {
            reason = reason + ": _clusters=" + clusters;
        }
        assertThat(reason, result.get("is_partial"), anyOf(nullValue(), is(false)));
        return;
    }
    assertResultMap(includeCCSMetadata, result, columns, values, remoteOnly);
}

private <C, V> void assertResultMap(boolean includeCCSMetadata, Map<String, Object> result, C columns, V values, boolean remoteOnly) {
MapMatcher mapMatcher = getResultMatcher(
ccsMetadataAvailable(),
result.containsKey("is_partial"),
result.containsKey("documents_found")
);
).extraOk();
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
Expand Down Expand Up @@ -254,7 +289,7 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO
assertThat(remoteClusterShards.keySet(), equalTo(Set.of("total", "successful", "skipped", "failed")));
assertThat((Integer) remoteClusterShards.get("total"), greaterThanOrEqualTo(0));
assertThat((Integer) remoteClusterShards.get("successful"), equalTo((Integer) remoteClusterShards.get("total")));
assertThat((Integer) remoteClusterShards.get("skipped"), equalTo(0));
// assertThat((Integer) remoteClusterShards.get("skipped"), equalTo(0));
assertThat((Integer) remoteClusterShards.get("failed"), equalTo(0));

if (remoteOnly == false) {
Expand All @@ -270,7 +305,7 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO
assertThat(localClusterShards.keySet(), equalTo(Set.of("total", "successful", "skipped", "failed")));
assertThat((Integer) localClusterShards.get("total"), greaterThanOrEqualTo(0));
assertThat((Integer) localClusterShards.get("successful"), equalTo((Integer) localClusterShards.get("total")));
assertThat((Integer) localClusterShards.get("skipped"), equalTo(0));
// assertThat((Integer) localClusterShards.get("skipped"), equalTo(0));
assertThat((Integer) localClusterShards.get("failed"), equalTo(0));
}
}
Expand Down Expand Up @@ -371,6 +406,93 @@ public void testStats() throws IOException {
assertThat(clusterData, hasKey("took"));
}

/**
 * Filters a local + remote query with {@code _index LIKE "*remote*"} and expects a single
 * row holding the remote document count for the remote index.
 */
public void testLikeIndex() throws Exception {
    final boolean withCcsMetadata = includeCCSMetadata();
    final String query = """
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index LIKE "*remote*"
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """;
    Map<String, Object> response = run(query, withCcsMetadata);

    var expectedColumns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
    var expectedValues = List.of(List.of(remoteDocs.size(), REMOTE_CLUSTER_NAME + ":" + remoteIndex));

    // log the whole response, pretty-printed as JSON
    logger.info(Strings.toString(JsonXContent.contentBuilder().prettyPrint().map(response)));
    assertResultMapForLike(withCcsMetadata, response, expectedColumns, expectedValues, false, false);
}

/**
 * Filters a local + remote query with {@code _index NOT LIKE "*remote*"} and expects a single
 * row holding the local document count for the local index.
 */
public void testNotLikeIndex() throws Exception {
    final boolean withCcsMetadata = includeCCSMetadata();
    final String query = """
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index NOT LIKE "*remote*"
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """;
    Map<String, Object> response = run(query, withCcsMetadata);

    var expectedColumns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
    var expectedValues = List.of(List.of(localDocs.size(), localIndex));

    // log the whole response, pretty-printed as JSON
    logger.info(Strings.toString(JsonXContent.contentBuilder().prettyPrint().map(response)));
    assertResultMapForLike(withCcsMetadata, response, expectedColumns, expectedValues, false, false);
}

/**
 * Filters a local + remote query with a LIKE pattern list on {@code _index} and expects a
 * single row holding the remote document count for the remote index. The data check
 * additionally requires the {@code like_list_on_index_fields} capability.
 */
public void testLikeListIndex() throws Exception {
    final boolean withCcsMetadata = includeCCSMetadata();
    final String query = """
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index LIKE ("*remote*", "not-exist*")
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """;
    Map<String, Object> response = run(query, withCcsMetadata);

    var expectedColumns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
    var expectedValues = List.of(List.of(remoteDocs.size(), REMOTE_CLUSTER_NAME + ":" + remoteIndex));

    // log the whole response, pretty-printed as JSON
    logger.info(Strings.toString(JsonXContent.contentBuilder().prettyPrint().map(response)));
    assertResultMapForLike(withCcsMetadata, response, expectedColumns, expectedValues, false, true);
}

public void testNotLikeListIndex() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`testNotLikeListIndex` will fail in some of the test runs; please see the latest version in my PR for the fix.
I disabled this test for clusters that do not have the `like_list_on_index_fields` capability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please copy the latest version of this file from #130019 to get all the latest fixes on the tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha

boolean includeCCSMetadata = includeCCSMetadata();
Map<String, Object> result = run("""
FROM test-local-index,*:test-remote-index METADATA _index
| WHERE _index NOT LIKE ("*remote*", "not-exist*")
| STATS c = COUNT(*) BY _index
| SORT _index ASC
""", includeCCSMetadata);
var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
var values = List.of(List.of(localDocs.size(), localIndex));
String resultString = Strings.toString(JsonXContent.contentBuilder().prettyPrint().map(result));
logger.info(resultString);
assertResultMapForLike(includeCCSMetadata, result, columns, values, false, true);
}

/**
 * Filters a local + remote query with {@code _index RLIKE ".*remote.*"} and expects a single
 * row holding the remote document count for the remote index.
 */
public void testRLikeIndex() throws Exception {
    final boolean withCcsMetadata = includeCCSMetadata();
    final String query = """
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index RLIKE ".*remote.*"
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """;
    Map<String, Object> response = run(query, withCcsMetadata);

    var expectedColumns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
    var expectedValues = List.of(List.of(remoteDocs.size(), REMOTE_CLUSTER_NAME + ":" + remoteIndex));
    assertResultMapForLike(withCcsMetadata, response, expectedColumns, expectedValues, false, false);
}

/**
 * Filters a local + remote query with {@code _index NOT RLIKE ".*remote.*"} and expects a
 * single row holding the local document count for the local index.
 */
public void testNotRLikeIndex() throws Exception {
    final boolean withCcsMetadata = includeCCSMetadata();
    final String query = """
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index NOT RLIKE ".*remote.*"
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """;
    Map<String, Object> response = run(query, withCcsMetadata);

    var expectedColumns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
    var expectedValues = List.of(List.of(localDocs.size(), localIndex));
    assertResultMapForLike(withCcsMetadata, response, expectedColumns, expectedValues, false, false);
}

private RestClient remoteClusterClient() throws IOException {
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ protected WildcardLikeList replaceChild(Expression newLeft) {
*/
@Override
public Translatable translatable(LucenePushdownPredicates pushdownPredicates) {
    // A non-null minimum transport version means the resulting query has to be serialized.
    // The AutomatonQuery that we use right now isn't serializable, so refuse to push down.
    if (pushdownPredicates.minTransportVersion() != null) {
        return Translatable.NO;
    }
    // No serialization constraint: push down whenever the field itself is pushable.
    return pushdownPredicates.isPushableAttribute(field()) ? Translatable.YES : Translatable.NO;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
Expand All @@ -30,6 +33,24 @@
* </ol>
*/
public interface LucenePushdownPredicates {
/**
 * If we're extracting a query for {@code can_match} then this is the
 * minimum transport version in the cluster. Otherwise, this is {@code null}.
 * <p>
 * If this is not null, {@link Expression}s should not claim to be
 * serializable unless their {@link QueryBuilder}
 * {@link QueryBuilder#supportsVersion supports} the version.
 * </p>
 * <p>
 * The extraction is done on the coordinating node. For cross cluster
 * search it is <strong>also</strong> done on the coordinating node of
 * the remote cluster. So we actually <strong>have</strong> the minimum
 * cluster transport version.
 * </p>
 *
 * @return the minimum transport version in the cluster, or {@code null}
 *         when the query is not being extracted for {@code can_match}
 */
@Nullable
TransportVersion minTransportVersion();

/**
* For TEXT fields, we need to check if the field has a subfield of type KEYWORD that can be used instead.
*/
Expand Down Expand Up @@ -101,29 +122,41 @@ static String pushableAttributeName(TypedAttribute attribute) {
* In particular, it assumes TEXT fields have no exact subfields (underlying keyword field),
* and that isAggregatable means indexed and has hasDocValues.
*/
LucenePushdownPredicates DEFAULT = new LucenePushdownPredicates() {
@Override
public boolean hasExactSubfield(FieldAttribute attr) {
return false;
}
LucenePushdownPredicates DEFAULT = forCanMatch(null);

@Override
public boolean isIndexedAndHasDocValues(FieldAttribute attr) {
// Is the FieldType.isAggregatable() check correct here? In FieldType isAggregatable usually only means hasDocValues
return attr.field().isAggregatable();
}
/**
* A {@link LucenePushdownPredicates} for use with the {@code can_match} phase.
*/
static LucenePushdownPredicates forCanMatch(TransportVersion minTransportVersion) {
return new LucenePushdownPredicates() {
@Override
public TransportVersion minTransportVersion() {
return minTransportVersion;
}

@Override
public boolean isIndexed(FieldAttribute attr) {
// TODO: This is the original behaviour, but is it correct? In FieldType isAggregatable usually only means hasDocValues
return attr.field().isAggregatable();
}
@Override
public boolean hasExactSubfield(FieldAttribute attr) {
return false;
}

@Override
public boolean canUseEqualityOnSyntheticSourceDelegate(FieldAttribute attr, String value) {
return false;
}
};
@Override
public boolean isIndexedAndHasDocValues(FieldAttribute attr) {
// Is the FieldType.isAggregatable() check correct here? In FieldType isAggregatable usually only means hasDocValues
return attr.field().isAggregatable();
}

@Override
public boolean isIndexed(FieldAttribute attr) {
// TODO: This is the original behaviour, but is it correct? In FieldType isAggregatable usually only means hasDocValues
return attr.field().isAggregatable();
}

@Override
public boolean canUseEqualityOnSyntheticSourceDelegate(FieldAttribute attr, String value) {
return false;
}
};
}

/**
* If we have access to {@link SearchStats} over a collection of shards, we can make more fine-grained decisions about what can be
Expand All @@ -133,6 +166,11 @@ static LucenePushdownPredicates from(SearchStats stats) {
// TODO: use FieldAttribute#fieldName, otherwise this doesn't apply to field attributes used for union types.
// C.f. https://github.com/elastic/elasticsearch/issues/128905
return new LucenePushdownPredicates() {
@Override
public TransportVersion minTransportVersion() {
return null;
}

@Override
public boolean hasExactSubfield(FieldAttribute attr) {
return stats.hasExactSubfield(new FieldAttribute.FieldName(attr.name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.planner;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
Expand Down Expand Up @@ -212,20 +213,23 @@ public static PhysicalPlan localPlan(
/**
* Extracts a filter that can be used to skip unmatched shards on the coordinator.
*/
public static QueryBuilder canMatchFilter(PhysicalPlan plan) {
return detectFilter(plan, CoordinatorRewriteContext.SUPPORTED_FIELDS::contains);
public static QueryBuilder canMatchFilter(TransportVersion minTransportVersion, PhysicalPlan plan) {
return detectFilter(minTransportVersion, plan, CoordinatorRewriteContext.SUPPORTED_FIELDS::contains);
}

/**
* Note that since this filter does not have access to SearchStats, it cannot detect if the field is a text field with a delegate.
* We currently only use this filter for the @timestamp field, which is always a date field. Any tests that wish to use this should
* take care to not use it with TEXT fields.
*/
static QueryBuilder detectFilter(PhysicalPlan plan, Predicate<String> fieldName) {
static QueryBuilder detectFilter(TransportVersion minTransportVersion, PhysicalPlan plan, Predicate<String> fieldName) {
// first position is the REST filter, the second the query filter
final List<QueryBuilder> requestFilters = new ArrayList<>();
final LucenePushdownPredicates ctx = LucenePushdownPredicates.forCanMatch(minTransportVersion);
plan.forEachDown(FragmentExec.class, fe -> {
requestFilters.add(fe.esFilter());
if (fe.esFilter() != null && fe.esFilter().supportsVersion(minTransportVersion)) {
requestFilters.add(fe.esFilter());
}
// detect filter inside the query
fe.fragment().forEachUp(Filter.class, f -> {
// the only filter that can be pushed down is that on top of the relation
Expand All @@ -243,15 +247,13 @@ static QueryBuilder detectFilter(PhysicalPlan plan, Predicate<String> fieldName)
// and the expression is pushable (functions can be fully translated)
if (matchesField
&& refsBuilder.isEmpty()
&& translatable(exp, LucenePushdownPredicates.DEFAULT).finish() == TranslationAware.FinishedTranslatable.YES) {
&& translatable(exp, ctx).finish() == TranslationAware.FinishedTranslatable.YES) {
matches.add(exp);
}
}
}
if (matches.isEmpty() == false) {
requestFilters.add(
TRANSLATOR_HANDLER.asQuery(LucenePushdownPredicates.DEFAULT, Predicates.combineAnd(matches)).toQueryBuilder()
);
requestFilters.add(TRANSLATOR_HANDLER.asQuery(ctx, Predicates.combineAnd(matches)).toQueryBuilder());
}
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void startComputeOnDataNodes(
esqlExecutor,
parentTask,
originalIndices,
PlannerUtils.canMatchFilter(dataNodePlan),
PlannerUtils.canMatchFilter(clusterService.state().getMinTransportVersion(), dataNodePlan),
clusterAlias,
configuration.allowPartialResults(),
maxConcurrentNodesPerCluster == null ? -1 : maxConcurrentNodesPerCluster,
Expand Down
Loading