Skip to content

Commit eb7c5c5

Browse files
Clean up, add more UTs
1 parent ba9ab52 commit eb7c5c5

File tree

9 files changed

+65
-93
lines changed

9 files changed

+65
-93
lines changed

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,6 @@ public void testLookupExplosionManyMatchesFiltered() throws IOException {
726726
// This test will only work with the expanding join optimization
727727
// that pushes the filter to the right side of the lookup.
728728
// Without the optimization, it will fail with circuit_breaking_exception
729-
// lookupEntries % reductionFactor must be 0 to ensure that the number of matches is reduced
730729
int sensorDataCount = 10000;
731730
int lookupEntries = 10000;
732731
int reductionFactor = 1000; // reduce the number of matches by this factor

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5151,7 +5151,7 @@ null | null | fred | null | null
51515151
;
51525152

51535153

5154-
lookupJoinWithPushableFilterOnLeft
5154+
lookupJoinWithPushableFilterOnRight
51555155
required_capability: join_lookup_v12
51565156
required_capability: lookup_join_on_multiple_fields
51575157

@@ -5178,7 +5178,7 @@ id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:int
51785178
14 | Nina | foo2 | omicron | 15000
51795179
;
51805180

5181-
lookupJoinWithTwoPushableFiltersOnLeft
5181+
lookupJoinWithTwoPushableFiltersOnRight
51825182
required_capability: join_lookup_v12
51835183
required_capability: lookup_join_on_multiple_fields
51845184

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
6060
import org.elasticsearch.xpack.esql.core.tree.Source;
6161
import org.elasticsearch.xpack.esql.core.type.DataType;
62+
import org.elasticsearch.xpack.esql.core.type.EsField;
6263
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
6364
import org.elasticsearch.xpack.esql.enrich.MatchConfig;
6465
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan;
@@ -228,16 +229,11 @@ public void populate(int docCount, List<String> expected, Predicate<Integer> fil
228229
}
229230
}
230231

