@@ -249,7 +249,7 @@ public final void testFetchAll() throws IOException {
249249 List <?> columns = (List <?>) response .get ("columns" );
250250 List <?> values = (List <?>) response .get ("values" );
251251
252- MapMatcher expectedColumns = allTypesColumnsMatcher (coordinatorVersion , minVersion (), indexMode , extractPreference , true );
252+ MapMatcher expectedColumns = allTypesColumnsMatcher (coordinatorVersion , minVersion (), indexMode , extractPreference , true , true );
253253 assertMap (nameToType (columns ), expectedColumns );
254254
255255 MapMatcher expectedAllValues = matchesMap ();
@@ -261,6 +261,7 @@ public final void testFetchAll() throws IOException {
261261 indexMode ,
262262 extractPreference ,
263263 true ,
264+ true ,
264265 indexName
265266 );
266267 expectedAllValues = expectedAllValues .entry (indexName , expectedValues );
@@ -283,13 +284,17 @@ protected static MapMatcher allTypesColumnsMatcher(
283284 TransportVersion minimumVersion ,
284285 IndexMode indexMode ,
285286 MappedFieldType .FieldExtractPreference extractPreference ,
286- boolean expectMetadataFields
287+ boolean expectMetadataFields ,
288+ boolean expectNonEnrichableFields
287289 ) {
288290 MapMatcher expectedColumns = matchesMap ().entry (LOOKUP_ID_FIELD , "integer" );
289291 for (DataType type : DataType .values ()) {
290292 if (supportedInIndex (type ) == false ) {
291293 continue ;
292294 }
295+ if (expectNonEnrichableFields == false && supportedInEnrich (type ) == false ) {
296+ continue ;
297+ }
293298 expectedColumns = expectedColumns .entry (fieldName (type ), expectedType (type , coordinatorVersion , minimumVersion , indexMode ));
294299 }
295300 if (expectMetadataFields ) {
@@ -310,6 +315,7 @@ protected static MapMatcher allTypesValuesMatcher(
310315 IndexMode indexMode ,
311316 MappedFieldType .FieldExtractPreference extractPreference ,
312317 boolean expectMetadataFields ,
318+ boolean expectNonEnrichableFields ,
313319 String indexName
314320 ) {
315321 MapMatcher expectedValues = matchesMap ();
@@ -318,6 +324,9 @@ protected static MapMatcher allTypesValuesMatcher(
318324 if (supportedInIndex (type ) == false ) {
319325 continue ;
320326 }
327+ if (expectNonEnrichableFields == false && supportedInEnrich (type ) == false ) {
328+ continue ;
329+ }
321330 expectedValues = expectedValues .entry (
322331 fieldName (type ),
323332 expectedValue (type , coordinatorVersion , minimumVersion , indexMode , extractPreference )
@@ -336,47 +345,6 @@ protected static MapMatcher allTypesValuesMatcher(
336345 return expectedValues ;
337346 }
338347
339- protected static void assertFetchAllResponse (
340- Tuple <Map <String , Object >, TransportVersion > responseAndCoordinatorVersion ,
341- Map <String , NodeInfo > expectedIndices ,
342- TransportVersion minimumVersion ,
343- IndexMode indexMode ,
344- MappedFieldType .FieldExtractPreference extractPreference ,
345- boolean expectMetadataFields
346- ) {
347- Map <String , Object > response = responseAndCoordinatorVersion .v1 ();
348- TransportVersion coordinatorVersion = responseAndCoordinatorVersion .v2 ();
349-
350- assertNoPartialResponse (response );
351-
352- List <?> columns = (List <?>) response .get ("columns" );
353- List <?> values = (List <?>) response .get ("values" );
354-
355- MapMatcher expectedColumns = allTypesColumnsMatcher (
356- coordinatorVersion ,
357- minimumVersion ,
358- indexMode ,
359- extractPreference ,
360- expectMetadataFields
361- );
362- assertMap (nameToType (columns ), expectedColumns );
363-
364- MapMatcher expectedAllValues = matchesMap ();
365- for (Map .Entry <String , NodeInfo > e : expectedIndices .entrySet ()) {
366- String indexName = e .getKey ();
367- MapMatcher expectedValues = allTypesValuesMatcher (
368- coordinatorVersion ,
369- minimumVersion ,
370- indexMode ,
371- extractPreference ,
372- expectMetadataFields ,
373- indexName
374- );
375- expectedAllValues = expectedAllValues .entry (indexName , expectedValues );
376- }
377- assertMap (indexToRow (columns , values ), expectedAllValues );
378- }
379-
380348 /**
381349 * Tests fetching {@code dense_vector} if possible. Uses the {@code dense_vector_agg_metric_double_if_fns}
382350 * work around if required.
@@ -537,7 +505,6 @@ public void testRow() throws IOException {
537505 assertMinimumVersion (coordinatorVersion , responseAndCoordinatorVersion );
538506 }
539507
540- // TODO: ROW + local ENRICH
541508 @ SuppressWarnings ("unchecked" )
542509 public void testRowLookupJoin () throws IOException {
543510 assumeTrue ("Test only requires lookup indices" , indexMode == IndexMode .LOOKUP );
@@ -563,6 +530,50 @@ public void testRowLookupJoin() throws IOException {
563530 expectedMinimumVersion ,
564531 indexMode ,
565532 extractPreference ,
533+ false ,
534+ true
535+ );
536+ assertMap (nameToType (columns ), expectedColumns );
537+
538+ MapMatcher expectedValues = allTypesValuesMatcher (
539+ coordinatorVersion ,
540+ expectedMinimumVersion ,
541+ indexMode ,
542+ extractPreference ,
543+ false ,
544+ true ,
545+ null
546+ );
547+ assertMap (nameToValue (names (columns ), (List <Object >) values .getFirst ()), expectedValues );
548+ }
549+ }
550+
551+ @ SuppressWarnings ("unchecked" )
552+ public void testRowEnrich () throws IOException {
553+ assumeTrue ("Test only requires lookup indices" , indexMode == IndexMode .LOOKUP );
554+ Map <String , NodeInfo > expectedIndices = expectedIndices (IndexMode .LOOKUP , true );
555+ for (Map .Entry <String , NodeInfo > e : expectedIndices .entrySet ()) {
556+ String policyName = e .getKey () + "_policy" ;
557+ String query = "ROW " + LOOKUP_ID_FIELD + " = 123 | ENRICH " + policyName + " ON " + LOOKUP_ID_FIELD + " | LIMIT 1" ;
558+ var responseAndCoordinatorVersion = runQuery (query );
559+ TransportVersion expectedMinimumVersion = minVersion (true );
560+
561+ assertMinimumVersion (expectedMinimumVersion , responseAndCoordinatorVersion );
562+
563+ Map <String , Object > response = responseAndCoordinatorVersion .v1 ();
564+ TransportVersion coordinatorVersion = responseAndCoordinatorVersion .v2 ();
565+
566+ assertNoPartialResponse (response );
567+
568+ List <?> columns = (List <?>) response .get ("columns" );
569+ List <?> values = (List <?>) response .get ("values" );
570+
571+ MapMatcher expectedColumns = allTypesColumnsMatcher (
572+ coordinatorVersion ,
573+ expectedMinimumVersion ,
574+ indexMode ,
575+ extractPreference ,
576+ false ,
566577 false
567578 );
568579 assertMap (nameToType (columns ), expectedColumns );
@@ -573,6 +584,7 @@ public void testRowLookupJoin() throws IOException {
573584 indexMode ,
574585 extractPreference ,
575586 false ,
587+ false ,
576588 null
577589 );
578590 assertMap (nameToValue (names (columns ), (List <Object >) values .getFirst ()), expectedValues );
@@ -653,6 +665,12 @@ protected static void createIndexForNode(RestClient client, String nodeName, Str
653665 if (false == indexExists (client , indexName )) {
654666 createAllTypesIndex (client , indexName , nodeId , mode );
655667 createAllTypesDoc (client , indexName );
668+ // We create an enrich policy for each lookup index. That's a bit of an arbitrary choice, but it's probably a good idea to
669+ // create 1 enrich policy per node, so we potentially detect misbehavior that stems from enrich policies being based on indices
670+ // that live on newer or older nodes.
671+ if (mode == IndexMode .LOOKUP ) {
672+ createEnrichPolicy (client , indexName );
673+ }
656674 }
657675 }
658676
@@ -765,8 +783,35 @@ private static void createAllTypesDoc(RestClient client, String indexName) throw
765783 client .performRequest (request );
766784 }
767785
768- private Matcher <?> expectedValue (DataType type , TransportVersion coordinatorVersion ) throws IOException {
769- return expectedValue (type , coordinatorVersion , minVersion (), indexMode , extractPreference );
786+ private static void createEnrichPolicy (RestClient client , String indexName ) throws IOException {
787+ String policyName = indexName + "_policy" ;
788+
789+ XContentBuilder policyConfig = JsonXContent .contentBuilder ().startObject ();
790+ {
791+ policyConfig .startObject ("match" );
792+
793+ policyConfig .field ("indices" , indexName );
794+ policyConfig .field ("match_field" , LOOKUP_ID_FIELD );
795+ List <String > enrichFields = new ArrayList <>();
796+ for (DataType type : DataType .values ()) {
797+ if (supportedInIndex (type ) == false || supportedInEnrich (type ) == false ) {
798+ continue ;
799+ }
800+ enrichFields .add (fieldName (type ));
801+ }
802+ policyConfig .field ("enrich_fields" , enrichFields );
803+
804+ policyConfig .endObject ();
805+ }
806+ policyConfig .endObject ();
807+
808+ Request request = new Request ("PUT" , "_enrich/policy/" + policyName );
809+ request .setJsonEntity (Strings .toString (policyConfig ));
810+ client .performRequest (request );
811+
812+ Request execute = new Request ("PUT" , "_enrich/policy/" + policyName + "/_execute" );
813+ request .addParameter ("wait_for_completion" , "true" );
814+ client .performRequest (execute );
770815 }
771816
772817 private static Matcher <?> expectedValue (
@@ -834,6 +879,20 @@ private static boolean supportedInIndex(DataType t) {
834879 };
835880 }
836881
882+ /**
883+ * Is the type supported in enrich policies?
884+ */
885+ private static boolean supportedInEnrich (DataType t ) {
886+ return switch (t ) {
887+ // Enrich policies don't work with types that have mandatory fields in the mapping.
888+ // https://github.com/elastic/elasticsearch/issues/127350
889+ case AGGREGATE_METRIC_DOUBLE , SCALED_FLOAT ,
890+ // https://github.com/elastic/elasticsearch/issues/137699
891+ DENSE_VECTOR -> false ;
892+ default -> true ;
893+ };
894+ }
895+
837896 private static Map <String , Object > nameToType (List <?> columns ) {
838897 Map <String , Object > result = new TreeMap <>();
839898 for (Object c : columns ) {
0 commit comments