1414import org .elasticsearch .action .index .IndexRequest ;
1515import org .elasticsearch .action .support .WriteRequest ;
1616import org .elasticsearch .cluster .metadata .IndexMetadata ;
17+ import org .elasticsearch .cluster .node .DiscoveryNode ;
1718import org .elasticsearch .common .settings .Settings ;
19+ import org .elasticsearch .common .util .CollectionUtils ;
20+ import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
1821import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
1922import org .elasticsearch .compute .operator .exchange .ExchangeService ;
23+ import org .elasticsearch .core .TimeValue ;
2024import org .elasticsearch .plugins .Plugin ;
2125import org .elasticsearch .rest .RestStatus ;
2226import org .elasticsearch .search .MockSearchService ;
2630import org .elasticsearch .transport .TransportChannel ;
2731import org .elasticsearch .transport .TransportResponse ;
2832import org .elasticsearch .transport .TransportService ;
33+ import org .elasticsearch .xpack .esql .plugin .ComputeService ;
2934import org .elasticsearch .xpack .esql .plugin .QueryPragmas ;
3035import org .hamcrest .Matchers ;
3136import org .junit .Before ;
@@ -56,6 +61,18 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
5661 return plugins ;
5762 }
5863
64+ @ Override
65+ protected Collection <Class <? extends Plugin >> nodePlugins () {
66+ return CollectionUtils .appendToCopy (super .nodePlugins (), InternalExchangePlugin .class );
67+ }
68+
69+ @ Override
70+ protected Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
71+ return Settings .builder ()
72+ .put (ExchangeService .INACTIVE_SINKS_INTERVAL_SETTING , TimeValue .timeValueMillis (between (3000 , 5000 )))
73+ .build ();
74+ }
75+
5976 @ Before
6077 public void setupIndices () {
6178 int numIndices = between (10 , 20 );
@@ -113,32 +130,64 @@ public void testConcurrentQueries() throws Exception {
113130 }
114131
115132 public void testRejection () throws Exception {
116- String [] nodes = internalCluster ().getNodeNames ();
117- for (String node : nodes ) {
118- MockTransportService ts = (MockTransportService ) internalCluster ().getInstance (TransportService .class , node );
119- ts .addRequestHandlingBehavior (ExchangeService .EXCHANGE_ACTION_NAME , (handler , request , channel , task ) -> {
120- handler .messageReceived (request , new TransportChannel () {
121- @ Override
122- public String getProfileName () {
123- return channel .getProfileName ();
124- }
125-
126- @ Override
127- public void sendResponse (TransportResponse response ) {
128- channel .sendResponse (new RemoteTransportException ("simulated" , new EsRejectedExecutionException ("test queue" )));
129- }
130-
131- @ Override
132- public void sendResponse (Exception exception ) {
133- channel .sendResponse (exception );
134- }
135- }, task );
133+ DiscoveryNode dataNode = randomFrom (internalCluster ().clusterService ().state ().nodes ().getDataNodes ().values ());
134+ String indexName = "single-node-index" ;
135+ client ().admin ()
136+ .indices ()
137+ .prepareCreate (indexName )
138+ .setSettings (
139+ Settings .builder ()
140+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 )
141+ .put ("index.routing.allocation.require._name" , dataNode .getName ())
142+ )
143+ .setMapping ("user" , "type=keyword" , "tags" , "type=keyword" )
144+ .get ();
145+ client ().prepareIndex (indexName )
146+ .setSource ("user" , "u1" , "tags" , "lucene" )
147+ .setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE )
148+ .get ();
149+
150+ MockTransportService ts = (MockTransportService ) internalCluster ().getInstance (TransportService .class , dataNode .getName ());
151+ CountDownLatch dataNodeRequestLatch = new CountDownLatch (1 );
152+ ts .addRequestHandlingBehavior (ComputeService .DATA_ACTION_NAME , (handler , request , channel , task ) -> {
153+ handler .messageReceived (request , channel , task );
154+ dataNodeRequestLatch .countDown ();
155+ });
156+
157+ ts .addRequestHandlingBehavior (ExchangeService .EXCHANGE_ACTION_NAME , (handler , request , channel , task ) -> {
158+ ts .getThreadPool ().generic ().execute (new AbstractRunnable () {
159+ @ Override
160+ public void onFailure (Exception e ) {
161+ channel .sendResponse (e );
162+ }
163+
164+ @ Override
165+ protected void doRun () throws Exception {
166+ assertTrue (dataNodeRequestLatch .await (30 , TimeUnit .SECONDS ));
167+ handler .messageReceived (request , new TransportChannel () {
168+ @ Override
169+ public String getProfileName () {
170+ return channel .getProfileName ();
171+ }
172+
173+ @ Override
174+ public void sendResponse (TransportResponse response ) {
175+ channel .sendResponse (new RemoteTransportException ("simulated" , new EsRejectedExecutionException ("test queue" )));
176+ }
177+
178+ @ Override
179+ public void sendResponse (Exception exception ) {
180+ channel .sendResponse (exception );
181+ }
182+ }, task );
183+ }
136184 });
137- }
185+ });
186+
138187 try {
139188 AtomicReference <Exception > failure = new AtomicReference <>();
140189 EsqlQueryRequest request = new EsqlQueryRequest ();
141- request .query ("from test-* | stats count(user) by tags" );
190+ request .query ("from single-node-index | stats count(user) by tags" );
142191 request .acceptedPragmaRisks (true );
143192 request .pragmas (randomPragmas ());
144193 CountDownLatch queryLatch = new CountDownLatch (1 );
@@ -151,9 +200,7 @@ public void sendResponse(Exception exception) {
151200 assertThat (ExceptionsHelper .status (failure .get ()), equalTo (RestStatus .TOO_MANY_REQUESTS ));
152201 assertThat (failure .get ().getMessage (), equalTo ("test queue" ));
153202 } finally {
154- for (String node : nodes ) {
155- ((MockTransportService ) internalCluster ().getInstance (TransportService .class , node )).clearAllRules ();
156- }
203+ ts .clearAllRules ();
157204 }
158205 }
159206
0 commit comments