77
88package org .elasticsearch .xpack .esql .action ;
99
10+ import org .elasticsearch .ExceptionsHelper ;
11+ import org .elasticsearch .ResourceNotFoundException ;
1012import org .elasticsearch .client .internal .Client ;
1113import org .elasticsearch .common .Strings ;
1214import org .elasticsearch .common .breaker .CircuitBreaker ;
2123import org .elasticsearch .transport .TransportService ;
2224import org .elasticsearch .xcontent .XContentBuilder ;
2325import org .elasticsearch .xcontent .json .JsonXContent ;
26+ import org .elasticsearch .xpack .esql .EsqlTestUtils ;
2427import org .elasticsearch .xpack .esql .plugin .ComputeService ;
2528
2629import java .io .IOException ;
@@ -148,6 +151,7 @@ public void testOneRemoteClusterPartial() throws Exception {
148151
149152 public void testFailToReceiveClusterResponse () throws Exception {
150153 populateIndices ();
154+ Exception simulatedFailure = randomFailure ();
151155 // fetched pages, but failed to receive the cluster response
152156 for (TransportService transportService : cluster (REMOTE_CLUSTER_1 ).getInstances (TransportService .class )) {
153157 MockTransportService ts = asInstanceOf (MockTransportService .class , transportService );
@@ -161,7 +165,7 @@ public String getProfileName() {
161165
162166 @ Override
163167 public void sendResponse (TransportResponse response ) {
164- sendResponse (new CircuitBreakingException ( "simulated" , CircuitBreaker . Durability . PERMANENT ) );
168+ sendResponse (simulatedFailure );
165169 }
166170
167171 @ Override
@@ -177,8 +181,10 @@ public void sendResponse(Exception exception) {
177181 request .includeCCSMetadata (randomBoolean ());
178182 {
179183 request .allowPartialResults (false );
180- var error = expectThrows (CircuitBreakingException .class , () -> runQuery (request ).close ());
181- assertThat (error .getMessage (), equalTo ("simulated" ));
184+ Exception error = expectThrows (Exception .class , () -> runQuery (request ).close ());
185+ var unwrapped = ExceptionsHelper .unwrap (error , simulatedFailure .getClass ());
186+ assertNotNull (unwrapped );
187+ assertThat (unwrapped .getMessage (), equalTo (simulatedFailure .getMessage ()));
182188 }
183189 request .allowPartialResults (true );
184190 try (var resp = runQuery (request )) {
@@ -211,25 +217,27 @@ public void sendResponse(Exception exception) {
211217
212218 public void testFailToStartRequestOnRemoteCluster () throws Exception {
213219 populateIndices ();
220+ Exception simulatedFailure = randomFailure ();
214221 for (TransportService transportService : cluster (REMOTE_CLUSTER_1 ).getInstances (TransportService .class )) {
215222 MockTransportService ts = asInstanceOf (MockTransportService .class , transportService );
216223 String actionToFail = randomFrom (
217224 ExchangeService .EXCHANGE_ACTION_NAME ,
218225 ExchangeService .OPEN_EXCHANGE_ACTION_NAME ,
219226 ComputeService .CLUSTER_ACTION_NAME
220227 );
221- ts .addRequestHandlingBehavior (actionToFail , (handler , request , channel , task ) -> {
222- channel .sendResponse (new IllegalStateException ("simulated" ));
223- });
228+ ts .addRequestHandlingBehavior (actionToFail , (handler , request , channel , task ) -> { channel .sendResponse (simulatedFailure ); });
224229 }
225230 try {
226231 EsqlQueryRequest request = new EsqlQueryRequest ();
227232 request .query ("FROM ok*,*a:ok* | KEEP id" );
228233 request .includeCCSMetadata (randomBoolean ());
229234 {
230235 request .allowPartialResults (false );
231- var error = expectThrows (IllegalStateException .class , () -> runQuery (request ).close ());
232- assertThat (error .getMessage (), containsString ("simulated" ));
236+ var error = expectThrows (Exception .class , () -> runQuery (request ).close ());
237+ EsqlTestUtils .assertEsqlFailure (error );
238+ var unwrapped = ExceptionsHelper .unwrap (error , simulatedFailure .getClass ());
239+ assertNotNull (unwrapped );
240+ assertThat (unwrapped .getMessage (), equalTo (simulatedFailure .getMessage ()));
233241 }
234242 request .allowPartialResults (true );
235243 try (var resp = runQuery (request )) {
@@ -260,6 +268,15 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception {
260268 }
261269 }
262270
271+ private static Exception randomFailure () {
272+ return randomFrom (
273+ new IllegalStateException ("driver was closed already" ),
274+ new CircuitBreakingException ("low memory" , CircuitBreaker .Durability .PERMANENT ),
275+ new IOException ("broken disk" ),
276+ new ResourceNotFoundException ("exchange sink was not found" )
277+ );
278+ }
279+
263280 private Set <String > populateIndex (String clusterAlias , String indexName , int numShards ) {
264281 Client client = client (clusterAlias );
265282 assertAcked (
@@ -309,7 +326,7 @@ private Set<String> populateIndexWithFailingFields(String clusterAlias, String i
309326 );
310327 Set <String > ids = new HashSet <>();
311328 String tag = clusterAlias .isEmpty () ? "local" : clusterAlias ;
312- int numDocs = between (1 , 100 ); // large enough to have failing documents in every shard
329+ int numDocs = between (50 , 100 ); // large enough to have failing documents in every shard
313330 for (int i = 0 ; i < numDocs ; i ++) {
314331 String id = Long .toString (nextDocId .incrementAndGet ());
315332 client .prepareIndex (indexName ).setSource ("id" , id , "tag" , tag , "v" , i ).get ();
0 commit comments