diff --git a/docs/changelog/125462.yaml b/docs/changelog/125462.yaml new file mode 100644 index 0000000000000..ac41311f289c1 --- /dev/null +++ b/docs/changelog/125462.yaml @@ -0,0 +1,5 @@ +pr: 125462 +summary: Unique ids when replacing missing fields +area: ES|QL +type: bug +issues: [] diff --git a/docs/changelog/125656.yaml b/docs/changelog/125656.yaml new file mode 100644 index 0000000000000..7acf10dcc69a9 --- /dev/null +++ b/docs/changelog/125656.yaml @@ -0,0 +1,6 @@ +pr: 125656 +summary: Unique ids when replacing missing fields +area: ES|QL +type: bug +issues: + - 121754 diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index b25f57b77f074..2cd93d56bbe7a 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -986,6 +986,32 @@ public void testDoubleParamsWithLookupJoin() throws IOException { ); } + public void testMultipleBatchesWithLookupJoin() throws IOException { + assumeTrue( + "Requires new null alias ids for join with multiple batches", + EsqlCapabilities.Cap.REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES.isEnabled() + ); + // Create more than 10 indices to trigger multiple batches of data node execution. + // The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout + for (int i = 1; i <= 20; i++) { + createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}"); + } + bulkLoadTestDataLookupMode(10); + // lookup join with and without sort + for (String sort : List.of("", "| sort integer")) { + var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort)); + Map result = runEsql(query); + var columns = as(result.get("columns"), List.class); + assertEquals(21, columns.size()); + var values = as(result.get("values"), List.class); + assertEquals(10, values.size()); + } + // clean up + for (int i = 1; i <= 20; i++) { + assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true)); + } + } + private void validateResultsOfDoubleParametersForIdentifiers(RequestObjectBuilder query) throws IOException { Map result = runEsql(query); Map colA = Map.of("name", "boolean", "type", "boolean"); @@ -1668,6 +1694,13 @@ private static String repeatValueAsMV(Object value) { return "[" + value + ", " + value + "]"; } + private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException { + Request request = new Request("PUT", "/" + indexName); + String settings = "\"settings\" : {\"mode\" : \"lookup\"}, "; + request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}"); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + } + public static RequestObjectBuilder requestObjectBuilder() throws IOException { return new RequestObjectBuilder(); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 1b5de3283fe63..82cace2b45ee9 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword 10092 | 1 | English 10093 | 3 | Spanish ; + +multipleBatchesWithSort +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| sort language_code, birth_date +| keep language_code +| limit 1 +; + +language_code:integer +1 +; + +multipleBatchesWithMvExpand +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| mv_expand birth_date +| sort birth_date, language_code +| limit 1 +; + +birth_date:datetime |language_code:integer +1952-02-27T00:00:00.000Z |null +; + +multipleBatchesWithAggregate1 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats x=max(birth_date), y=min(language_code) +; + +x:datetime |y:integer +1965-01-03T00:00:00.000Z |1 +; + +multipleBatchesWithAggregate2 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats m=min(birth_date) by language_code +| sort language_code +| limit 1 +; + +m:datetime |language_code:integer +null |1 +; + +multipleBatchesWithAggregate3 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats m=min(language_code) by birth_date +| sort birth_date +| limit 1 +; + +m:integer |birth_date:datetime +null |1952-02-27T00:00:00.000Z +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 3846d0e8a65a7..d7b4654643ba7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -932,7 +932,12 @@ public enum Cap { /** * Index component selector syntax (my-data-stream-name::failures) */ - INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()); + INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()), + + /** + * Create null alias with new id in ReplaceMissingFieldWithNull when there is lookup join with multiple batches. + */ + REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java index d36fae54f5162..4d1cacfbdfb2f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java @@ -10,6 +10,8 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.IndexMode; import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.AttributeMap; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.Literal; @@ -17,7 +19,6 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField; import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext; -import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; @@ -50,28 +51,32 @@ public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLog } }); - return plan.transformUp(p -> missingToNull(p, localLogicalOptimizerContext.searchStats(), lookupFields)); + AttributeMap fieldAttrReplacedBy = new AttributeMap(); + + return plan.transformUp(p -> missingToNull(p, localLogicalOptimizerContext.searchStats(), lookupFields, fieldAttrReplacedBy)); } - private LogicalPlan missingToNull(LogicalPlan plan, SearchStats stats, AttributeSet lookupFields) { + private LogicalPlan missingToNull( + LogicalPlan plan, + SearchStats stats, + AttributeSet lookupFields, + AttributeMap fieldAttrReplacedBy + ) { if (plan instanceof EsRelation || plan instanceof LocalRelation) { return plan; - } + } else if (plan instanceof Project) { + // Only create null literals for newly encountered missing fields + plan = resolveAlreadyReplacedAttributes(plan, fieldAttrReplacedBy); + Project project = (Project) plan; - if (plan instanceof Aggregate a) { - // don't do anything (for now) - return a; - } - // keep the aliased name - else if (plan instanceof Project project) { var projections = project.projections(); List newProjections = new ArrayList<>(projections.size()); Map nullLiteral = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size()); AttributeSet joinAttributes = joinAttributes(project); for (NamedExpression projection : projections) { - // Do not use the attribute name, this can deviate from the field name for union types. if (projection instanceof FieldAttribute f + // Do not use the attribute name, this can deviate from the field name for union types && stats.exists(f.fieldName()) == false && joinAttributes.contains(f) == false && f.field() instanceof PotentiallyUnmappedKeywordEsField == false) { @@ -81,24 +86,28 @@ else if (plan instanceof Project project) { Alias nullAlias = nullLiteral.get(f.dataType()); // save the first field as null (per datatype) if (nullAlias == null) { - Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id()); + Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), null); nullLiteral.put(dt, alias); projection = alias.toAttribute(); } // otherwise point to it else { // since avoids creating field copies - projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), f.id()); + projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), null); } + fieldAttrReplacedBy.put(f, projection.toAttribute()); } newProjections.add(projection); } - // add the first found field as null + + // Add an EVAL ahead of the Project to create null literals if (nullLiteral.size() > 0) { plan = new Eval(project.source(), project.child(), new ArrayList<>(nullLiteral.values())); plan = new Project(project.source(), plan, newProjections); } + + return plan; } else if (plan instanceof Eval || plan instanceof Filter || plan instanceof OrderBy @@ -113,8 +122,22 @@ else if (plan instanceof Project project) { ? f : Literal.of(f, null) ); + + return plan; } + // If we replaced any field attribute by a null literal previously, we must reference the null literal now - the initial field + // attribute is no longer available (projected away). + return resolveAlreadyReplacedAttributes(plan, fieldAttrReplacedBy); + } + + private LogicalPlan resolveAlreadyReplacedAttributes(LogicalPlan plan, AttributeMap fieldAttrReplacedBy) { + if (fieldAttrReplacedBy.size() > 0) { + plan = plan.transformExpressionsOnly(FieldAttribute.class, f -> { + Attribute replacement = fieldAttrReplacedBy.get(f); + return replacement == null ? f : replacement; + }); + } return plan; }