Skip to content

Commit f863c4a

Browse files
committed
Add test demonstrating failures
1 parent 876c456 commit f863c4a

File tree

1 file changed

+135
-0
lines changed

1 file changed

+135
-0
lines changed
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.ResourceNotFoundException;
11+
import org.elasticsearch.common.breaker.CircuitBreaker;
12+
import org.elasticsearch.common.breaker.CircuitBreakingException;
13+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
14+
import org.elasticsearch.test.transport.MockTransportService;
15+
import org.elasticsearch.transport.TransportChannel;
16+
import org.elasticsearch.transport.TransportResponse;
17+
import org.elasticsearch.transport.TransportService;
18+
19+
import java.io.IOException;
20+
21+
import static org.hamcrest.Matchers.containsString;
22+
import static org.hamcrest.Matchers.empty;
23+
import static org.hamcrest.Matchers.equalTo;
24+
import static org.hamcrest.Matchers.hasSize;
25+
import static org.hamcrest.Matchers.not;
26+
27+
public class CrossClusterQueryFailsIT extends AbstractCrossClusterTestCase {
28+
29+
private static Exception randomFailure() {
30+
return randomFrom(
31+
new IllegalStateException("driver was closed already"),
32+
new CircuitBreakingException("low memory", CircuitBreaker.Durability.PERMANENT),
33+
new IOException("broken disk"),
34+
new ResourceNotFoundException("index not found"),
35+
new EsRejectedExecutionException("node is shutting down")
36+
);
37+
}
38+
39+
public void testErrorDuringIndexLookupLocalRemote() throws Exception {
40+
setupClusters(2);
41+
Exception simulatedFailure = randomFailure();
42+
setSkipUnavailable(REMOTE_CLUSTER_1, true);
43+
// This will generate a random failure on resolution of remote index
44+
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
45+
MockTransportService ts = asInstanceOf(MockTransportService.class, transportService);
46+
ts.addRequestHandlingBehavior(
47+
EsqlResolveFieldsAction.RESOLVE_REMOTE_TYPE.name(),
48+
(handler, request, channel, task) -> handler.messageReceived(request, new TransportChannel() {
49+
@Override
50+
public String getProfileName() {
51+
return channel.getProfileName();
52+
}
53+
54+
@Override
55+
public void sendResponse(TransportResponse response) {
56+
sendResponse(simulatedFailure);
57+
}
58+
59+
@Override
60+
public void sendResponse(Exception exception) {
61+
channel.sendResponse(exception);
62+
}
63+
}, task)
64+
);
65+
}
66+
try {
67+
try (EsqlQueryResponse resp = runQuery("FROM logs-*,c*:logs-* | LIMIT 1", randomBoolean())) {
68+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
69+
var localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
70+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
71+
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
72+
// FIXME: currently this fails, the remote error gets hidden and is not reported in the response
73+
// field caps response does contain FieldCapabilitiesFailure but it is ignored by mergedMappings
74+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
75+
assertThat(remoteCluster.getFailures(), not(empty()));
76+
var failure = remoteCluster.getFailures().get(0);
77+
assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
78+
}
79+
} finally {
80+
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
81+
MockTransportService ts = asInstanceOf(MockTransportService.class, transportService);
82+
ts.clearAllRules();
83+
}
84+
85+
}
86+
}
87+
88+
public void testErrorDuringIndexLookupRemoteOnly() throws Exception {
89+
setupClusters(2);
90+
Exception simulatedFailure = randomFailure();
91+
setSkipUnavailable(REMOTE_CLUSTER_1, true);
92+
// This will generate a random failure on resolution of remote index
93+
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
94+
MockTransportService ts = asInstanceOf(MockTransportService.class, transportService);
95+
ts.addRequestHandlingBehavior(
96+
EsqlResolveFieldsAction.RESOLVE_REMOTE_TYPE.name(),
97+
(handler, request, channel, task) -> handler.messageReceived(request, new TransportChannel() {
98+
@Override
99+
public String getProfileName() {
100+
return channel.getProfileName();
101+
}
102+
103+
@Override
104+
public void sendResponse(TransportResponse response) {
105+
sendResponse(simulatedFailure);
106+
}
107+
108+
@Override
109+
public void sendResponse(Exception exception) {
110+
channel.sendResponse(exception);
111+
}
112+
}, task)
113+
);
114+
}
115+
try {
116+
// FIXME: this throws exception even though skip_unavailable is set to true on the remote cluster
117+
// It should have caught it and returned an empty response with a failure in metadata
118+
try (EsqlQueryResponse resp = runQuery("FROM c*:logs-* | LIMIT 1", randomBoolean())) {
119+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
120+
assertThat(executionInfo.getClusters().keySet(), hasSize(1));
121+
var remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
122+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
123+
assertThat(remoteCluster.getFailures(), not(empty()));
124+
var failure = remoteCluster.getFailures().get(0);
125+
assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
126+
}
127+
} finally {
128+
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
129+
MockTransportService ts = asInstanceOf(MockTransportService.class, transportService);
130+
ts.clearAllRules();
131+
}
132+
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)