8
8
package org .elasticsearch .xpack .esql .action ;
9
9
10
10
import org .apache .lucene .tests .util .LuceneTestCase ;
11
+ import org .elasticsearch .ExceptionsHelper ;
12
+ import org .elasticsearch .action .ActionListener ;
11
13
import org .elasticsearch .action .bulk .BulkRequestBuilder ;
12
14
import org .elasticsearch .action .index .IndexRequest ;
13
15
import org .elasticsearch .action .support .WriteRequest ;
14
16
import org .elasticsearch .cluster .metadata .IndexMetadata ;
15
17
import org .elasticsearch .common .settings .Settings ;
18
+ import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
19
+ import org .elasticsearch .compute .operator .exchange .ExchangeService ;
16
20
import org .elasticsearch .plugins .Plugin ;
21
+ import org .elasticsearch .rest .RestStatus ;
17
22
import org .elasticsearch .search .MockSearchService ;
18
23
import org .elasticsearch .search .SearchService ;
24
+ import org .elasticsearch .test .transport .MockTransportService ;
25
+ import org .elasticsearch .transport .RemoteTransportException ;
26
+ import org .elasticsearch .transport .TransportChannel ;
27
+ import org .elasticsearch .transport .TransportResponse ;
28
+ import org .elasticsearch .transport .TransportService ;
19
29
import org .elasticsearch .xpack .esql .plugin .QueryPragmas ;
20
30
import org .hamcrest .Matchers ;
21
31
import org .junit .Before ;
27
37
import java .util .concurrent .CountDownLatch ;
28
38
import java .util .concurrent .TimeUnit ;
29
39
import java .util .concurrent .atomic .AtomicInteger ;
40
+ import java .util .concurrent .atomic .AtomicReference ;
41
+
42
+ import static org .hamcrest .Matchers .equalTo ;
43
+ import static org .hamcrest .Matchers .instanceOf ;
30
44
31
45
/**
32
46
* Make sures that we can run many concurrent requests with large number of shards with any data_partitioning.
@@ -38,6 +52,7 @@ public class ManyShardsIT extends AbstractEsqlIntegTestCase {
38
52
protected Collection <Class <? extends Plugin >> getMockPlugins () {
39
53
var plugins = new ArrayList <>(super .getMockPlugins ());
40
54
plugins .add (MockSearchService .TestPlugin .class );
55
+ plugins .add (MockTransportService .TestPlugin .class );
41
56
return plugins ;
42
57
}
43
58
@@ -97,6 +112,51 @@ public void testConcurrentQueries() throws Exception {
97
112
}
98
113
}
99
114
115
+ public void testRejection () throws Exception {
116
+ String [] nodes = internalCluster ().getNodeNames ();
117
+ for (String node : nodes ) {
118
+ MockTransportService ts = (MockTransportService ) internalCluster ().getInstance (TransportService .class , node );
119
+ ts .addRequestHandlingBehavior (ExchangeService .EXCHANGE_ACTION_NAME , (handler , request , channel , task ) -> {
120
+ handler .messageReceived (request , new TransportChannel () {
121
+ @ Override
122
+ public String getProfileName () {
123
+ return channel .getProfileName ();
124
+ }
125
+
126
+ @ Override
127
+ public void sendResponse (TransportResponse response ) {
128
+ channel .sendResponse (new RemoteTransportException ("simulated" , new EsRejectedExecutionException ("test queue" )));
129
+ }
130
+
131
+ @ Override
132
+ public void sendResponse (Exception exception ) {
133
+ channel .sendResponse (exception );
134
+ }
135
+ }, task );
136
+ });
137
+ }
138
+ try {
139
+ AtomicReference <Exception > failure = new AtomicReference <>();
140
+ EsqlQueryRequest request = new EsqlQueryRequest ();
141
+ request .query ("from test-* | stats count(user) by tags" );
142
+ request .acceptedPragmaRisks (true );
143
+ request .pragmas (randomPragmas ());
144
+ CountDownLatch queryLatch = new CountDownLatch (1 );
145
+ client ().execute (EsqlQueryAction .INSTANCE , request , ActionListener .runAfter (ActionListener .wrap (r -> {
146
+ r .close ();
147
+ throw new AssertionError ("expected failure" );
148
+ }, failure ::set ), queryLatch ::countDown ));
149
+ assertTrue (queryLatch .await (10 , TimeUnit .SECONDS ));
150
+ assertThat (failure .get (), instanceOf (EsRejectedExecutionException .class ));
151
+ assertThat (ExceptionsHelper .status (failure .get ()), equalTo (RestStatus .TOO_MANY_REQUESTS ));
152
+ assertThat (failure .get ().getMessage (), equalTo ("test queue" ));
153
+ } finally {
154
+ for (String node : nodes ) {
155
+ ((MockTransportService ) internalCluster ().getInstance (TransportService .class , node )).clearAllRules ();
156
+ }
157
+ }
158
+ }
159
+
100
160
static class SearchContextCounter {
101
161
private final int maxAllowed ;
102
162
private final AtomicInteger current = new AtomicInteger ();
0 commit comments