Skip to content

Commit 5ffc7d2

Browse files
committed
failure tests
1 parent 18c03c5 commit 5ffc7d2

File tree

4 files changed

+281
-71
lines changed

4 files changed

+281
-71
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10+
import org.elasticsearch.ResourceNotFoundException;
1011
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1112
import org.elasticsearch.action.index.IndexRequest;
1213
import org.elasticsearch.action.support.WriteRequest;
1314
import org.elasticsearch.client.internal.Client;
1415
import org.elasticsearch.common.Strings;
16+
import org.elasticsearch.common.breaker.CircuitBreaker;
17+
import org.elasticsearch.common.breaker.CircuitBreakingException;
1518
import org.elasticsearch.common.settings.Setting;
1619
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1721
import org.elasticsearch.compute.operator.DriverTaskRunner;
1822
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1923
import org.elasticsearch.core.TimeValue;
@@ -270,6 +274,36 @@ protected void populateRemoteIndices(String clusterAlias, String indexName, int
270274
remoteClient.admin().indices().prepareRefresh(indexName).get();
271275
}
272276

277+
protected void populateLookupIndex(String clusterAlias, String indexName, int numDocs, String keyType) {
278+
Client client = client(clusterAlias);
279+
String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
280+
String field_tag = Strings.isEmpty(clusterAlias) ? "local_tag" : "remote_tag";
281+
assertAcked(
282+
client.admin()
283+
.indices()
284+
.prepareCreate(indexName)
285+
.setSettings(Settings.builder().put("index.mode", "lookup"))
286+
.setMapping(
287+
"lookup_key",
288+
"type=" + keyType,
289+
"lookup_name",
290+
"type=keyword",
291+
"lookup_tag",
292+
"type=keyword",
293+
field_tag,
294+
"type=keyword"
295+
)
296+
);
297+
for (int i = 0; i < numDocs; i++) {
298+
client.prepareIndex(indexName).setSource("lookup_key", i, "lookup_name", "lookup_" + i, "lookup_tag", tag, field_tag, i).get();
299+
}
300+
client.admin().indices().prepareRefresh(indexName).get();
301+
}
302+
303+
protected void populateLookupIndex(String clusterAlias, String indexName, int numDocs) {
304+
populateLookupIndex(clusterAlias, indexName, numDocs, "long");
305+
}
306+
273307
protected void setSkipUnavailable(String clusterAlias, boolean skip) {
274308
client(LOCAL_CLUSTER).admin()
275309
.cluster()
@@ -314,4 +348,14 @@ protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse
314348
static List<TaskInfo> getDriverTasks(Client client) {
315349
return client.admin().cluster().prepareListTasks().setActions(DriverTaskRunner.ACTION_NAME).setDetailed(true).get().getTasks();
316350
}
351+
352+
protected static Exception randomFailure() {
353+
return randomFrom(
354+
new IllegalStateException("driver was closed already"),
355+
new CircuitBreakingException("low memory", CircuitBreaker.Durability.PERMANENT),
356+
new IOException("broken disk"),
357+
new ResourceNotFoundException("exchange sink was not found"),
358+
new EsRejectedExecutionException("node is shutting down")
359+
);
360+
}
317361
}
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.ExceptionsHelper;
12+
import org.elasticsearch.test.junit.annotations.TestLogging;
13+
import org.elasticsearch.test.transport.MockTransportService;
14+
import org.elasticsearch.transport.TransportChannel;
15+
import org.elasticsearch.transport.TransportResponse;
16+
import org.elasticsearch.transport.TransportService;
17+
import org.elasticsearch.xpack.esql.VerificationException;
18+
19+
import java.io.IOException;
20+
import java.util.List;
21+
22+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
23+
import static org.hamcrest.Matchers.containsString;
24+
import static org.hamcrest.Matchers.empty;
25+
import static org.hamcrest.Matchers.equalTo;
26+
import static org.hamcrest.Matchers.hasItems;
27+
import static org.hamcrest.Matchers.hasSize;
28+
import static org.hamcrest.Matchers.not;
29+
30+
@TestLogging(value = "org.elasticsearch.xpack.esql.session:DEBUG", reason = "to better understand planning")
31+
public class CrossClusterLookupJoinFailuresIT extends AbstractCrossClusterTestCase {
32+
protected boolean reuseClusters() {
33+
return false;
34+
}
35+
36+
public void testLookupFail() throws IOException {
37+
setupClusters(3);
38+
populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10);
39+
populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 25);
40+
populateLookupIndex(REMOTE_CLUSTER_2, "values_lookup", 25);
41+
42+
setSkipUnavailable(REMOTE_CLUSTER_1, true);
43+
Exception simulatedFailure = randomFailure();
44+
// fail when trying to do the lookup on remote cluster
45+
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
46+
MockTransportService ts = asInstanceOf(MockTransportService.class, transportService);
47+
ts.addRequestHandlingBehavior(
48+
EsqlResolveFieldsAction.RESOLVE_REMOTE_TYPE.name(),
49+
(handler, request, channel, task) -> handler.messageReceived(request, new TransportChannel() {
50+
@Override
51+
public String getProfileName() {
52+
return channel.getProfileName();
53+
}
54+
55+
@Override
56+
public void sendResponse(TransportResponse response) {
57+
sendResponse(simulatedFailure);
58+
}
59+
60+
@Override
61+
public void sendResponse(Exception exception) {
62+
channel.sendResponse(exception);
63+
}
64+
}, task)
65+
);
66+
}
67+
68+
try {
69+
try (
70+
EsqlQueryResponse resp = runQuery(
71+
"FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
72+
randomBoolean()
73+
)
74+
) {
75+
var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList();
76+
assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag"));
77+
78+
List<List<Object>> values = getValuesList(resp);
79+
assertThat(values, hasSize(10));
80+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
81+
82+
var localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
83+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
84+
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
85+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
86+
assertThat(remoteCluster.getFailures(), not(empty()));
87+
var failure = remoteCluster.getFailures().get(0);
88+
// FIXME: this produces a wrong message currently
89+
// assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
90+
assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
91+
}
92+
93+
try (
94+
EsqlQueryResponse resp = runQuery(
95+
"FROM logs-*,*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
96+
randomBoolean()
97+
)
98+
) {
99+
var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList();
100+
assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag"));
101+
102+
List<List<Object>> values = getValuesList(resp);
103+
assertThat(values, hasSize(20));
104+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
105+
106+
var localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
107+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
108+
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
109+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
110+
var remoteCluster2 = executionInfo.getCluster(REMOTE_CLUSTER_2);
111+
assertThat(remoteCluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
112+
assertThat(remoteCluster.getFailures(), not(empty()));
113+
var failure = remoteCluster.getFailures().get(0);
114+
// FIXME: this produces a wrong message currently
115+
// assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
116+
assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
117+
}
118+
119+
// FIXME: this should work but fails
120+
/*
121+
try (
122+
EsqlQueryResponse resp = runQuery(
123+
"FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
124+
randomBoolean()
125+
)
126+
) {
127+
var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList();
128+
assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag"));
129+
130+
List<List<Object>> values = getValuesList(resp);
131+
assertThat(values, hasSize(0));
132+
133+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
134+
135+
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
136+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
137+
assertThat(remoteCluster.getFailures(), not(empty()));
138+
var failure = remoteCluster.getFailures().get(0);
139+
assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
140+
} */
141+
142+
// now fail
143+
setSkipUnavailable(REMOTE_CLUSTER_1, false);
144+
Exception ex = expectThrows(
145+
VerificationException.class,
146+
() -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
147+
);
148+
assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
149+
150+
ex = expectThrows(
151+
Exception.class,
152+
() -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
153+
);
154+
assertThat(ex.getMessage(), containsString(simulatedFailure.getMessage()));
155+
} finally {
156+
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
157+
MockTransportService ts = asInstanceOf(MockTransportService.class, transportService);
158+
ts.clearAllRules();
159+
}
160+
}
161+
}
162+
163+
public void testLookupDisconnect() throws IOException {
164+
setupClusters(2);
165+
populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10);
166+
populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 10);
167+
168+
setSkipUnavailable(REMOTE_CLUSTER_1, true);
169+
try {
170+
// close the remote cluster so that it is unavailable
171+
cluster(REMOTE_CLUSTER_1).close();
172+
173+
try (
174+
EsqlQueryResponse resp = runQuery(
175+
"FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
176+
randomBoolean()
177+
)
178+
) {
179+
var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList();
180+
// this is a bit weird but that's how it works
181+
assertThat(columns, hasSize(1));
182+
assertThat(columns, hasItems("<no-fields>"));
183+
List<List<Object>> values = getValuesList(resp);
184+
assertThat(values, hasSize(0));
185+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
186+
187+
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
188+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
189+
assertThat(remoteCluster.getFailures(), not(empty()));
190+
var failure = remoteCluster.getFailures().get(0);
191+
assertThat(failure.reason(), containsString("unable to connect to remote cluster"));
192+
}
193+
194+
try (
195+
EsqlQueryResponse resp = runQuery(
196+
"FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
197+
randomBoolean()
198+
)
199+
) {
200+
var columns = resp.columns().stream().map(ColumnInfoImpl::name).toList();
201+
assertThat(columns, hasItems("lookup_key", "lookup_name", "lookup_tag", "v", "tag"));
202+
203+
List<List<Object>> values = getValuesList(resp);
204+
assertThat(values, hasSize(10));
205+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
206+
207+
var localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
208+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
209+
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
210+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
211+
assertThat(remoteCluster.getFailures(), not(empty()));
212+
var failure = remoteCluster.getFailures().get(0);
213+
assertThat(
214+
failure.reason(),
215+
containsString("Remote cluster [cluster-a] (with setting skip_unavailable=true) is not available")
216+
);
217+
}
218+
219+
setSkipUnavailable(REMOTE_CLUSTER_1, false);
220+
Exception ex = expectThrows(
221+
ElasticsearchException.class,
222+
() -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
223+
);
224+
assertTrue(ExceptionsHelper.isRemoteUnavailableException(ex));
225+
226+
ex = expectThrows(
227+
ElasticsearchException.class,
228+
() -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
229+
);
230+
assertTrue(ExceptionsHelper.isRemoteUnavailableException(ex));
231+
} finally {
232+
clearSkipUnavailable(2);
233+
}
234+
}
235+
236+
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10-
import org.elasticsearch.client.internal.Client;
11-
import org.elasticsearch.common.Strings;
12-
import org.elasticsearch.common.settings.Settings;
1310
import org.elasticsearch.test.junit.annotations.TestLogging;
1411
import org.elasticsearch.xpack.esql.VerificationException;
1512
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -18,7 +15,6 @@
1815
import java.util.List;
1916
import java.util.Map;
2017

