Skip to content

Commit eadffb9

Browse files
committed
pipeline breaker
1 parent 6ea45d9 commit eadffb9

File tree

3 files changed

+27
-13
lines changed

3 files changed

+27
-13
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1414
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1515
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
16+
import org.elasticsearch.xpack.esql.plan.logical.ChangePoint;
1617
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
1718
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
1819
import org.elasticsearch.xpack.esql.plan.logical.Limit;
@@ -23,6 +24,7 @@
2324
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
2425
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
2526
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
27+
import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec;
2628
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
2729
import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
2830
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
@@ -89,6 +91,17 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
8991
return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null);
9092
}
9193

94+
if (unary instanceof ChangePoint changePoint) {
95+
return new ChangePointExec(
96+
changePoint.source(),
97+
mappedChild,
98+
changePoint.value(),
99+
changePoint.key(),
100+
changePoint.targetType(),
101+
changePoint.targetPvalue()
102+
);
103+
}
104+
92105
//
93106
// Pipeline operators
94107
//

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xpack.esql.core.util.Holder;
1515
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1616
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
17+
import org.elasticsearch.xpack.esql.plan.logical.ChangePoint;
1718
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1819
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
1920
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
@@ -25,6 +26,7 @@
2526
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
2627
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
2728
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
29+
import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec;
2830
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
2931
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
3032
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
@@ -171,6 +173,18 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
171173
return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null);
172174
}
173175

176+
if (unary instanceof ChangePoint changePoint) {
177+
mappedChild = addExchangeForFragment(changePoint, mappedChild);
178+
return new ChangePointExec(
179+
changePoint.source(),
180+
mappedChild,
181+
changePoint.value(),
182+
changePoint.key(),
183+
changePoint.targetType(),
184+
changePoint.targetPvalue()
185+
);
186+
}
187+
174188
//
175189
// Pipeline operators
176190
//

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1414
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
1515
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
16-
import org.elasticsearch.xpack.esql.plan.logical.ChangePoint;
1716
import org.elasticsearch.xpack.esql.plan.logical.Dissect;
1817
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1918
import org.elasticsearch.xpack.esql.plan.logical.Eval;
@@ -27,7 +26,6 @@
2726
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
2827
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
2928
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
30-
import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec;
3129
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
3230
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
3331
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
@@ -100,17 +98,6 @@ static PhysicalPlan mapUnary(UnaryPlan p, PhysicalPlan child) {
10098
return new MvExpandExec(mvExpand.source(), child, mvExpand.target(), mvExpand.expanded());
10199
}
102100

103-
if (p instanceof ChangePoint changePoint) {
104-
return new ChangePointExec(
105-
changePoint.source(),
106-
child,
107-
changePoint.value(),
108-
changePoint.key(),
109-
changePoint.targetType(),
110-
changePoint.targetPvalue()
111-
);
112-
}
113-
114101
return unsupported(p);
115102
}
116103

0 commit comments

Comments
 (0)