Skip to content

Commit 352e148

Browse files
dnhatncbuescher
authored andcommitted
Avoid wrapping rejection exception in exchange (elastic#112178)
We should avoid wrapping EsRejectedExecutionException in an ElasticsearchException as it would change the status code from 429 to 500. Ideally, we should avoid wrapping exceptions altogether, but that would require bigger changes. Closes elastic#112106
1 parent 8e6394c commit 352e148

File tree

5 files changed

+93
-2
lines changed

5 files changed

+93
-2
lines changed

docs/changelog/112178.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 112178
2+
summary: Avoid wrapping rejection exception in exchange
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 112106

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ private void checkFailure() {
146146
Exception e = failureCollector.getFailure();
147147
if (e != null) {
148148
discardPages();
149-
throw ExceptionsHelper.convertToElastic(e);
149+
throw ExceptionsHelper.convertToRuntime(e);
150150
}
151151
}
152152

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private class ExchangeSourceImpl implements ExchangeSource {
5454
private void checkFailure() {
5555
Exception e = failure.getFailure();
5656
if (e != null) {
57-
throw ExceptionsHelper.convertToElastic(e);
57+
throw ExceptionsHelper.convertToRuntime(e);
5858
}
5959
}
6060

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10+
import org.elasticsearch.ExceptionsHelper;
1011
import org.elasticsearch.action.ActionType;
1112
import org.elasticsearch.action.support.ActionFilters;
1213
import org.elasticsearch.action.support.TransportAction;
@@ -16,6 +17,7 @@
1617
import org.elasticsearch.common.settings.Setting;
1718
import org.elasticsearch.common.settings.Settings;
1819
import org.elasticsearch.common.unit.ByteSizeValue;
20+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1921
import org.elasticsearch.compute.data.BlockFactory;
2022
import org.elasticsearch.compute.operator.DriverProfile;
2123
import org.elasticsearch.compute.operator.DriverStatus;
@@ -30,6 +32,9 @@
3032
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
3133
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
3234
import org.elasticsearch.reindex.ReindexPlugin;
35+
import org.elasticsearch.rest.RestStatus;
36+
import org.elasticsearch.test.transport.MockTransportService;
37+
import org.elasticsearch.transport.RemoteTransportException;
3338
import org.elasticsearch.transport.TransportService;
3439
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
3540
import org.elasticsearch.xpack.core.XPackSettings;
@@ -43,6 +48,7 @@
4348
import org.elasticsearch.xpack.enrich.EnrichPlugin;
4449
import org.elasticsearch.xpack.esql.EsqlTestUtils;
4550
import org.elasticsearch.xpack.esql.core.type.DataType;
51+
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
4652
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
4753
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
4854
import org.junit.After;
@@ -82,6 +88,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
8288
plugins.add(IngestCommonPlugin.class);
8389
plugins.add(ReindexPlugin.class);
8490
plugins.add(InternalTransportSettingPlugin.class);
91+
plugins.add(MockTransportService.TestPlugin.class);
8592
return plugins;
8693
}
8794

@@ -420,6 +427,24 @@ public void testManyDocuments() {
420427
}
421428
}
422429