21-
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
2218
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
2319
import static org.hamcrest.Matchers.containsString;
2420
import static org.hamcrest.Matchers.empty;
@@ -322,7 +318,7 @@ public void testLookupJoinIndexMode() throws IOException {
322318
public void testLookupJoinFieldTypes() throws IOException {
323319
setupClusters(2);
324320
populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10);
325-
populateLookupIndexKeyword(REMOTE_CLUSTER_1, "values_lookup", 10);
321+
populateLookupIndex(REMOTE_CLUSTER_1, "values_lookup", 10, "keyword");
326322

327323
setSkipUnavailable(REMOTE_CLUSTER_1, true);
328324
var ex = expectThrows(
@@ -360,58 +356,6 @@ protected Map<String, Object> setupClustersAndLookups() throws IOException {
360356
return setupData;
361357
}
362358

363-
protected void populateLookupIndex(String clusterAlias, String indexName, int numDocs) {
364-
Client client = client(clusterAlias);
365-
String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
366-
String field_tag = Strings.isEmpty(clusterAlias) ? "local_tag" : "remote_tag";
367-
assertAcked(
368-
client.admin()
369-
.indices()
370-
.prepareCreate(indexName)
371-
.setSettings(Settings.builder().put("index.mode", "lookup"))
372-
.setMapping(
373-
"lookup_key",
374-
"type=long",
375-
"lookup_name",
376-
"type=keyword",
377-
"lookup_tag",
378-
"type=keyword",
379-
field_tag,
380-
"type=keyword"
381-
)
382-
);
383-
for (int i = 0; i < numDocs; i++) {
384-
client.prepareIndex(indexName).setSource("lookup_key", i, "lookup_name", "lookup_" + i, "lookup_tag", tag, field_tag, i).get();
385-
}
386-
client.admin().indices().prepareRefresh(indexName).get();
387-
}
388-
389-
protected void populateLookupIndexKeyword(String clusterAlias, String indexName, int numDocs) {
390-
Client client = client(clusterAlias);
391-
String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias;
392-
String field_tag = Strings.isEmpty(clusterAlias) ? "local_tag" : "remote_tag";
393-
assertAcked(
394-
client.admin()
395-
.indices()
396-
.prepareCreate(indexName)
397-
.setSettings(Settings.builder().put("index.mode", "lookup"))
398-
.setMapping(
399-
"lookup_key",
400-
"type=keyword",
401-
"lookup_name",
402-
"type=keyword",
403-
"lookup_tag",
404-
"type=keyword",
405-
field_tag,
406-
"type=keyword"
407-
)
408-
);
409-
for (int i = 0; i < numDocs; i++) {
410-
client.prepareIndex(indexName).setSource("lookup_key", i, "lookup_name", "lookup_" + i, "lookup_tag", tag, field_tag, i).get();
411-
}
412-
client.admin().indices().prepareRefresh(indexName).get();
413-
}
414-
415359
private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) {
416360
assertNotNull(executionInfo);
417361
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));

0 commit comments

Comments
 (0)