Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.jar.JarInputStream;
import java.util.zip.ZipEntry;

Expand Down Expand Up @@ -548,23 +549,17 @@ public static List<List<Object>> getValuesList(EsqlQueryResponse results) {
}

public static List<List<Object>> getValuesList(Iterator<Iterator<Object>> values) {
var valuesList = new ArrayList<List<Object>>();
values.forEachRemaining(row -> {
var rowValues = new ArrayList<>();
row.forEachRemaining(rowValues::add);
valuesList.add(rowValues);
});
return valuesList;
return toList(values, row -> toList(row, Function.identity()));
}

public static List<List<Object>> getValuesList(Iterable<Iterable<Object>> values) {
var valuesList = new ArrayList<List<Object>>();
values.iterator().forEachRemaining(row -> {
var rowValues = new ArrayList<>();
row.iterator().forEachRemaining(rowValues::add);
valuesList.add(rowValues);
});
return valuesList;
return toList(values.iterator(), row -> toList(row.iterator(), Function.identity()));
}

private static <E, T> List<T> toList(Iterator<E> iterable, Function<E, T> transformer) {
var list = new ArrayList<T>();
iterable.forEachRemaining(e -> list.add(transformer.apply(e)));
return list;
}

public static List<String> withDefaultLimitWarning(List<String> warnings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ public void testLimitThenEnrichRemote() {
FROM *:events,events
| LIMIT 25
| eval ip= TO_STR(host)
| %s | KEEP host, timestamp, user, os
| %s
| KEEP host, timestamp, user, os
""", enrichHosts(Enrich.Mode.REMOTE));
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
var values = getValuesList(resp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.AbstractTransportRequest;
Expand All @@ -39,21 +40,24 @@
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.core.util.StringUtils;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.index.MappingException;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
import org.elasticsearch.xpack.esql.session.IndexResolver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -110,22 +114,37 @@ public static UnresolvedPolicy from(Enrich e) {
/**
* Resolves a set of enrich policies
*
* @param enriches the unresolved policies
* @param preAnalysis to retrieve indices and enriches to resolve
* @param requestFilter to resolve target clusters
* @param executionInfo the execution info
* @param listener notified with the enrich resolution
*/
public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionInfo, ActionListener<EnrichResolution> listener) {
if (enriches.isEmpty()) {
public void resolvePolicies(
PreAnalyzer.PreAnalysis preAnalysis,
QueryBuilder requestFilter,
EsqlExecutionInfo executionInfo,
ActionListener<EnrichResolution> listener
) {
if (preAnalysis.enriches.isEmpty()) {
listener.onResponse(new EnrichResolution());
return;
}

doResolvePolicies(
new HashSet<>(executionInfo.getClusters().keySet()),
enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
executionInfo,
listener
);
doResolveRemotes(preAnalysis.indices, requestFilter, listener.delegateFailureAndWrap((l, remotes) -> {
doResolvePolicies(remotes, preAnalysis.enriches.stream().map(UnresolvedPolicy::from).toList(), executionInfo, l);
}));
}

private void doResolveRemotes(List<IndexPattern> indexPatterns, QueryBuilder requestFilter, ActionListener<Set<String>> listener) {
switch (indexPatterns.size()) {
case 0 -> listener.onResponse(Set.of());
case 1 -> indexResolver.resolveConcreteIndices(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the index pattern also contains a cluster/project prefix, will this still be correct?

eg.

FROM remote1:idx1,remote2:idx2
| STATS max(a) by b
| ENRICH policy on a, b

The enrich can only be calculated on the coordinator (after the STATS reduction), but I guess the enrich index on the coordinator won't be resolved. I'm not sure it works at all today tbh, maybe worth checking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the index pattern also contains a cluster/project prefix, will this still be correct?

Yes, this is a valid situation, where all supplied indices are correct specific indices (not aliases nor patterns) and exist.
In such cases we would resolve to exact same 2 identifiers and would derive 2 remotes from them: remote1 and remote2

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before merging, I'd just prefer to have a test for this query and make sure we are not introducing a regression.
Apart from that, I think we can proceed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For LOOKUP JOINs we have a specific check: we just don't allow it.

I'm checking ENRICH now, but I couldn't find similar validation for it in the codebase, so I guess it's being treated differently...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is behaving the same way. It looks like we have a similar test for enrich:

public void testAggThenEnrichRemote() {
String query = String.format(Locale.ROOT, """
FROM *:events,events
| eval ip= TO_STR(host)
| %s
| stats c = COUNT(*) by os
| %s
| sort vendor
""", enrichHosts(Enrich.Mode.ANY), enrichVendors(Enrich.Mode.REMOTE));
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [stats c = COUNT(*) by os]@4:3"));
}
public void testEnrichCoordinatorThenEnrichRemote() {
String query = String.format(Locale.ROOT, """
FROM *:events,events
| eval ip= TO_STR(host)
| %s
| %s
| sort vendor
""", enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.REMOTE));
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after [ENRICH _COORDINATOR"));
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@idegtiarenko I added a test in main for this case, see #134199, and it seems to work fine.
I think it makes sense to run it on this PR and see if the results are the same.

indexPatterns.getFirst().indexPattern(),
requestFilter,
listener.map(EsqlCCSUtils::getRemotesOf)
);
default -> listener.onFailure(new MappingException("Queries with multiple indices are not supported"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is OK for now, but it will change when we start supporting subqueries and proper JOINs.

I'm not sure how it will actually work, maybe the enriches in a subquery will have to consider the remotes from the main indices in the subquery itself.

Anyway, I'd add a // TODO here, so that we don't forget.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the error message is a bit misleading, it took me a couple of seconds to realize that we were referring to multiple FROM, not to multiple indices. I know it comes from a similar check in EsqlSession, probably we should change both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied it from

listener.onFailure(new MappingException("Queries with multiple indices are not supported"));

but that is a good call.

This is OK for now, but it will change when we start supporting subqueries and proper JOINs.

I suspect we would still need a distinction between main/top-level from and all the nested ones.
I will think about a way to improve it (across all usages)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand it correctly, latest CI run (that is green) includes my tests.
All good then 🎉

}
}

protected void doResolvePolicies(
Expand Down Expand Up @@ -442,7 +461,7 @@ public void messageReceived(LookupRequest request, TransportChannel channel, Tas
if (p == null) {
continue;
}
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
try (var ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
String indexName = EnrichPolicy.getBaseName(policyName);
indexResolver.resolveAsMergedMapping(indexName, IndexResolver.ALL_FIELDS, null, false, refs.acquire(indexResult -> {
if (indexResult.isValid() && indexResult.get().concreteIndices().size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toSet;

public class EsqlCCSUtils {

Expand Down Expand Up @@ -206,7 +207,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
// NOTE: we assume that updateExecutionInfoWithUnavailableClusters() was already run and took care of unavailable clusters.
final Set<String> clustersWithNoMatchingIndices = executionInfo.getClusterStates(Cluster.Status.RUNNING)
.map(Cluster::getClusterAlias)
.collect(Collectors.toSet());
.collect(toSet());
for (String indexName : indexResolution.resolvedIndices()) {
clustersWithNoMatchingIndices.remove(RemoteClusterAware.parseClusterAlias(indexName));
}
Expand Down Expand Up @@ -414,4 +415,8 @@ public static String inClusterName(String clusterAlias) {
return "in remote cluster [" + clusterAlias + "]";
}
}

public static Set<String> getRemotesOf(Set<String> concreteIndices) {
return concreteIndices.stream().map(RemoteClusterAware::parseClusterAlias).collect(toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ public void analyzedPlan(
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indices, executionInfo);

SubscribableListener. //
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches, executionInfo, l))
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis, requestFilter, executionInfo, l))
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
.<PreAnalysisResult>andThen((l, r) -> resolveInferences(parsed, r, l))
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.TreeMap;
import java.util.TreeSet;

import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME;
import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;
import static org.elasticsearch.xpack.esql.core.type.DataType.OBJECT;
Expand Down Expand Up @@ -74,6 +75,16 @@ public IndexResolver(Client client) {
this.client = client;
}

public void resolveConcreteIndices(String indexPattern, QueryBuilder requestFilter, ActionListener<Set<String>> listener) {
client.execute(
EsqlResolveFieldsAction.TYPE,
createFieldCapsRequest(indexPattern, Set.of("_id"), requestFilter, false),
listener.delegateFailureAndWrap((l, response) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder though what happens here if one of the clusters fails. Will it fail this listener and thus the whole request? In other places, we have to do some handling to figure out what failed and skip failed clusters and so on, but we don't do it here. I'm not sure what happens here if one of the clusters fails?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I am afraid of is if EsqlResolveFieldsAction returns partial response with say cluster A failing and thus having no indices but other clusters having indices, then A would be excluded from the list but not marked as skipped, and then there could be problems with it downstream.

l.onResponse(response.getIndexResponses().stream().map(FieldCapabilitiesIndexResponse::getIndexName).collect(toSet()));
})
);
}

/**
* Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ EnrichPolicyResolver mockEnrichResolver() {
ActionListener<EnrichResolution> listener = (ActionListener<EnrichResolution>) arguments[arguments.length - 1];
listener.onResponse(new EnrichResolution());
return null;
}).when(enrichResolver).resolvePolicies(any(), any(), any());
}).when(enrichResolver).resolvePolicies(any(), any(), any(), any());
return enrichResolver;
}

Expand Down