430+
public void testRejection() {
431+
for (var ts : internalCluster().getInstances(TransportService.class)) {
432+
((MockTransportService) ts).addRequestHandlingBehavior(EnrichLookupService.LOOKUP_ACTION_NAME, (h, r, channel, t) -> {
433+
EsRejectedExecutionException ex = new EsRejectedExecutionException("test", false);
434+
channel.sendResponse(new RemoteTransportException("test", ex));
435+
});
436+
}
437+
try {
438+
String query = "FROM listen* | " + enrichSongCommand();
439+
Exception error = expectThrows(Exception.class, () -> run(query).close());
440+
assertThat(ExceptionsHelper.status(error), equalTo(RestStatus.TOO_MANY_REQUESTS));
441+
} finally {
442+
for (var ts : internalCluster().getInstances(TransportService.class)) {
443+
((MockTransportService) ts).clearAllRules();
444+
}
445+
}
446+
}
447+
423448
public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin {
424449

425450
public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,24 @@
88
package org.elasticsearch.xpack.esql.action;
99

1010
import org.apache.lucene.tests.util.LuceneTestCase;
11+
import org.elasticsearch.ExceptionsHelper;
12+
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.action.bulk.BulkRequestBuilder;
1214
import org.elasticsearch.action.index.IndexRequest;
1315
import org.elasticsearch.action.support.WriteRequest;
1416
import org.elasticsearch.cluster.metadata.IndexMetadata;
1517
import org.elasticsearch.common.settings.Settings;
18+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
19+
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1620
import org.elasticsearch.plugins.Plugin;
21+
import org.elasticsearch.rest.RestStatus;
1722
import org.elasticsearch.search.MockSearchService;
1823
import org.elasticsearch.search.SearchService;
24+
import org.elasticsearch.test.transport.MockTransportService;
25+
import org.elasticsearch.transport.RemoteTransportException;
26+
import org.elasticsearch.transport.TransportChannel;
27+
import org.elasticsearch.transport.TransportResponse;
28+
import org.elasticsearch.transport.TransportService;
1929
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
2030
import org.hamcrest.Matchers;
2131
import org.junit.Before;
@@ -27,6 +37,10 @@
2737
import java.util.concurrent.CountDownLatch;
2838
import java.util.concurrent.TimeUnit;
2939
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.concurrent.atomic.AtomicReference;
41+
42+
import static org.hamcrest.Matchers.equalTo;
43+
import static org.hamcrest.Matchers.instanceOf;
3044

3145
/**
3246
* Make sures that we can run many concurrent requests with large number of shards with any data_partitioning.
@@ -38,6 +52,7 @@ public class ManyShardsIT extends AbstractEsqlIntegTestCase {
3852
protected Collection<Class<? extends Plugin>> getMockPlugins() {
3953
var plugins = new ArrayList<>(super.getMockPlugins());
4054
plugins.add(MockSearchService.TestPlugin.class);
55+
plugins.add(MockTransportService.TestPlugin.class);
4156
return plugins;
4257
}
4358

@@ -97,6 +112,51 @@ public void testConcurrentQueries() throws Exception {
97112
}
98113
}
99114

115+
public void testRejection() throws Exception {
116+
String[] nodes = internalCluster().getNodeNames();
117+
for (String node : nodes) {
118+
MockTransportService ts = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
119+
ts.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
120+
handler.messageReceived(request, new TransportChannel() {
121+
@Override
122+
public String getProfileName() {
123+
return channel.getProfileName();
124+
}
125+
126+
@Override
127+
public void sendResponse(TransportResponse response) {
128+
channel.sendResponse(new RemoteTransportException("simulated", new EsRejectedExecutionException("test queue")));
129+
}
130+
131+
@Override
132+
public void sendResponse(Exception exception) {
133+
channel.sendResponse(exception);
134+
}
135+
}, task);
136+
});
137+
}
138+
try {
139+
AtomicReference<Exception> failure = new AtomicReference<>();
140+
EsqlQueryRequest request = new EsqlQueryRequest();
141+
request.query("from test-* | stats count(user) by tags");
142+
request.acceptedPragmaRisks(true);
143+
request.pragmas(randomPragmas());
144+
CountDownLatch queryLatch = new CountDownLatch(1);
145+
client().execute(EsqlQueryAction.INSTANCE, request, ActionListener.runAfter(ActionListener.wrap(r -> {
146+
r.close();
147+
throw new AssertionError("expected failure");
148+
}, failure::set), queryLatch::countDown));
149+
assertTrue(queryLatch.await(10, TimeUnit.SECONDS));
150+
assertThat(failure.get(), instanceOf(EsRejectedExecutionException.class));
151+
assertThat(ExceptionsHelper.status(failure.get()), equalTo(RestStatus.TOO_MANY_REQUESTS));
152+
assertThat(failure.get().getMessage(), equalTo("test queue"));
153+
} finally {
154+
for (String node : nodes) {
155+
((MockTransportService) internalCluster().getInstance(TransportService.class, node)).clearAllRules();
156+
}
157+
}
158+
}
159+
100160
static class SearchContextCounter {
101161
private final int maxAllowed;
102162
private final AtomicInteger current = new AtomicInteger();

0 commit comments

Comments
 (0)