24
24
import org .elasticsearch .xpack .esql .plan .logical .Drop ;
25
25
import org .elasticsearch .xpack .esql .plan .logical .Eval ;
26
26
import org .elasticsearch .xpack .esql .plan .logical .Filter ;
27
+ import org .elasticsearch .xpack .esql .plan .logical .Fork ;
27
28
import org .elasticsearch .xpack .esql .plan .logical .Grok ;
28
29
import org .elasticsearch .xpack .esql .plan .logical .Keep ;
29
30
import org .elasticsearch .xpack .esql .plan .logical .LeafPlan ;
30
31
import org .elasticsearch .xpack .esql .plan .logical .LogicalPlan ;
31
32
import org .elasticsearch .xpack .esql .plan .logical .OrderBy ;
32
33
import org .elasticsearch .xpack .esql .plan .logical .Rename ;
33
34
import org .elasticsearch .xpack .esql .plan .logical .Sample ;
35
+ import org .elasticsearch .xpack .esql .plan .logical .join .Join ;
34
36
import org .elasticsearch .xpack .esql .session .Result ;
35
37
36
38
import java .util .List ;
@@ -74,6 +76,9 @@ public interface LogicalPlanRunner {
74
76
void run (LogicalPlan plan , ActionListener <Result > listener );
75
77
}
76
78
79
+ /**
80
+ * Commands that map one row to one row. These commands can be swapped with {@code SAMPLE}.
81
+ */
77
82
private static final Set <Class <? extends LogicalPlan >> ONE_TO_ONE_COMMANDS = Set .of (
78
83
Dissect .class ,
79
84
Drop .class ,
@@ -84,8 +89,16 @@ public interface LogicalPlanRunner {
84
89
Rename .class
85
90
);
86
91
92
+ /**
93
+ * Commands that map one row to one or zero rows. These commands can be swapped with {@code SAMPLE}.
+ * Plan verification records when one of these commands appears before {@code STATS}.
94
+ */
87
95
private static final Set <Class <? extends LogicalPlan >> FILTER_COMMANDS = Set .of (Filter .class , Sample .class );
88
96
97
+ /**
98
+ * Commands that cannot be used anywhere in an approximated query.
+ * Plan verification throws an {@code InvalidArgumentException} when the plan contains one of them.
+ * NOTE(review): membership is tested with {@code plan.getClass()}, i.e. by exact class, so plan nodes
+ * that are subclasses of the listed classes are not matched - confirm that this is intended.
99
+ */
100
+ private static final Set <Class <? extends LogicalPlan >> INCOMPATIBLE_COMMANDS = Set .of (Fork .class , Join .class );
101
+
89
102
// Row count used to tune the sample probability: the count listener rescales the probability by
// SAMPLE_ROW_COUNT / rowCount and re-runs the count plan while the observed row count is at most
// half of this value and the rescaled probability is still below 1.0.
// TODO: find a good default value, or alternative ways of setting it
90
103
private static final int SAMPLE_ROW_COUNT = 1000 ;
91
104
@@ -115,23 +128,32 @@ private boolean verifyPlan() {
115
128
if (logicalPlan .preOptimized () == false ) {
116
129
throw new IllegalStateException ("Expected pre-optimized plan" );
117
130
}
118
-
119
131
if (logicalPlan .anyMatch (plan -> plan instanceof Aggregate ) == false ) {
120
- throw new InvalidArgumentException ("query without [STATS] function cannot be approximated" );
132
+ throw new InvalidArgumentException ("query without [STATS] command cannot be approximated" );
121
133
}
134
+ logicalPlan .forEachUp (plan -> {
135
+ if (INCOMPATIBLE_COMMANDS .contains (plan .getClass ())) {
136
+ throw new InvalidArgumentException (
137
+ "query with [" + plan .nodeName ().toUpperCase (Locale .ROOT ) + "] command cannot be approximated"
138
+ );
139
+ }
140
+ });
122
141
123
142
Holder <Boolean > encounteredStats = new Holder <>(false );
124
143
Holder <Boolean > hasFilters = new Holder <>(false );
125
144
logicalPlan .transformUp (plan -> {
126
- // TODO: check/fix for JOIN / FORK / INLINESTATS / ...
127
- if (plan instanceof LeafPlan ) {
145
+ if (INCOMPATIBLE_COMMANDS .contains (plan .getClass ())) {
146
+ throw new InvalidArgumentException (
147
+ "query with [" + plan .nodeName ().toUpperCase (Locale .ROOT ) + "] command cannot be approximated"
148
+ );
149
+ } else if (plan instanceof LeafPlan ) {
128
150
encounteredStats .set (false );
129
151
} else if (encounteredStats .get () == false ) {
130
152
if (plan instanceof Aggregate ) {
131
153
encounteredStats .set (true );
132
154
} else if (ONE_TO_ONE_COMMANDS .contains (plan .getClass ()) == false && FILTER_COMMANDS .contains (plan .getClass ()) == false ) {
133
155
throw new InvalidArgumentException (
134
- "query with [" + plan .nodeName ().toUpperCase (Locale .ROOT ) + "] before [STATS] function cannot be approximated"
156
+ "query with [" + plan .nodeName ().toUpperCase (Locale .ROOT ) + "] before [STATS] command cannot be approximated"
135
157
);
136
158
} else if (FILTER_COMMANDS .contains (plan .getClass ())) {
137
159
hasFilters .set (true );
@@ -149,7 +171,6 @@ private boolean verifyPlan() {
149
171
*/
150
172
private LogicalPlan sourceCountPlan () {
151
173
LogicalPlan sourceCountPlan = logicalPlan .transformUp (plan -> {
152
- // TODO: check/fix for JOIN / FORK / INLINESTATS / ...
153
174
if (plan instanceof LeafPlan ) {
154
175
plan = new Aggregate (
155
176
Source .EMPTY ,
@@ -192,7 +213,6 @@ private ActionListener<Result> sourceCountListener(LogicalPlanRunner runner, Act
192
213
private LogicalPlan countPlan (double sampleProbability ) {
193
214
Holder <Boolean > encounteredStats = new Holder <>(false );
194
215
LogicalPlan countPlan = logicalPlan .transformUp (plan -> {
195
- // TODO: check/fix for JOIN / FORK / INLINESTATS / ...
196
216
if (plan instanceof LeafPlan ) {
197
217
encounteredStats .set (false );
198
218
} else if (encounteredStats .get () == false ) {
@@ -226,7 +246,7 @@ private LogicalPlan countPlan(double sampleProbability) {
226
246
private ActionListener <Result > countListener (LogicalPlanRunner runner , double probability , ActionListener <Result > listener ) {
227
247
return listener .delegateFailureAndWrap ((countListener , countResult ) -> {
228
248
long rowCount = rowCount (countResult );
229
- logger .debug ("countPlan result (p={}):{} rows" , probability , rowCount );
249
+ logger .debug ("countPlan result (p={}): {} rows" , probability , rowCount );
230
250
double newProbability = probability * SAMPLE_ROW_COUNT / Math .max (1 , rowCount );
231
251
if (rowCount <= SAMPLE_ROW_COUNT / 2 && newProbability < 1.0 ) {
232
252
runner .run (countPlan (newProbability ), countListener (runner , newProbability , listener ));
@@ -264,7 +284,6 @@ private LogicalPlan approximatePlan(double sampleProbability) {
264
284
logger .debug ("generating approximate plan (p={})" , sampleProbability );
265
285
Holder <Boolean > encounteredStats = new Holder <>(false );
266
286
LogicalPlan approximatePlan = logicalPlan .transformUp (plan -> {
267
- // TODO: check/fix for JOIN / FORK / INLINESTATS / ...
268
287
if (plan instanceof LeafPlan ) {
269
288
encounteredStats .set (false );
270
289
} else if (encounteredStats .get () == false ) {
0 commit comments