@@ -236,7 +236,15 @@ public void skipSnapshots() throws IOException {
236236 }
237237
238238 public final void testFetchAll () throws IOException {
239- var responseAndCoordinatorVersion = runFromAllQuery ("""
239+ doTestFetchAll ("*:%mode%*,%mode%*" , allNodeToInfo (), allNodeToInfo ());
240+ }
241+
242+ protected final void doTestFetchAll (
243+ String indexPattern ,
244+ Map <String , NodeInfo > nodesContributingIndices ,
245+ Map <String , NodeInfo > nodesInvolvedInExecution
246+ ) throws IOException {
247+ var responseAndCoordinatorVersion = runFromAllQuery (indexPattern , """
240248 , _id, _ignored, _index_mode, _score, _source, _version
241249 | LIMIT 1000
242250 """ );
@@ -253,7 +261,7 @@ public final void testFetchAll() throws IOException {
253261 assertMap (nameToType (columns ), expectedColumns );
254262
255263 MapMatcher expectedAllValues = matchesMap ();
256- for (Map .Entry <String , NodeInfo > e : expectedIndices (indexMode ).entrySet ()) {
264+ for (Map .Entry <String , NodeInfo > e : expectedIndices (indexMode , nodesContributingIndices ).entrySet ()) {
257265 String indexName = e .getKey ();
258266 MapMatcher expectedValues = allTypesValuesMatcher (
259267 coordinatorVersion ,
@@ -268,7 +276,7 @@ public final void testFetchAll() throws IOException {
268276 }
269277 assertMap (indexToRow (columns , values ), expectedAllValues );
270278
271- assertMinimumVersionFromAllQueries ( responseAndCoordinatorVersion );
279+ assertMinimumVersion ( minVersion ( nodesInvolvedInExecution ), responseAndCoordinatorVersion );
272280
273281 profileLogger .clearProfile ();
274282 }
@@ -405,7 +413,6 @@ public final void testFetchDenseVector() throws IOException {
405413 MapMatcher expectedAllValues = matchesMap ();
406414 for (Map .Entry <String , NodeInfo > e : expectedIndices (indexMode ).entrySet ()) {
407415 String indexName = e .getKey ();
408- NodeInfo nodeInfo = e .getValue ();
409416 MapMatcher expectedValues = matchesMap ();
410417 expectedValues = expectedValues .entry ("f_dense_vector" , matchesList ().item (0.5 ).item (10.0 ).item (5.9999995 ));
411418 expectedValues = expectedValues .entry ("_index" , indexName );
@@ -487,10 +494,11 @@ public final void testFetchAggregateMetricDouble() throws IOException {
487494 }
488495
489496 private Tuple <Map <String , Object >, TransportVersion > runFromAllQuery (String restOfQuery ) throws IOException {
490- var responseAndCoordinatorVersion = runQuery (
491- "FROM *:%mode%*,%mode%* METADATA _index" .replace ("%mode%" , indexMode .toString ()) + restOfQuery
492- );
493- return responseAndCoordinatorVersion ;
497+ return runFromAllQuery ("*:%mode%*,%mode%*" , restOfQuery );
498+ }
499+
500+ private Tuple <Map <String , Object >, TransportVersion > runFromAllQuery (String indexPattern , String restOfQuery ) throws IOException {
501+ return runQuery (("FROM " + indexPattern + " METADATA _index" ).replace ("%mode%" , indexMode .toString ()) + restOfQuery );
494502 }
495503
496504 public void testRow () throws IOException {
@@ -501,18 +509,18 @@ public void testRow() throws IOException {
501509 String query = "ROW x = 1 | LIMIT 1" ;
502510 var responseAndCoordinatorVersion = runQuery (query );
503511
504- assertMinimumVersion (minVersion (true ), responseAndCoordinatorVersion , false );
512+ assertMinimumVersion (minVersion (localNodeToInfo () ), responseAndCoordinatorVersion , false );
505513 }
506514
507515 @ SuppressWarnings ("unchecked" )
508516 public void testRowLookupJoin () throws IOException {
509517 assumeTrue ("Test only requires lookup indices" , indexMode == IndexMode .LOOKUP );
510- Map <String , NodeInfo > expectedIndices = expectedIndices (IndexMode .LOOKUP , true );
518+ Map <String , NodeInfo > expectedIndices = expectedIndices (IndexMode .LOOKUP , localNodeToInfo () );
511519 for (Map .Entry <String , NodeInfo > e : expectedIndices .entrySet ()) {
512520 String indexName = e .getKey ();
513521 String query = "ROW " + LOOKUP_ID_FIELD + " = 123 | LOOKUP JOIN " + indexName + " ON " + LOOKUP_ID_FIELD + " | LIMIT 1" ;
514522 var responseAndCoordinatorVersion = runQuery (query );
515- TransportVersion expectedMinimumVersion = minVersion (true );
523+ TransportVersion expectedMinimumVersion = minVersion (localNodeToInfo () );
516524
517525 assertMinimumVersion (expectedMinimumVersion , responseAndCoordinatorVersion );
518526
@@ -550,18 +558,18 @@ public void testRowLookupJoin() throws IOException {
550558 @ SuppressWarnings ("unchecked" )
551559 public void testRowEnrich () throws IOException {
552560 assumeTrue ("Test only requires lookup indices" , indexMode == IndexMode .LOOKUP );
553- Map <String , NodeInfo > expectedIndices = expectedIndices (IndexMode .LOOKUP , true );
561+ Map <String , NodeInfo > expectedIndices = expectedIndices (IndexMode .LOOKUP , localNodeToInfo () );
554562 for (Map .Entry <String , NodeInfo > e : expectedIndices .entrySet ()) {
555563 String policyName = e .getKey () + "_policy" ;
556564 String query = "ROW " + LOOKUP_ID_FIELD + " = 123 | ENRICH " + policyName + " ON " + LOOKUP_ID_FIELD + " | LIMIT 1" ;
557565 var responseAndCoordinatorVersion = runQuery (query );
558566 Map <String , Object > response = responseAndCoordinatorVersion .v1 ();
559567 TransportVersion coordinatorVersion = responseAndCoordinatorVersion .v2 ();
560- TransportVersion expectedMinimumVersion = minVersion (true );
568+ TransportVersion expectedMinimumVersion = minVersion (localNodeToInfo () );
561569
562570 Map <String , Object > profile = (Map <String , Object >) response .get ("profile" );
563571 Integer actualMinimumVersion = (Integer ) profile .get ("minimumTransportVersion" );
564- if (minVersion (true ).supports (ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION )
572+ if (minVersion (localNodeToInfo () ).supports (ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION )
565573 // Some nodes don't send back the minimum transport version because they're too old to do that.
566574 // In this case, the determined minimum version will be that of the coordinator.
567575 || (coordinatorVersion .supports (ESQL_USE_MINIMUM_VERSION_FOR_ENRICH_RESOLUTION )
@@ -1006,11 +1014,10 @@ private static boolean syntheticSourceByDefault(IndexMode indexMode) {
10061014 }
10071015
10081016 private Map <String , NodeInfo > expectedIndices (IndexMode indexMode ) throws IOException {
1009- return expectedIndices (indexMode , false );
1017+ return expectedIndices (indexMode , allNodeToInfo () );
10101018 }
10111019
1012- private Map <String , NodeInfo > expectedIndices (IndexMode indexMode , boolean onlyLocalCluster ) throws IOException {
1013- Map <String , NodeInfo > nodeToInfo = onlyLocalCluster ? localNodeToInfo () : allNodeToInfo ();
1020+ protected Map <String , NodeInfo > expectedIndices (IndexMode indexMode , Map <String , NodeInfo > nodeToInfo ) throws IOException {
10141021 Map <String , NodeInfo > result = new TreeMap <>();
10151022 if (supportsNodeAssignment ()) {
10161023 for (Map .Entry <String , NodeInfo > e : nodeToInfo .entrySet ()) {
@@ -1044,11 +1051,10 @@ private Map<String, NodeInfo> expectedIndices(IndexMode indexMode, boolean onlyL
10441051 }
10451052
10461053 protected TransportVersion minVersion () throws IOException {
1047- return minVersion (false );
1054+ return minVersion (allNodeToInfo () );
10481055 }
10491056
1050- protected TransportVersion minVersion (boolean onlyLocalCluster ) throws IOException {
1051- Map <String , NodeInfo > nodeToInfo = onlyLocalCluster ? localNodeToInfo () : allNodeToInfo ();
1057+ protected TransportVersion minVersion (Map <String , NodeInfo > nodeToInfo ) throws IOException {
10521058 return nodeToInfo .values ().stream ().map (NodeInfo ::version ).min (Comparator .naturalOrder ()).get ();
10531059 }
10541060}
0 commit comments