Skip to content

Commit 7070afb

Browse files
committed
Remote lookup join implementation
1 parent f18f4ee commit 7070afb

File tree

3 files changed

+125
-11
lines changed

3 files changed

+125
-11
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
6969
import org.elasticsearch.xpack.esql.plan.logical.Sample;
7070
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
71+
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
7172
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
7273
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
7374
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
@@ -88,6 +89,7 @@
8889
import java.util.function.Function;
8990

9091
import static java.util.Collections.emptyList;
92+
import static org.elasticsearch.xpack.esql.common.Failure.fail;
9193
import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;
9294
import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;
9395
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputExpressions;
@@ -620,17 +622,25 @@ public PlanFactory visitJoinCommand(EsqlBaseParser.JoinCommandContext ctx) {
620622
}
621623

622624
return p -> {
625+
// TODO: should this check be here or in Lookup.java?
626+
boolean[] hasRemotes = { false };
623627
p.forEachUp(UnresolvedRelation.class, r -> {
624628
for (var leftPattern : Strings.splitStringByCommaToArray(r.indexPattern().indexPattern())) {
625629
if (RemoteClusterAware.isRemoteIndexName(leftPattern)) {
626-
throw new ParsingException(
627-
source(target),
628-
"invalid index pattern [{}], remote clusters are not supported in LOOKUP JOIN",
629-
r.indexPattern().indexPattern()
630-
);
630+
hasRemotes[0] = true;
631631
}
632632
}
633633
});
634+
if (hasRemotes[0]) {
635+
p.forEachUp(UnaryPlan.class, u -> {
636+
if (u instanceof Aggregate || (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR)) {
637+
throw new ParsingException(
638+
source,
639+
"LOOKUP JOIN with remote indices is not supported after aggregation or local enrich commands"
640+
);
641+
}
642+
});
643+
}
634644

635645
return new LookupJoin(source, p, right, joinFields);
636646
};

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,4 +384,12 @@ public static boolean canAllowPartial(Exception e) {
384384
}
385385
return true;
386386
}
387+
388+
public static String inClusterName(String clusterAlias) {
389+
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
390+
return "in local cluster";
391+
} else {
392+
return "in remote cluster [" + clusterAlias + "]";
393+
}
394+
}
387395
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.indices.IndicesExpressionGrouper;
2727
import org.elasticsearch.logging.LogManager;
2828
import org.elasticsearch.logging.Logger;
29+
import org.elasticsearch.transport.RemoteClusterAware;
2930
import org.elasticsearch.xpack.esql.VerificationException;
3031
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
3132
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
@@ -103,6 +104,7 @@
103104
import java.util.Set;
104105
import java.util.function.Function;
105106
import java.util.stream.Collectors;
107+
import java.util.stream.Stream;
106108

