diff --git a/docs/changelog/125462.yaml b/docs/changelog/125462.yaml new file mode 100644 index 0000000000000..be26ea24695ab --- /dev/null +++ b/docs/changelog/125462.yaml @@ -0,0 +1,6 @@ +pr: 125462 +summary: Assign new id to alias created by `ReplaceMissingFieldWithNull` when there + is lookup join +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index b25f57b77f074..2cd93d56bbe7a 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -986,6 +986,32 @@ public void testDoubleParamsWithLookupJoin() throws IOException { ); } + public void testMultipleBatchesWithLookupJoin() throws IOException { + assumeTrue( + "Requires new null alias ids for join with multiple batches", + EsqlCapabilities.Cap.REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES.isEnabled() + ); + // Create more than 10 indices to trigger multiple batches of data node execution. + // The sort field should be missing on some indices to reproduce NullPointerException caused by duplicated items in layout + for (int i = 1; i <= 20; i++) { + createIndex("idx" + i, randomBoolean(), "\"mappings\": {\"properties\" : {\"a\" : {\"type\" : \"keyword\"}}}"); + } + bulkLoadTestDataLookupMode(10); + // lookup join with and without sort + for (String sort : List.of("", "| sort integer")) { + var query = requestObjectBuilder().query(format(null, "from * | lookup join {} on integer {}", testIndexName(), sort)); + Map result = runEsql(query); + var columns = as(result.get("columns"), List.class); + assertEquals(21, columns.size()); + var values = as(result.get("values"), List.class); + assertEquals(10, values.size()); + } + // clean up + for (int i = 1; i <= 20; i++) { + assertThat(deleteIndex("idx" + i).isAcknowledged(), is(true)); + } + } + private void validateResultsOfDoubleParametersForIdentifiers(RequestObjectBuilder query) throws IOException { Map result = runEsql(query); Map colA = Map.of("name", "boolean", "type", "boolean"); @@ -1668,6 +1694,13 @@ private static String repeatValueAsMV(Object value) { return "[" + value + ", " + value + "]"; } + private static void createIndex(String indexName, boolean lookupMode, String mapping) throws IOException { + Request request = new Request("PUT", "/" + indexName); + String settings = "\"settings\" : {\"mode\" : \"lookup\"}, "; + request.setJsonEntity("{" + (lookupMode ? settings : "") + mapping + "}"); + assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode()); + } + public static RequestObjectBuilder requestObjectBuilder() throws IOException { return new RequestObjectBuilder(); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 1b5de3283fe63..82cace2b45ee9 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1452,3 +1452,90 @@ emp_no:integer | language_code:integer | language_name:keyword 10092 | 1 | English 10093 | 3 | Spanish ; + +multipleBatchesWithSort +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| sort language_code, birth_date +| keep language_code +| limit 1 +; + +language_code:integer +1 +; + +multipleBatchesWithMvExpand +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| mv_expand birth_date +| sort birth_date, language_code +| limit 1 +; + +birth_date:datetime |language_code:integer +1952-02-27T00:00:00.000Z |null +; + +multipleBatchesWithAggregate1 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats x=max(birth_date), y=min(language_code) +; + +x:datetime |y:integer +1965-01-03T00:00:00.000Z |1 +; + +multipleBatchesWithAggregate2 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats m=min(birth_date) by language_code +| sort language_code +| limit 1 +; + +m:datetime |language_code:integer +null |1 +; + +multipleBatchesWithAggregate3 +required_capability: join_lookup_v12 +required_capability: remove_redundant_sort +required_capability: replace_missing_field_with_null_new_alias_id_for_join_and_multiple_batches + +from * +| rename city.country.continent.planet.name as message +| lookup join message_types_lookup on message +| keep birth_date, language_code +| stats m=min(language_code) by birth_date +| sort birth_date +| limit 1 +; + +m:integer |birth_date:datetime +null |1952-02-27T00:00:00.000Z +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 3846d0e8a65a7..d7b4654643ba7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -932,7 +932,12 @@ public enum Cap { /** * Index component selector syntax (my-data-stream-name::failures) */ - INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()); + INDEX_COMPONENT_SELECTORS(DataStream.isFailureStoreFeatureFlagEnabled()), + + /** + * Create null alias with new id in ReplaceMissingFieldWithNull when there is lookup join with multiple batches. + */ + REPLACE_MISSING_FIELD_WITH_NULL_NEW_ALIAS_ID_FOR_JOIN_AND_MULTIPLE_BATCHES; private final boolean enabled; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java index d36fae54f5162..87405eb1d9078 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java @@ -10,10 +10,12 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.IndexMode; import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField; import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext; @@ -22,6 +24,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.OrderBy; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.RegexExtract; @@ -81,7 +84,11 @@ else if (plan instanceof Project project) { Alias nullAlias = nullLiteral.get(f.dataType()); // save the first field as null (per datatype) if (nullAlias == null) { - Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id()); + // In case of batch executions on data nodes and join exists, SearchStats may not always be available for all + // fields, creating a new alias for null with the same id as the field id can potentially cause planEval to add a + // duplicated ChannelSet to a layout, and Layout.builder().build() could throw a NullPointerException. + // As a workaround, assign a new alias id to the null alias when join exists and SearchStats is not available. + Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), joinAttributes.isEmpty() ? f.id() : null); nullLiteral.put(dt, alias); projection = alias.toAttribute(); } @@ -113,14 +120,40 @@ else if (plan instanceof Project project) { ? f : Literal.of(f, null) ); + } else if (plan instanceof MvExpand m) { + NamedExpression target = m.target(); + AttributeSet joinAttributes = joinAttributes(m); + if (joinAttributes.isEmpty() == false // rewrite only when there is join, TODO do we want to rewrite when there is no join? + && target instanceof FieldAttribute f + && stats.exists(f.fieldName()) == false + && joinAttributes.contains(f) == false + && f.field() instanceof PotentiallyUnmappedKeywordEsField == false) { + // Replace missing target field with null. + Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null)); + NamedExpression nullTarget = alias.toAttribute(); + plan = new Eval(m.source(), m.child(), List.of(alias)); + // The expanded reference is built on top of target field with the same name, and the parent plans all reference to the + // expanded reference other than the target field, keep expanded's id unchanged, otherwise the parent plans cannot find + // it. + Attribute nullExpanded = new ReferenceAttribute( + nullTarget.source(), + nullTarget.name(), + nullTarget.dataType(), + nullTarget.nullable(), + m.expanded().id(), + false + ); + plan = new MvExpand(m.source(), plan, nullTarget, nullExpanded); + } } - return plan; } - private AttributeSet joinAttributes(Project project) { + private AttributeSet joinAttributes(LogicalPlan plan) { var attributes = new AttributeSet(); - project.forEachDown(Join.class, j -> j.right().forEachDown(EsRelation.class, p -> attributes.addAll(p.output()))); + if (plan instanceof Project || plan instanceof MvExpand) { + plan.forEachDown(Join.class, j -> j.right().forEachDown(EsRelation.class, p -> attributes.addAll(p.output()))); + } return attributes; } }