231-
Expression buildGreaterThanFilter(long value) {
232+
private Expression buildGreaterThanFilter(long value) {
232233
FieldAttribute filterAttribute = new FieldAttribute(
233234
Source.EMPTY,
234235
"l",
235-
new org.elasticsearch.xpack.esql.core.type.EsField(
236-
"l",
237-
org.elasticsearch.xpack.esql.core.type.DataType.LONG,
238-
java.util.Collections.emptyMap(),
239-
true
240-
)
236+
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
241237
);
242238
return new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
243239
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
3939
import org.elasticsearch.compute.operator.Driver;
4040
import org.elasticsearch.compute.operator.DriverContext;
41-
import org.elasticsearch.compute.operator.FilterOperator;
4241
import org.elasticsearch.compute.operator.Operator;
4342
import org.elasticsearch.compute.operator.OutputOperator;
4443
import org.elasticsearch.compute.operator.ProjectOperator;
@@ -74,14 +73,10 @@
7473
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
7574
import org.elasticsearch.xpack.esql.core.expression.Alias;
7675
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
77-
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
7876
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
7977
import org.elasticsearch.xpack.esql.core.tree.Source;
8078
import org.elasticsearch.xpack.esql.core.type.DataType;
81-
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
82-
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
8379
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
84-
import org.elasticsearch.xpack.esql.planner.Layout;
8580
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
8681
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
8782

@@ -418,27 +413,6 @@ public void onFailure(Exception e) {
418413
}
419414
}
420415

421-
private Operator filterExecOperator(
422-
FilterExec filterExec,
423-
Operator inputOperator, // not needed?
424-
EsPhysicalOperationProviders.ShardContext shardContext,
425-
DriverContext driverContext,
426-
Layout.Builder builder
427-
) {
428-
if (filterExec == null) {
429-
return null;
430-
}
431-
432-
var evaluatorFactory = EvalMapper.toEvaluator(
433-
FoldContext.small()/*is this correct*/,
434-
filterExec.condition(),
435-
builder.build(),
436-
List.of(shardContext)
437-
);
438-
var filterOperatorFactory = new FilterOperator.FilterOperatorFactory(evaluatorFactory);
439-
return filterOperatorFactory.get(driverContext);
440-
}
441-
442416
private static Operator extractFieldsOperator(
443417
EsPhysicalOperationProviders.ShardContext shardContext,
444418
DriverContext driverContext,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ protected LookupEnrichQueryGenerator queryList(
101101
Block inputBlock,
102102
Warnings warnings
103103
) {
104-
105104
List<QueryList> queryLists = new ArrayList<>();
106105
for (int i = 0; i < request.matchFields.size(); i++) {
107106
MatchConfig matchField = request.matchFields.get(i);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,8 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) {
156156
}
157157
// push the right scoped filter down to the right child
158158
if (scoped.rightFilters().isEmpty() == false && (join.right() instanceof Filter == false)) {
159-
// push the filter down to the right child
160159
List<Expression> rightPushableFilters = buildRightPushableFilters(scoped.rightFilters());
161160
if (rightPushableFilters.isEmpty() == false) {
162-
// right = new Filter(right.source(), right, Predicates.combineAnd(rightPushableFilters));
163-
// update the join with the new right child
164-
// join = (Join) join.replaceRight(right);
165161
Expression optionalRightHandSideFilters = Predicates.combineAnd(rightPushableFilters);
166162
join = join.withOptionalRightHandFilters(optionalRightHandSideFilters);
167163
optimizationApplied = true;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
2424
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
2525
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
26-
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
2726
import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
2827
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
2928
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
@@ -112,33 +111,19 @@ private PhysicalPlan mapBinary(BinaryPlan binary) {
112111
join.rightOutputFields()
113112
);
114113
}
115-
if (right instanceof FilterExec filterExec) {
116-
LookupJoinExec lookupJoinExec = getLookupJoinExec(join, filterExec.child(), left, config);
117-
if (lookupJoinExec != null) {
118-
// build the right child as a FilterExec with the original lookupJoinExec.right() as the child
119-
FilterExec newRightChild = filterExec.replaceChild(lookupJoinExec.right());
120-
return lookupJoinExec.replaceChildren(lookupJoinExec.left(), newRightChild);
121-
}
114+
if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) {
115+
return new LookupJoinExec(
116+
join.source(),
117+
left,
118+
right,
119+
config.leftFields(),
120+
config.rightFields(),
121+
join.rightOutputFields(),
122+
join.optionalRightHandFilters()
123+
);
122124
}
123-
LookupJoinExec lookupJoinExec = getLookupJoinExec(join, right, left, config);
124-
if (lookupJoinExec != null) return lookupJoinExec;
125125
}
126126

127127
return MapperUtils.unsupported(binary);
128128
}
129-
130-
private static LookupJoinExec getLookupJoinExec(Join join, PhysicalPlan right, PhysicalPlan left, JoinConfig config) {
131-
if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) {
132-
return new LookupJoinExec(
133-
join.source(),
134-
left,
135-
right,
136-
config.leftFields(),
137-
config.rightFields(),
138-
join.rightOutputFields(),
139-
join.optionalRightHandFilters()
140-
);
141-
}
142-
return null;
143-
}
144129
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
1717
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1818
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
19-
import org.elasticsearch.xpack.esql.plan.logical.Filter;
2019
import org.elasticsearch.xpack.esql.plan.logical.Fork;
2120
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
2221
import org.elasticsearch.xpack.esql.plan.logical.Limit;
@@ -227,23 +226,18 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
227226
join.rightOutputFields()
228227
);
229228
}
230-
if (right instanceof FragmentExec fragment) {
231-
boolean isIndexModeLookup = fragment.fragment() instanceof EsRelation relation && relation.indexMode() == IndexMode.LOOKUP;
232-
isIndexModeLookup = isIndexModeLookup
233-
|| fragment.fragment() instanceof Filter filter
234-
&& filter.child() instanceof EsRelation relation
235-
&& relation.indexMode() == IndexMode.LOOKUP;
236-
if (isIndexModeLookup) {
237-
return new LookupJoinExec(
238-
join.source(),
239-
left,
240-
right,
241-
config.leftFields(),
242-
config.rightFields(),
243-
join.rightOutputFields(),
244-
join.optionalRightHandFilters()
245-
);
246-
}
229+
if (right instanceof FragmentExec fragment
230+
&& fragment.fragment() instanceof EsRelation relation
231+
&& relation.indexMode() == IndexMode.LOOKUP) {
232+
return new LookupJoinExec(
233+
join.source(),
234+
left,
235+
right,
236+
config.leftFields(),
237+
config.rightFields(),
238+
join.rightOutputFields(),
239+
join.optionalRightHandFilters()
240+
);
247241
}
248242
}
249243

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
import org.elasticsearch.common.util.concurrent.EsExecutors;
2929
import org.elasticsearch.compute.data.BlockFactory;
3030
import org.elasticsearch.compute.data.BytesRefBlock;
31-
import org.elasticsearch.compute.data.BytesRefVector;
3231
import org.elasticsearch.compute.data.IntBlock;
33-
import org.elasticsearch.compute.data.IntVector;
3432
import org.elasticsearch.compute.data.LongBlock;
3533
import org.elasticsearch.compute.data.LongVector;
3634
import org.elasticsearch.compute.data.Page;
@@ -61,12 +59,18 @@
6159
import org.elasticsearch.threadpool.TestThreadPool;
6260
import org.elasticsearch.threadpool.ThreadPool;
6361
import org.elasticsearch.transport.TransportService;
62+
import org.elasticsearch.xpack.esql.core.expression.Expression;
6463
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
64+
import org.elasticsearch.xpack.esql.core.expression.Literal;
6565
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
6666
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
67+
import org.elasticsearch.xpack.esql.core.tree.Location;
6768
import org.elasticsearch.xpack.esql.core.tree.Source;
6869
import org.elasticsearch.xpack.esql.core.type.DataType;
70+
import org.elasticsearch.xpack.esql.core.type.EsField;
71+
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan;
6972
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
73+
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
7074
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
7175
import org.hamcrest.Matcher;
7276
import org.junit.After;
@@ -75,6 +79,8 @@
7579
import java.io.IOException;
7680
import java.io.UncheckedIOException;
7781
import java.util.ArrayList;
82+
import java.util.Collections;
83+
import java.util.HashSet;
7884
import java.util.List;
7985
import java.util.Map;
8086
import java.util.Set;
@@ -86,6 +92,7 @@
8692