107109
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
108110
import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;
@@ -350,7 +352,7 @@ public void analyzedPlan(
350352
.<PreAnalysisResult>andThen((l, preAnalysisResult) -> resolveInferences(preAnalysis.inferencePlans, preAnalysisResult, l));
351353
// first resolve the lookup indices, then the main indices
352354
for (var index : preAnalysis.lookupIndices) {
353-
listener = listener.andThen((l, preAnalysisResult) -> { preAnalyzeLookupIndex(index, preAnalysisResult, l); });
355+
listener = listener.andThen((l, preAnalysisResult) -> { preAnalyzeLookupIndex(index, preAnalysisResult, executionInfo, l); });
354356
}
355357
listener.<PreAnalysisResult>andThen((l, result) -> {
356358
// resolve the main indices
@@ -389,16 +391,110 @@ public void analyzedPlan(
389391
}).addListener(logicalPlanListener);
390392
}
391393

392-
private void preAnalyzeLookupIndex(IndexPattern table, PreAnalysisResult result, ActionListener<PreAnalysisResult> listener) {
393-
Set<String> fieldNames = result.wildcardJoinIndices().contains(table.indexPattern()) ? IndexResolver.ALL_FIELDS : result.fieldNames;
394+
private void preAnalyzeLookupIndex(
395+
IndexPattern table,
396+
PreAnalysisResult result,
397+
EsqlExecutionInfo executionInfo,
398+
ActionListener<PreAnalysisResult> listener
399+
) {
400+
String localPattern = table.indexPattern();
401+
assert RemoteClusterAware.isRemoteIndexName(localPattern) == false
402+
: "Lookup index name should not include remote, but got: " + localPattern;
403+
Set<String> fieldNames = result.wildcardJoinIndices().contains(localPattern) ? IndexResolver.ALL_FIELDS : result.fieldNames;
404+
// Get the list of active clusters for the lookup index
405+
Stream<EsqlExecutionInfo.Cluster> clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING);
406+
StringBuilder patternWithRemotes = new StringBuilder(localPattern);
407+
// Create a pattern with all active remote clusters
408+
clusters.forEach(cluster -> {
409+
String clusterAlias = cluster.getClusterAlias();
410+
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
411+
// Skip the local cluster, as it is already included in the localPattern
412+
return;
413+
}
414+
patternWithRemotes.append(",").append(clusterAlias).append(":").append(localPattern);
415+
});
394416
// call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types
395417
indexResolver.resolveAsMergedMapping(
396-
table.indexPattern(),
418+
patternWithRemotes.toString(),
397419
fieldNames,
398420
null,
399-
listener.map(indexResolution -> result.addLookupIndexResolution(table.indexPattern(), indexResolution))
421+
listener.map(indexResolution -> receiveLookupIndexResolution(result, localPattern, executionInfo, indexResolution))
400422
);
401-
// TODO: Verify that the resolved index actually has indexMode: "lookup"
423+
}
424+
425+
private PreAnalysisResult receiveLookupIndexResolution(
426+
PreAnalysisResult result,
427+
String index,
428+
EsqlExecutionInfo executionInfo,
429+
IndexResolution newIndexResolution
430+
) {
431+
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, newIndexResolution.unavailableClusters());
432+
if (newIndexResolution.isValid() == false) {
433+
// If the index resolution is invalid, don't bother with the rest of the analysis
434+
return result.addLookupIndexResolution(index, newIndexResolution);
435+
}
436+
// Collect resolved clusters from the index resolution, verify that each cluster has a single resolution for the lookup index
437+
Map<String, String> clustersWithResolvedIndices = new HashMap<>(newIndexResolution.resolvedIndices().size());
438+
newIndexResolution.get().indexNameWithModes().forEach((indexName, indexMode) -> {
439+
if (indexMode != IndexMode.LOOKUP) {
440+
throw new VerificationException(
441+
"Lookup index [" + indexName + "] has index mode [" + indexMode + "], expected [" + IndexMode.LOOKUP + "]"
442+
);
443+
}
444+
String clusterAlias = RemoteClusterAware.parseClusterAlias(indexName);
445+
// Each cluster should have only one resolution for the lookup index
446+
if (clustersWithResolvedIndices.containsKey(clusterAlias)) {
447+
throw new VerificationException(
448+
"Multiple resolutions for lookup index [" + index + "] " + EsqlCCSUtils.inClusterName(clusterAlias)
449+
);
450+
} else {
451+
clustersWithResolvedIndices.put(clusterAlias, indexName);
452+
}
453+
});
454+
455+
// These are the clusters that are still running; the lookup index must be available on all of them
456+
Stream<EsqlExecutionInfo.Cluster> clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING);
457+
// Verify that all active clusters have the lookup index resolved
458+
clusters.forEach(cluster -> {
459+
String clusterAlias = cluster.getClusterAlias();
460+
if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) {
461+
// Missing cluster resolution
462+
VerificationException error = new VerificationException(
463+
"Lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias)
464+
);
465+
// For now, the local cluster cannot be skipped, so we throw an error
466+
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)
467+
|| executionInfo.isSkipUnavailable(clusterAlias) == false) {
468+
throw error;
469+
} else {
470+
// If we can, skip the cluster and mark it as such
471+
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(
472+
executionInfo,
473+
clusterAlias,
474+
EsqlExecutionInfo.Cluster.Status.SKIPPED,
475+
error
476+
);
477+
}
478+
}
479+
});
480+
481+
if (clustersWithResolvedIndices.size() > 1) {
482+
// If the lookup index was resolved on more than one cluster, keep only the local resolution
483+
String localIndexName = clustersWithResolvedIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
484+
if (localIndexName == null) {
485+
// Get the first index name instead
486+
localIndexName = clustersWithResolvedIndices.values().iterator().next();
487+
}
488+
var localIndex = new EsIndex(index, newIndexResolution.get().mapping(), Map.of(localIndexName, IndexMode.LOOKUP));
489+
newIndexResolution = IndexResolution.valid(
490+
localIndex,
491+
localIndex.concreteIndices(),
492+
newIndexResolution.getUnavailableShards(),
493+
newIndexResolution.unavailableClusters()
494+
);
495+
}
496+
497+
return result.addLookupIndexResolution(index, newIndexResolution);
402498
}
403499

404500
private void initializeClusterData(List<IndexPattern> indices, EsqlExecutionInfo executionInfo) {

0 commit comments

Comments
 (0)