From 837cf7b20ce2dd86830d36a0b9bc69920e350951 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Mon, 28 Jul 2025 16:25:40 +0200 Subject: [PATCH] Make ES|QL SAMPLE not a pipeline breaker --- .../xpack/esql/planner/mapper/LocalMapper.java | 8 -------- .../elasticsearch/xpack/esql/planner/mapper/Mapper.java | 8 -------- .../xpack/esql/planner/mapper/MapperUtils.java | 6 ++++++ 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java index 29f2db102ea7e..a1671bffc5c25 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java @@ -17,7 +17,6 @@ import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; -import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.join.Join; @@ -29,7 +28,6 @@ import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; -import org.elasticsearch.xpack.esql.plan.physical.SampleExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import java.util.List; @@ -71,7 +69,6 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { // // Pipeline breakers // - if (unary instanceof Aggregate aggregate) { List intermediate = MapperUtils.intermediateAttributes(aggregate); return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate); @@ -85,14 +82,9 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null); } - if (unary instanceof Sample sample) { - return new SampleExec(sample.source(), mappedChild, sample.probability()); - } - // // Pipeline operators // - return MapperUtils.mapUnary(unary, mappedChild); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java index bf6f0b89efbec..aac1d58e5f7f1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java @@ -21,7 +21,6 @@ import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; -import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; @@ -37,7 +36,6 @@ import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.MergeExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; -import org.elasticsearch.xpack.esql.plan.physical.SampleExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import org.elasticsearch.xpack.esql.plan.physical.UnaryExec; import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec; @@ -187,12 +185,6 @@ private PhysicalPlan mapUnary(UnaryPlan unary) { ); } - // TODO: share code with local LocalMapper? - if (unary instanceof Sample sample) { - mappedChild = addExchangeForFragment(sample, mappedChild); - return new SampleExec(sample.source(), mappedChild, sample.probability()); - } - // // Pipeline operators // diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java index 4851de1616844..4db37cf6547eb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval; +import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; @@ -43,6 +44,7 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec; +import org.elasticsearch.xpack.esql.plan.physical.SampleExec; import org.elasticsearch.xpack.esql.plan.physical.ShowExec; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec; @@ -139,6 +141,10 @@ static PhysicalPlan mapUnary(UnaryPlan p, PhysicalPlan child) { return new RrfScoreEvalExec(rrf.source(), child, rrf.scoreAttribute(), rrf.forkAttribute()); } + if (p instanceof Sample sample) { + return new SampleExec(sample.source(), child, sample.probability()); + } + return unsupported(p); }