8793
public class LookupFromIndexOperatorTests extends OperatorTestCase {
8894
private static final int LOOKUP_SIZE = 1000;
95+
private static final int GREATER_THAN_VALUE = -10;
8996
private final ThreadPool threadPool = threadPool();
9097
private final Directory lookupIndexDirectory = newDirectory();
9198
private final List<Releasable> releasables = new ArrayList<>();
@@ -134,12 +141,17 @@ protected void assertSimpleOutput(List<Page> input, List<Page> results) {
134141
for (Page r : results) {
135142
assertThat(r.getBlockCount(), equalTo(numberOfJoinColumns + 2));
136143
LongVector match = r.<LongBlock>getBlock(0).asVector();
137-
BytesRefVector lkwd = r.<BytesRefBlock>getBlock(numberOfJoinColumns).asVector();
138-
IntVector lint = r.<IntBlock>getBlock(numberOfJoinColumns + 1).asVector();
144+
BytesRefBlock lkwdBlock = r.getBlock(numberOfJoinColumns);
145+
IntBlock lintBlock = r.getBlock(numberOfJoinColumns + 1);
139146
for (int p = 0; p < r.getPositionCount(); p++) {
140147
long m = match.getLong(p);
141-
assertThat(lkwd.getBytesRef(p, new BytesRef()).utf8ToString(), equalTo("l" + m));
142-
assertThat(lint.getInt(p), equalTo((int) -m));
148+
if (m < Math.abs(GREATER_THAN_VALUE)) {
149+
assertThat(lkwdBlock.getBytesRef(lkwdBlock.getFirstValueIndex(p), new BytesRef()).utf8ToString(), equalTo("l" + m));
150+
assertThat(lintBlock.getInt(lintBlock.getFirstValueIndex(p)), equalTo((int) -m));
151+
} else {
152+
assertTrue("at " + p, lkwdBlock.isNull(p));
153+
assertTrue("at " + p, lintBlock.isNull(p));
154+
}
143155
}
144156
}
145157
}
@@ -161,6 +173,7 @@ protected Operator.OperatorFactory simple(SimpleOptions options) {
161173
FieldAttribute.FieldName matchField = new FieldAttribute.FieldName("match" + i);
162174
matchFields.add(new MatchConfig(matchField, i, inputDataType));
163175
}
176+
164177
return new LookupFromIndexOperator.Factory(
165178
matchFields,
166179
sessionId,
@@ -171,7 +184,20 @@ protected Operator.OperatorFactory simple(SimpleOptions options) {
171184
lookupIndex,
172185
loadFields,
173186
Source.EMPTY,
174-
null
187+
buildGreaterThanFilter(GREATER_THAN_VALUE)
188+
);
189+
}
190+
191+
private Expression buildGreaterThanFilter(int value) {
192+
FieldAttribute filterAttribute = new FieldAttribute(
193+
Source.EMPTY,
194+
"lint",
195+
new EsField("lint", DataType.INTEGER, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
196+
);
197+
return new GreaterThan(
198+
new Source(new Location(0, 0), "lint > " + value),
199+
filterAttribute,
200+
new Literal(Source.EMPTY, value, DataType.INTEGER)
175201
);
176202
}
177203

@@ -187,22 +213,25 @@ protected Matcher<String> expectedToStringOfSimple() {
187213
for (int i = 0; i < numberOfJoinColumns; i++) {
188214
sb.append(" input_type=LONG match_field=match").append(i).append(" inputChannel=").append(i);
189215
}
190-
sb.append(" optional_filter=null]");
216+
sb.append(" optional_filter=lint > ").append(GREATER_THAN_VALUE).append("]");
191217
return matchesPattern(sb.toString());
192218
}
193219

194220
private LookupFromIndexService lookupService(DriverContext mainContext) {
195221
boolean beCranky = mainContext.bigArrays().breakerService() instanceof CrankyCircuitBreakerService;
196222
DiscoveryNode localNode = DiscoveryNodeUtils.create("node", "node");
223+
var builtInClusterSettings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
224+
builtInClusterSettings.add(EsqlFlags.ESQL_STRING_LIKE_ON_INDEX);
197225
ClusterService clusterService = ClusterServiceUtils.createClusterService(
198226
threadPool,
199227
localNode,
200228
Settings.builder()
201229
// Reserve 0 bytes in the sub-driver so we are more likely to hit the cranky breaker in it.
202230
.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_SIZE_SETTING, ByteSizeValue.ofKb(0))
203231
.put(BlockFactory.LOCAL_BREAKER_OVER_RESERVED_MAX_SIZE_SETTING, ByteSizeValue.ofKb(0))
232+
.put(EsqlFlags.ESQL_STRING_LIKE_ON_INDEX.getKey(), true)
204233
.build(),
205-
ClusterSettings.createBuiltInClusterSettings()
234+
new ClusterSettings(Settings.EMPTY, builtInClusterSettings)
206235
);
207236
IndicesService indicesService = mock(IndicesService.class);
208237
IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();

0 commit comments

Comments
 (0)