Skip to content

Commit 8aa7e8e

Browse files
committed
More tests & fixes
1 parent 96e9bb8 commit 8aa7e8e

File tree

4 files changed

+135
-67
lines changed

4 files changed

+135
-67
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java

Lines changed: 91 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,21 @@ public void testLookupJoinAcrossClusters() throws IOException {
6969
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
7070
assertCCSExecutionInfoDetails(executionInfo);
7171
}
72+
73+
// populateLookupIndex(LOCAL_CLUSTER, "values_lookup2", 5);
74+
// populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup2", 5);
75+
// FIXME: this currently does not work
76+
// try (
77+
// EsqlQueryResponse resp = runQuery(
78+
// "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key | LOOKUP JOIN values_lookup2 ON
79+
// lookup_tag",
80+
// randomBoolean()
81+
// )
82+
// ) {
83+
// List<List<Object>> values = getValuesList(resp);
84+
// assertThat(values, hasSize(20));
85+
//
86+
// }
7287
}
7388

7489
public void testLookupJoinMissingRemoteIndex() throws IOException {
@@ -98,31 +113,50 @@ public void testLookupJoinMissingRemoteIndex() throws IOException {
98113
assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
99114
}
100115
// Without local
116+
// FIXME: this is inconsistent due to how field-caps works - if there's no index at all, it fails, but if there's one but not
117+
// another, it succeeds. Ideally, this would be empty result with remote1 skipped, but field-caps fails.
118+
var ex = expectThrows(
119+
VerificationException.class,
120+
() -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
121+
);
122+
assertThat(ex.getMessage(), containsString("Unknown index [cluster-a:values_lookup]"));
123+
124+
setSkipUnavailable(REMOTE_CLUSTER_1, false);
125+
// then missing index is an error
126+
ex = expectThrows(
127+
VerificationException.class,
128+
() -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
129+
);
130+
assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
131+
}
132+
133+
public void testLookupJoinMissingRemoteIndexTwoRemotes() throws IOException {
134+
setupClusters(3);
135+
populateLookupIndex(REMOTE_CLUSTER_2, "values_lookup", 10);
136+
137+
setSkipUnavailable(REMOTE_CLUSTER_1, true);
138+
setSkipUnavailable(REMOTE_CLUSTER_2, false);
139+
140+
// FIXME: inconsistent with the previous test, remote1:values_lookup still missing, but now it succeeds with remote1 skipped
101141
try (
102142
EsqlQueryResponse resp = runQuery(
103-
"FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
143+
"FROM *:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
104144
randomBoolean()
105145
)
106146
) {
107147
List<List<Object>> values = getValuesList(resp);
108-
assertThat(values, hasSize(0));
148+
assertThat(values, hasSize(10));
109149
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
110-
assertThat(executionInfo.getClusters().size(), equalTo(1));
150+
assertThat(executionInfo.getClusters().size(), equalTo(2));
111151

112-
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
113-
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
114-
assertThat(remoteCluster.getFailures(), not(empty()));
115-
var failure = remoteCluster.getFailures().get(0);
152+
var remoteCluster1 = executionInfo.getCluster(REMOTE_CLUSTER_1);
153+
assertThat(remoteCluster1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
154+
assertThat(remoteCluster1.getFailures(), not(empty()));
155+
var failure = remoteCluster1.getFailures().get(0);
116156
assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
157+
var remoteCluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2);
158+
assertThat(remoteCluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
117159
}
118-
119-
setSkipUnavailable(REMOTE_CLUSTER_1, false);
120-
// then missing index is an error
121-
var ex = expectThrows(
122-
VerificationException.class,
123-
() -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
124-
);
125-
assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
126160
}
127161

128162
public void testLookupJoinMissingLocalIndex() throws IOException {
@@ -134,6 +168,26 @@ public void testLookupJoinMissingLocalIndex() throws IOException {
134168
() -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
135169
);
136170
assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in local cluster"));
171+
172+
// Without local in the query it's ok
173+
try (
174+
EsqlQueryResponse resp = runQuery(
175+
"FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
176+
randomBoolean()
177+
)
178+
) {
179+
List<List<Object>> values = getValuesList(resp);
180+
assertThat(values, hasSize(10));
181+
var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList();
182+
assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag", "remote_tag"));
183+
assertThat(columns, not(hasItems("local_tag")));
184+
185+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
186+
assertThat(executionInfo.getClusters().size(), equalTo(1));
187+
188+
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
189+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
190+
}
137191
}
138192

139193
public void testLookupJoinMissingKey() throws IOException {
@@ -143,6 +197,7 @@ public void testLookupJoinMissingKey() throws IOException {
143197

144198
setSkipUnavailable(REMOTE_CLUSTER_1, true);
145199
try (
200+
// Using local_tag as key which is not present in remote index
146201
EsqlQueryResponse resp = runQuery(
147202
"FROM logs-*,c*:logs-* | EVAL local_tag = to_string(v) | LOOKUP JOIN values_lookup ON local_tag",
148203
randomBoolean()
@@ -153,12 +208,11 @@ public void testLookupJoinMissingKey() throws IOException {
153208
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
154209
assertThat(executionInfo.getClusters().size(), equalTo(2));
155210

211+
var localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
212+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
156213
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
157214
// FIXME: verify whether we need to skip or succeed here
158-
// assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
159-
// assertThat(remoteCluster.getFailures(), not(empty()));
160-
// var failure = remoteCluster.getFailures().get(0);
161-
// assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
215+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
162216
}
163217

164218
// TODO: verify whether this should be an error or not when the key field is missing
@@ -175,6 +229,24 @@ public void testLookupJoinMissingKey() throws IOException {
175229
assertThat(ex.getMessage(), containsString("Unknown column [local_tag] in right side of join"));
176230

177231
setSkipUnavailable(REMOTE_CLUSTER_1, false);
232+
try (
233+
// Using local_tag as key which is not present in remote index
234+
EsqlQueryResponse resp = runQuery(
235+
"FROM logs-*,c*:logs-* | EVAL local_tag = to_string(v) | LOOKUP JOIN values_lookup ON local_tag",
236+
randomBoolean()
237+
)
238+
) {
239+
List<List<Object>> values = getValuesList(resp);
240+
assertThat(values, hasSize(20));
241+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
242+
assertThat(executionInfo.getClusters().size(), equalTo(2));
243+
244+
var localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
245+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
246+
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
247+
// FIXME: verify whether we need to succeed or fail here
248+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
249+
}
178250
}
179251

180252
public void testLookupJoinIndexMode() throws IOException {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -95,32 +95,32 @@ public void postAnalysisVerification(Failures failures) {
9595
checkRemoteJoin(this, failures);
9696
}
9797
// TODO: this is probably not necessary anymore as we check it in analysis stage?
98-
right().forEachDown(EsRelation.class, esr -> {
99-
var indexNameWithModes = esr.indexNameWithModes();
100-
if (indexNameWithModes.size() != 1) {
101-
failures.add(
102-
fail(
103-
esr,
104-
"Lookup Join requires a single lookup mode index; [{}] resolves to [{}] indices",
105-
esr.indexPattern(),
106-
indexNameWithModes.size()
107-
)
108-
);
109-
return;
110-
}
111-
var indexAndMode = indexNameWithModes.entrySet().iterator().next();
112-
if (indexAndMode.getValue() != IndexMode.LOOKUP) {
113-
failures.add(
114-
fail(
115-
esr,
116-
"Lookup Join requires a single lookup mode index; [{}] resolves to [{}] in [{}] mode",
117-
esr.indexPattern(),
118-
indexAndMode.getKey(),
119-
indexAndMode.getValue()
120-
)
121-
);
122-
}
123-
});
98+
// right().forEachDown(EsRelation.class, esr -> {
99+
// var indexNameWithModes = esr.indexNameWithModes();
100+
// if (indexNameWithModes.size() != 1) {
101+
// failures.add(
102+
// fail(
103+
// esr,
104+
// "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] indices",
105+
// esr.indexPattern(),
106+
// indexNameWithModes.size()
107+
// )
108+
// );
109+
// return;
110+
// }
111+
// var indexAndMode = indexNameWithModes.entrySet().iterator().next();
112+
// if (indexAndMode.getValue() != IndexMode.LOOKUP) {
113+
// failures.add(
114+
// fail(
115+
// esr,
116+
// "Lookup Join requires a single lookup mode index; [{}] resolves to [{}] in [{}] mode",
117+
// esr.indexPattern(),
118+
// indexAndMode.getKey(),
119+
// indexAndMode.getValue()
120+
// )
121+
// );
122+
// }
123+
// });
124124
}
125125

126126
private static void checkRemoteJoin(LogicalPlan plan, Failures failures) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.elasticsearch.logging.Logger;
6161
import org.elasticsearch.node.Node;
6262
import org.elasticsearch.tasks.CancellableTask;
63+
import org.elasticsearch.transport.RemoteClusterAware;
6364
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
6465
import org.elasticsearch.xpack.esql.action.ColumnInfoImpl;
6566
import org.elasticsearch.xpack.esql.core.expression.Alias;
@@ -122,6 +123,7 @@
122123
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
123124
import org.elasticsearch.xpack.esql.score.ScoreMapper;
124125
import org.elasticsearch.xpack.esql.session.Configuration;
126+
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
125127

126128
import java.util.ArrayList;
127129
import java.util.List;
@@ -720,15 +722,24 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
720722
if (localSourceExec.indexMode() != IndexMode.LOOKUP) {
721723
throw new IllegalArgumentException("can't plan [" + join + "]");
722724
}
723-
Map<String, IndexMode> indicesWithModes = localSourceExec.indexNameWithModes();
724-
if (indicesWithModes.size() != 1) {
725-
throw new IllegalArgumentException("can't plan [" + join + "], found more than 1 index");
725+
726+
var maybeEntry = localSourceExec.indexNameWithModes()
727+
.entrySet()
728+
.stream()
729+
.filter(e -> RemoteClusterAware.parseClusterAlias(e.getKey()).equals(clusterAlias))
730+
.findFirst();
731+
732+
if (maybeEntry.isEmpty()) {
733+
throw new IllegalArgumentException(
734+
"can't plan [" + join + "]: no matching index found " + EsqlCCSUtils.inClusterName(clusterAlias)
735+
);
726736
}
727-
var entry = indicesWithModes.entrySet().iterator().next();
737+
var entry = maybeEntry.get();
738+
728739
if (entry.getValue() != IndexMode.LOOKUP) {
729740
throw new IllegalArgumentException("can't plan [" + join + "], found index with mode [" + entry.getValue() + "]");
730741
}
731-
String indexName = entry.getKey();
742+
String indexName = RemoteClusterAware.splitIndexName(entry.getKey())[1];
732743
if (join.leftFields().size() != join.rightFields().size()) {
733744
throw new IllegalArgumentException("can't plan [" + join + "]: mismatching left and right field count");
734745
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ private void preAnalyzeLookupIndex(
409409
} else {
410410
patternWithRemotes = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING)
411411
.map(c -> RemoteClusterAware.buildRemoteIndexName(c.getClusterAlias(), localPattern))
412-
.collect(Collectors.joining(", "));
412+
.collect(Collectors.joining(","));
413413
}
414414
if (patternWithRemotes.isEmpty()) {
415415
return;
@@ -483,23 +483,8 @@ private PreAnalysisResult receiveLookupIndexResolution(
483483
}
484484
});
485485

486-
if (clustersWithResolvedIndices.size() > 1) {
487-
// If we have multiple resolutions for the lookup index, we need to only leave the local resolution
488-
String localIndexName = clustersWithResolvedIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
489-
if (localIndexName == null) {
490-
// Get the first index name instead
491-
localIndexName = RemoteClusterAware.splitIndexName(clustersWithResolvedIndices.values().iterator().next())[1];
492-
}
493-
var localIndex = new EsIndex(index, newIndexResolution.get().mapping(), Map.of(localIndexName, IndexMode.LOOKUP));
494-
newIndexResolution = IndexResolution.valid(
495-
localIndex,
496-
localIndex.concreteIndices(),
497-
newIndexResolution.getUnavailableShards(),
498-
newIndexResolution.unavailableClusters()
499-
);
500-
}
501-
502486
return result.addLookupIndexResolution(index, newIndexResolution);
487+
503488
}
504489

505490
private void initializeClusterData(List<IndexPattern> indices, EsqlExecutionInfo executionInfo) {

0 commit comments

Comments
 (0)