4545import java .util .concurrent .atomic .AtomicInteger ;
4646import java .util .concurrent .atomic .AtomicReference ;
4747
48+ import static org .hamcrest .Matchers .allOf ;
4849import static org .hamcrest .Matchers .equalTo ;
50+ import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
4951import static org .hamcrest .Matchers .instanceOf ;
52+ import static org .hamcrest .Matchers .lessThan ;
5053import static org .hamcrest .Matchers .lessThanOrEqualTo ;
5154
5255/**
@@ -264,7 +267,7 @@ public void testLimitConcurrentShards() {
264267 public void testCancelUnnecessaryRequests () {
265268 assumeTrue ("Requires pragmas" , canUseQueryPragmas ());
266269 internalCluster ().ensureAtLeastNumDataNodes (3 );
267-
270+ var dataNodes = internalCluster (). numDataNodes ();
268271 var coordinatingNode = internalCluster ().getNodeNames ()[0 ];
269272
270273 var exchanges = new AtomicInteger (0 );
@@ -281,9 +284,9 @@ public void testCancelUnnecessaryRequests() {
281284 query .query ("from test-* | LIMIT 1" );
282285 query .pragmas (new QueryPragmas (Settings .builder ().put (QueryPragmas .MAX_CONCURRENT_NODES_PER_CLUSTER .getKey (), 1 ).build ()));
283286
284- try (var result = safeGet (client ().execute (EsqlQueryAction .INSTANCE , query ))) {
287+ try (var result = safeGet (client (coordinatingNode ).execute (EsqlQueryAction .INSTANCE , query ))) {
285288 assertThat (Iterables .size (result .rows ()), equalTo (1L ));
286- assertThat (exchanges .get (), lessThanOrEqualTo ( 2 ));
289+ assertThat (exchanges .get (), allOf ( greaterThanOrEqualTo ( 1 ), lessThan ( dataNodes ) ));
287290 } finally {
288291 coordinatorNodeTransport .clearAllRules ();
289292 }
0 commit comments