1010import org .elasticsearch .common .util .Maps ;
1111import org .elasticsearch .index .IndexMode ;
1212import org .elasticsearch .xpack .esql .core .expression .Alias ;
13+ import org .elasticsearch .xpack .esql .core .expression .Attribute ;
1314import org .elasticsearch .xpack .esql .core .expression .AttributeSet ;
1415import org .elasticsearch .xpack .esql .core .expression .FieldAttribute ;
1516import org .elasticsearch .xpack .esql .core .expression .Literal ;
1617import org .elasticsearch .xpack .esql .core .expression .NamedExpression ;
1718import org .elasticsearch .xpack .esql .core .type .DataType ;
1819import org .elasticsearch .xpack .esql .optimizer .LocalLogicalOptimizerContext ;
19- import org .elasticsearch .xpack .esql .plan .logical .Aggregate ;
2020import org .elasticsearch .xpack .esql .plan .logical .EsRelation ;
2121import org .elasticsearch .xpack .esql .plan .logical .Eval ;
2222import org .elasticsearch .xpack .esql .plan .logical .Filter ;
2525import org .elasticsearch .xpack .esql .plan .logical .Project ;
2626import org .elasticsearch .xpack .esql .plan .logical .RegexExtract ;
2727import org .elasticsearch .xpack .esql .plan .logical .TopN ;
28- import org .elasticsearch .xpack .esql .plan .logical .join .Join ;
29- import org .elasticsearch .xpack .esql .plan .logical .local .LocalRelation ;
3028import org .elasticsearch .xpack .esql .rule .ParameterizedRule ;
31- import org .elasticsearch .xpack .esql .stats .SearchStats ;
3229
3330import java .util .ArrayList ;
3431import java .util .List ;
3532import java .util .Map ;
33+ import java .util .function .Predicate ;
3634
3735/**
3836 * Look for any fields used in the plan that are missing locally and replace them with null.
@@ -42,79 +40,85 @@ public class ReplaceMissingFieldWithNull extends ParameterizedRule<LogicalPlan,
4240
4341 @ Override
4442 public LogicalPlan apply (LogicalPlan plan , LocalLogicalOptimizerContext localLogicalOptimizerContext ) {
43+ // Fields from lookup indices don't need to be present on the node, and our search stats don't include them, anyway. Ignore them.
4544 AttributeSet lookupFields = new AttributeSet ();
4645 plan .forEachUp (EsRelation .class , esRelation -> {
46+ // Looking only for indices in LOOKUP mode is correct: during parsing, we assign the expected mode and even if a lookup index
47+ // is used in the FROM command, it will not be marked with LOOKUP mode there - but STANDARD.
48+ // It seems like we could instead just look for JOINs and walk down their right hand side to find lookup fields - but this does
49+ // not work as this rule also gets called just on the right hand side of a JOIN, which means that we don't always know that
50+ // we're inside the right (or left) branch of a JOIN node. (See PlannerUtils.localPlan - this looks for FragmentExecs and
51+ // performs local logical optimization of the fragments; the right hand side of a LookupJoinExec can be a FragmentExec.)
4752 if (esRelation .indexMode () == IndexMode .LOOKUP ) {
4853 lookupFields .addAll (esRelation .output ());
4954 }
5055 });
5156
52- return plan .transformUp (p -> missingToNull (p , localLogicalOptimizerContext .searchStats (), lookupFields ));
53- }
54-
55- private LogicalPlan missingToNull (LogicalPlan plan , SearchStats stats , AttributeSet lookupFields ) {
56- if (plan instanceof EsRelation || plan instanceof LocalRelation ) {
57- return plan ;
58- }
57+ // Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead.
58+ // Also retain fields from lookup indices because we do not have stats for these.
59+ Predicate <FieldAttribute > shouldBeRetained = f -> (localLogicalOptimizerContext .searchStats ().exists (f .fieldName ())
60+ || lookupFields .contains (f ));
5961
60- if (plan instanceof Aggregate a ) {
61- // don't do anything (for now)
62- return a ;
63- }
64- // keep the aliased name
65- else if (plan instanceof Project project ) {
66- var projections = project .projections ();
67- List <NamedExpression > newProjections = new ArrayList <>(projections .size ());
68- Map <DataType , Alias > nullLiteral = Maps .newLinkedHashMapWithExpectedSize (DataType .types ().size ());
69- AttributeSet joinAttributes = joinAttributes (project );
62+ return plan .transformUp (p -> missingToNull (p , shouldBeRetained ));
63+ }
7064
71- for (NamedExpression projection : projections ) {
72- // Do not use the attribute name, this can deviate from the field name for union types.
73- if (projection instanceof FieldAttribute f && stats .exists (f .fieldName ()) == false && joinAttributes .contains (f ) == false ) {
74- // TODO: Should do a searchStats lookup for join attributes instead of just ignoring them here
75- // See TransportSearchShardsAction
65+ private LogicalPlan missingToNull (LogicalPlan plan , Predicate <FieldAttribute > shouldBeRetained ) {
66+ if (plan instanceof EsRelation relation ) {
67+ // Remove missing fields from the EsRelation because this is not where we will obtain them from; replace them by an Eval right
68+ // after, instead. This allows us to safely re-use the attribute ids of the corresponding FieldAttributes.
69+ // This means that an EsRelation[field1, field2, field3] where field1 and field 3 are missing will be replaced by
70+ // Project[field1, field2, field3] <- keeps the ordering intact
71+ // \_Eval[field1 = null, field3 = null]
72+ // \_EsRelation[field2]
73+ List <Attribute > relationOutput = relation .output ();
74+ Map <DataType , Alias > nullLiterals = Maps .newLinkedHashMapWithExpectedSize (DataType .types ().size ());
75+ List <NamedExpression > newProjections = new ArrayList <>(relationOutput .size ());
76+ for (int i = 0 , size = relationOutput .size (); i < size ; i ++) {
77+ Attribute attr = relationOutput .get (i );
78+ NamedExpression projection ;
79+ if (attr instanceof FieldAttribute f && (shouldBeRetained .test (f ) == false )) {
7680 DataType dt = f .dataType ();
77- Alias nullAlias = nullLiteral .get (f . dataType () );
81+ Alias nullAlias = nullLiterals .get (dt );
7882 // save the first field as null (per datatype)
7983 if (nullAlias == null ) {
84+ // Keep the same id so downstream query plans don't need updating
85+ // NOTE: THIS IS BRITTLE AND CAN LEAD TO BUGS.
86+ // In case some optimizer rule or so inserts a plan node that requires the field BEFORE the Eval that we're adding
87+ // on top of the EsRelation, this can trigger a field extraction in the physical optimizer phase, causing wrong
88+ // layouts due to a duplicate name id.
89+ // If someone reaches here AGAIN when debugging e.g. ClassCastExceptions NPEs from wrong layouts, we should probably
90+ // give up on this approach and instead insert EvalExecs in InsertFieldExtraction.
8091 Alias alias = new Alias (f .source (), f .name (), Literal .of (f , null ), f .id ());
81- nullLiteral .put (dt , alias );
92+ nullLiterals .put (dt , alias );
8293 projection = alias .toAttribute ();
8394 }
84- // otherwise point to it
95+ // otherwise point to it since this avoids creating field copies
8596 else {
86- // since avoids creating field copies
8797 projection = new Alias (f .source (), f .name (), nullAlias .toAttribute (), f .id ());
8898 }
99+ } else {
100+ projection = attr ;
89101 }
90-
91102 newProjections .add (projection );
92103 }
93- // add the first found field as null
94- if (nullLiteral .size () > 0 ) {
95- plan = new Eval (project .source (), project .child (), new ArrayList <>(nullLiteral .values ()));
96- plan = new Project (project .source (), plan , newProjections );
104+
105+ if (nullLiterals .size () == 0 ) {
106+ return plan ;
97107 }
98- } else if (plan instanceof Eval
108+
109+ Eval eval = new Eval (plan .source (), relation , new ArrayList <>(nullLiterals .values ()));
110+ // This projection is redundant if there's another projection downstream (and no commands depend on the order until we hit it).
111+ return new Project (plan .source (), eval , newProjections );
112+ }
113+
114+ if (plan instanceof Eval
99115 || plan instanceof Filter
100116 || plan instanceof OrderBy
101117 || plan instanceof RegexExtract
102118 || plan instanceof TopN ) {
103- plan = plan .transformExpressionsOnlyUp (
104- FieldAttribute .class ,
105- // Do not use the attribute name, this can deviate from the field name for union types.
106- // Also skip fields from lookup indices because we do not have stats for these.
107- // TODO: We do have stats for lookup indices in case they are being used in the FROM clause; this can be refined.
108- f -> stats .exists (f .fieldName ()) || lookupFields .contains (f ) ? f : Literal .of (f , null )
109- );
110- }
119+ return plan .transformExpressionsOnlyUp (FieldAttribute .class , f -> shouldBeRetained .test (f ) ? f : Literal .of (f , null ));
120+ }
111121
112122 return plan ;
113123 }
114-
115- private AttributeSet joinAttributes (Project project ) {
116- var attributes = new AttributeSet ();
117- project .forEachDown (Join .class , j -> j .right ().forEachDown (EsRelation .class , p -> attributes .addAll (p .output ())));
118- return attributes ;
119- }
120124}
0 commit comments