Skip to content

Commit ab7828b

Browse files
authored
Merge branch 'main' into entitlements/fix-policy-utils-test
2 parents 11b5595 + ecaa0b1 commit ab7828b

File tree

16 files changed

+200
-142
lines changed

16 files changed

+200
-142
lines changed

docs/changelog/125764.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
pr: 125764
2+
summary: Fix `ReplaceMissingFieldsWithNull`
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 126036
7+
- 121754
8+
- 126030

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,6 @@ tests:
308308
- class: org.elasticsearch.packaging.test.DockerTests
309309
method: test012SecurityCanBeDisabled
310310
issue: https://github.com/elastic/elasticsearch/issues/116636
311-
- class: org.elasticsearch.index.engine.ThreadPoolMergeSchedulerTests
312-
method: testSchedulerCloseWaitsForRunningMerge
313-
issue: https://github.com/elastic/elasticsearch/issues/125236
314311
- class: org.elasticsearch.index.shard.StoreRecoveryTests
315312
method: testAddIndices
316313
issue: https://github.com/elastic/elasticsearch/issues/124104

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,18 +508,21 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception {
508508
)
509509
) {
510510
CountDownLatch mergeDoneLatch = new CountDownLatch(1);
511+
CountDownLatch mergeRunningLatch = new CountDownLatch(1);
511512
MergeSource mergeSource = mock(MergeSource.class);
512513
OneMerge oneMerge = mock(OneMerge.class);
513514
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
514515
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
515516
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
516517
doAnswer(invocation -> {
518+
mergeRunningLatch.countDown();
517519
OneMerge merge = (OneMerge) invocation.getArguments()[0];
518520
assertFalse(merge.isAborted());
519521
// wait to be signalled before completing the merge
520522
mergeDoneLatch.await();
521523
return null;
522524
}).when(mergeSource).merge(any(OneMerge.class));
525+
// submit the merge
523526
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
524527
Thread t = new Thread(() -> {
525528
try {
@@ -531,6 +534,8 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception {
531534
t.start();
532535
try {
533536
assertTrue(t.isAlive());
537+
// wait for the merge to actually run
538+
mergeRunningLatch.await();
534539
// ensure the merge scheduler is effectively "closed"
535540
assertBusy(() -> {
536541
MergeSource mergeSource2 = mock(MergeSource.class);

x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,3 +643,21 @@ FROM airports
643643
abbrev:k | city_name:k | city_location:geo_point | country:k | location:geo_point | name:text | region:text | boundary_wkt_length:i
644644
IDR | Indore | POINT(75.8472 22.7167) | India | POINT(75.8092915005895 22.727749187571) | Devi Ahilyabai Holkar Int'l | Indore City | 231
645645
;
646+
647+
// Regression test for https://github.com/elastic/elasticsearch/issues/126030
648+
// We had wrong layouts from ReplaceMissingFieldsWithNull in case of indices that had relevant fields for the query,
649+
// but were **missing the field we enrich on**.
650+
fieldsInOtherIndicesBug
651+
required_capability: enrich_load
652+
required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout
653+
654+
from *
655+
| keep author.keyword, book_no, scalerank, street, bytes_in, @timestamp, abbrev, city_location, distance, description, birth_date, language_code, intersects, client_ip, event_duration, version
656+
| enrich languages_policy on author.keyword
657+
| sort book_no
658+
| limit 1
659+
;
660+
661+
author.keyword:keyword|book_no:keyword|scalerank:integer|street:keyword|bytes_in:ul|@timestamp:unsupported|abbrev:keyword|city_location:geo_point|distance:double|description:unsupported|birth_date:date|language_code:integer|intersects:boolean|client_ip:unsupported|event_duration:long|version:version|language_name:keyword
662+
Fyodor Dostoevsky |1211 |null |null |null |null |null |null |null |null |null |null |null |null |null |null |null
663+
;

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,6 +1453,10 @@ emp_no:integer | language_code:integer | language_name:keyword
14531453
10093 | 3 | Spanish
14541454
;
14551455

1456+
###############################################
1457+
# Bugfixes
1458+
###############################################
1459+
14561460
multipleBatchesWithSort
14571461
required_capability: join_lookup_v12
14581462
required_capability: remove_redundant_sort
@@ -1539,3 +1543,21 @@ from *
15391543
m:integer |birth_date:datetime
15401544
null |1952-02-27T00:00:00.000Z
15411545
;
1546+
1547+
// Regression test for https://github.com/elastic/elasticsearch/issues/126030
1548+
// We had wrong layouts from ReplaceMissingFieldsWithNull
1549+
1550+
enrichLookupStatsBug
1551+
required_capability: join_lookup_v12
1552+
required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout
1553+
1554+
from *
1555+
| enrich languages_policy on cluster
1556+
| rename languages.byte as language_code
1557+
| lookup join languages_lookup on language_code
1558+
| stats salary_change.long = max(ratings), foo = max(num)
1559+
;
1560+
1561+
salary_change.long:double|foo:long
1562+
5.0 |1698069301543123456
1563+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -932,7 +932,12 @@ public enum Cap {
932932
/**
933933
* Support for sorting when aggregate_metric_doubles are present
934934
*/
935-
AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG);
935+
AGGREGATE_METRIC_DOUBLE_SORTING(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG),
936+
937+
/**
938+
* Supercedes {@link Cap#MAKE_NUMBER_OF_CHANNELS_CONSISTENT_WITH_LAYOUT}.
939+
*/
940+
FIX_REPLACE_MISSING_FIELD_WITH_NULL_DUPLICATE_NAME_ID_IN_LAYOUT;
936941

937942
private final boolean enabled;
938943

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.analysis;
99

1010
import org.elasticsearch.index.IndexMode;
11+
import org.elasticsearch.xpack.esql.plan.IndexPattern;
1112
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1213
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1314
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
@@ -25,11 +26,11 @@ public class PreAnalyzer {
2526
public static class PreAnalysis {
2627
public static final PreAnalysis EMPTY = new PreAnalysis(emptyList(), emptyList(), emptyList());
2728

28-
public final List<TableInfo> indices;
29+
public final List<IndexPattern> indices;
2930
public final List<Enrich> enriches;
30-
public final List<TableInfo> lookupIndices;
31+
public final List<IndexPattern> lookupIndices;
3132

32-
public PreAnalysis(List<TableInfo> indices, List<Enrich> enriches, List<TableInfo> lookupIndices) {
33+
public PreAnalysis(List<IndexPattern> indices, List<Enrich> enriches, List<IndexPattern> lookupIndices) {
3334
this.indices = indices;
3435
this.enriches = enriches;
3536
this.lookupIndices = lookupIndices;
@@ -45,14 +46,11 @@ public PreAnalysis preAnalyze(LogicalPlan plan) {
4546
}
4647

4748
protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
48-
List<TableInfo> indices = new ArrayList<>();
49+
List<IndexPattern> indices = new ArrayList<>();
4950
List<Enrich> unresolvedEnriches = new ArrayList<>();
50-
List<TableInfo> lookupIndices = new ArrayList<>();
51+
List<IndexPattern> lookupIndices = new ArrayList<>();
5152

52-
plan.forEachUp(UnresolvedRelation.class, p -> {
53-
List<TableInfo> list = p.indexMode() == IndexMode.LOOKUP ? lookupIndices : indices;
54-
list.add(new TableInfo(p.indexPattern()));
55-
});
53+
plan.forEachUp(UnresolvedRelation.class, p -> (p.indexMode() == IndexMode.LOOKUP ? lookupIndices : indices).add(p.indexPattern()));
5654
plan.forEachUp(Enrich.class, unresolvedEnriches::add);
5755

5856
// mark plan as preAnalyzed (if it were marked, there would be no analysis)

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/TableInfo.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceMissingFieldWithNull.java

Lines changed: 54 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
import org.elasticsearch.common.util.Maps;
1111
import org.elasticsearch.index.IndexMode;
1212
import org.elasticsearch.xpack.esql.core.expression.Alias;
13+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1314
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1415
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1516
import org.elasticsearch.xpack.esql.core.expression.Literal;
1617
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
1718
import org.elasticsearch.xpack.esql.core.type.DataType;
1819
import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
1920
import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext;
20-
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
2121
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
2222
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2323
import org.elasticsearch.xpack.esql.plan.logical.Filter;
@@ -26,14 +26,12 @@
2626
import org.elasticsearch.xpack.esql.plan.logical.Project;
2727
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
2828
import org.elasticsearch.xpack.esql.plan.logical.TopN;
29-
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
30-
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
3129
import org.elasticsearch.xpack.esql.rule.ParameterizedRule;
32-
import org.elasticsearch.xpack.esql.stats.SearchStats;
3330

3431
import java.util.ArrayList;
3532
import java.util.List;
3633
import java.util.Map;
34+
import java.util.function.Predicate;
3735

3836
/**
3937
* Look for any fields used in the plan that are missing locally and replace them with null.
@@ -45,82 +43,83 @@ public class ReplaceMissingFieldWithNull extends ParameterizedRule<LogicalPlan,
4543
public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLogicalOptimizerContext) {
4644
var lookupFieldsBuilder = AttributeSet.builder();
4745
plan.forEachUp(EsRelation.class, esRelation -> {
46+
// Looking only for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index
47+
// is used in the FROM command, it will not be marked with LOOKUP mode there - but STANDARD.
48+
// It seems like we could instead just look for JOINs and walk down their right hand side to find lookup fields - but this does
49+
// not work as this rule also gets called just on the right hand side of a JOIN, which means that we don't always know that
50+
// we're inside the right (or left) branch of a JOIN node. (See PlannerUtils.localPlan - this looks for FragmentExecs and
51+
// performs local logical optimization of the fragments; the right hand side of a LookupJoinExec can be a FragmentExec.)
4852
if (esRelation.indexMode() == IndexMode.LOOKUP) {
4953
lookupFieldsBuilder.addAll(esRelation.output());
5054
}
5155
});
56+
AttributeSet lookupFields = lookupFieldsBuilder.build();
5257

53-
return plan.transformUp(p -> missingToNull(p, localLogicalOptimizerContext.searchStats(), lookupFieldsBuilder.build()));
54-
}
55-
56-
private LogicalPlan missingToNull(LogicalPlan plan, SearchStats stats, AttributeSet lookupFields) {
57-
if (plan instanceof EsRelation || plan instanceof LocalRelation) {
58-
return plan;
59-
}
58+
// Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead.
59+
// Also retain fields from lookup indices because we do not have stats for these.
60+
Predicate<FieldAttribute> shouldBeRetained = f -> f.field() instanceof PotentiallyUnmappedKeywordEsField
61+
|| (localLogicalOptimizerContext.searchStats().exists(f.fieldName()) || lookupFields.contains(f));
6062

61-
if (plan instanceof Aggregate a) {
62-
// don't do anything (for now)
63-
return a;
64-
}
65-
// keep the aliased name
66-
else if (plan instanceof Project project) {
67-
var projections = project.projections();
68-
List<NamedExpression> newProjections = new ArrayList<>(projections.size());
69-
Map<DataType, Alias> nullLiteral = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
70-
AttributeSet joinAttributes = joinAttributes(project);
63+
return plan.transformUp(p -> missingToNull(p, shouldBeRetained));
64+
}
7165

72-
for (NamedExpression projection : projections) {
73-
// Do not use the attribute name, this can deviate from the field name for union types.
74-
if (projection instanceof FieldAttribute f
75-
&& stats.exists(f.fieldName()) == false
76-
&& joinAttributes.contains(f) == false
77-
&& f.field() instanceof PotentiallyUnmappedKeywordEsField == false) {
78-
// TODO: Should do a searchStats lookup for join attributes instead of just ignoring them here
79-
// See TransportSearchShardsAction
66+
private LogicalPlan missingToNull(LogicalPlan plan, Predicate<FieldAttribute> shouldBeRetained) {
67+
if (plan instanceof EsRelation relation) {
68+
// Remove missing fields from the EsRelation because this is not where we will obtain them from; replace them by an Eval right
69+
// after, instead. This allows us to safely re-use the attribute ids of the corresponding FieldAttributes.
70+
// This means that an EsRelation[field1, field2, field3] where field1 and field 3 are missing will be replaced by
71+
// Project[field1, field2, field3] <- keeps the ordering intact
72+
// \_Eval[field1 = null, field3 = null]
73+
// \_EsRelation[field2]
74+
List<Attribute> relationOutput = relation.output();
75+
Map<DataType, Alias> nullLiterals = Maps.newLinkedHashMapWithExpectedSize(DataType.types().size());
76+
List<NamedExpression> newProjections = new ArrayList<>(relationOutput.size());
77+
for (int i = 0, size = relationOutput.size(); i < size; i++) {
78+
Attribute attr = relationOutput.get(i);
79+
NamedExpression projection;
80+
if (attr instanceof FieldAttribute f && (shouldBeRetained.test(f) == false)) {
8081
DataType dt = f.dataType();
81-
Alias nullAlias = nullLiteral.get(f.dataType());
82+
Alias nullAlias = nullLiterals.get(dt);
8283
// save the first field as null (per datatype)
8384
if (nullAlias == null) {
85+
// Keep the same id so downstream query plans don't need updating
86+
// NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
87+
// In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
88+
// on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
89+
// layouts due to a duplicate name id.
90+
// If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
91+
// give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
8492
Alias alias = new Alias(f.source(), f.name(), Literal.of(f, null), f.id());
85-
nullLiteral.put(dt, alias);
93+
nullLiterals.put(dt, alias);
8694
projection = alias.toAttribute();
8795
}
88-
// otherwise point to it
96+
// otherwise point to it since this avoids creating field copies
8997
else {
90-
// since avoids creating field copies
9198
projection = new Alias(f.source(), f.name(), nullAlias.toAttribute(), f.id());
9299
}
100+
} else {
101+
projection = attr;
93102
}
94-
95103
newProjections.add(projection);
96104
}
97-
// add the first found field as null
98-
if (nullLiteral.size() > 0) {
99-
plan = new Eval(project.source(), project.child(), new ArrayList<>(nullLiteral.values()));
100-
plan = new Project(project.source(), plan, newProjections);
105+
106+
if (nullLiterals.size() == 0) {
107+
return plan;
101108
}
102-
} else if (plan instanceof Eval
109+
110+
Eval eval = new Eval(plan.source(), relation, new ArrayList<>(nullLiterals.values()));
111+
// This projection is redundant if there's another projection downstream (and no commands depend on the order until we hit it).
112+
return new Project(plan.source(), eval, newProjections);
113+
}
114+
115+
if (plan instanceof Eval
103116
|| plan instanceof Filter
104117
|| plan instanceof OrderBy
105118
|| plan instanceof RegexExtract
106119
|| plan instanceof TopN) {
107-
plan = plan.transformExpressionsOnlyUp(
108-
FieldAttribute.class,
109-
// Do not use the attribute name, this can deviate from the field name for union types.
110-
// Also skip fields from lookup indices because we do not have stats for these.
111-
// TODO: We do have stats for lookup indices in case they are being used in the FROM clause; this can be refined.
112-
f -> f.field() instanceof PotentiallyUnmappedKeywordEsField || (stats.exists(f.fieldName()) || lookupFields.contains(f))
113-
? f
114-
: Literal.of(f, null)
115-
);
116-
}
120+
return plan.transformExpressionsOnlyUp(FieldAttribute.class, f -> shouldBeRetained.test(f) ? f : Literal.of(f, null));
121+
}
117122

118123
return plan;
119124
}
120-
121-
private static AttributeSet joinAttributes(Project project) {
122-
var attributesBuilder = AttributeSet.builder();
123-
project.forEachDown(Join.class, j -> j.right().forEachDown(EsRelation.class, p -> attributesBuilder.addAll(p.output())));
124-
return attributesBuilder.build();
125-
}
126125
}

0 commit comments

Comments
 (0)