Skip to content

Commit 6d2106d

Browse files
authored
ESQL: Unit tests for LOOKUP (#120559) (#120719)
This adds a unit test for LOOKUP that's going to be quite good at finding leaks.
1 parent 9489c37 commit 6d2106d

File tree

11 files changed

+300
-23
lines changed

11 files changed

+300
-23
lines changed

test/framework/src/main/java/org/elasticsearch/index/mapper/MapperServiceTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ protected final MapperService createMapperService(BooleanSupplier idFieldEnabled
196196
return createMapperService(getVersion(), getIndexSettings(), idFieldEnabled, mappings);
197197
}
198198

199-
protected final MapperService createMapperService(String mappings) throws IOException {
199+
public final MapperService createMapperService(String mappings) throws IOException {
200200
MapperService mapperService = createMapperService(mapping(b -> {}));
201201
merge(mapperService, mappings);
202202
return mapperService;
@@ -744,7 +744,7 @@ protected SearchExecutionContext createSearchExecutionContext(MapperService mapp
744744
return createSearchExecutionContext(mapperService, null, Settings.EMPTY);
745745
}
746746

747-
protected SearchExecutionContext createSearchExecutionContext(MapperService mapperService, IndexSearcher searcher) {
747+
public final SearchExecutionContext createSearchExecutionContext(MapperService mapperService, IndexSearcher searcher) {
748748
return createSearchExecutionContext(mapperService, searcher, Settings.EMPTY);
749749
}
750750

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/LocalCircuitBreaker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,9 @@ public void close() {
134134
breaker.addWithoutBreaking(-reservedBytes);
135135
}
136136
}
137+
138+
@Override
139+
public String toString() {
140+
return "LocalCircuitBreaker[" + reservedBytes + "/" + overReservedBytes + ":" + maxOverReservedBytes + "]";
141+
}
137142
}

x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ protected final void assertSimple(DriverContext context, int size) {
241241
* Tests that finish then close without calling {@link Operator#getOutput} to
242242
* retrieve a potential last page, releases all memory.
243243
*/
244-
public void testSimpleFinishClose() {
244+
public void testSimpleFinishClose() throws Exception {
245245
DriverContext driverContext = driverContext();
246246
List<Page> input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), 1));
247247
assert input.size() == 1 : "Expected single page, got: " + input;
@@ -253,7 +253,6 @@ public void testSimpleFinishClose() {
253253
operator.addInput(page);
254254
operator.finish();
255255
}
256-
assertThat(driverContext.blockFactory().breaker().getUsed(), equalTo(0L));
257256
}
258257

259258
protected final List<Page> drive(Operator operator, Iterator<Page> input, DriverContext driverContext) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,13 @@ private void runLookup(PopulateIndices populateIndices) throws IOException {
210210
new AsyncExecutionId("test", TaskId.EMPTY_TASK_ID),
211211
TEST_REQUEST_TIMEOUT
212212
);
213+
final String finalNodeWithShard = nodeWithShard;
213214
LookupFromIndexOperator.Factory lookup = new LookupFromIndexOperator.Factory(
214215
"test",
215216
parentTask,
216217
QueryPragmas.ENRICH_MAX_WORKERS.get(Settings.EMPTY),
217218
1,
218-
internalCluster().getInstance(TransportEsqlQueryAction.class, nodeWithShard).getLookupFromIndexService(),
219+
ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(),
219220
DataType.KEYWORD,
220221
"lookup",
221222
"data",

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@
133133
public abstract class AbstractLookupService<R extends AbstractLookupService.Request, T extends AbstractLookupService.TransportRequest> {
134134
private final String actionName;
135135
private final ClusterService clusterService;
136-
private final CreateShardContext createShardContext;
136+
private final LookupShardContextFactory lookupShardContextFactory;
137137
private final TransportService transportService;
138138
private final Executor executor;
139139
private final BigArrays bigArrays;
@@ -152,7 +152,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
152152
AbstractLookupService(
153153
String actionName,
154154
ClusterService clusterService,
155-
CreateShardContext createShardContext,
155+
LookupShardContextFactory lookupShardContextFactory,
156156
TransportService transportService,
157157
BigArrays bigArrays,
158158
BlockFactory blockFactory,
@@ -161,7 +161,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
161161
) {
162162
this.actionName = actionName;
163163
this.clusterService = clusterService;
164-
this.createShardContext = createShardContext;
164+
this.lookupShardContextFactory = lookupShardContextFactory;
165165
this.transportService = transportService;
166166
this.executor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH);
167167
this.bigArrays = bigArrays;
@@ -327,7 +327,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
327327
final List<Releasable> releasables = new ArrayList<>(6);
328328
boolean started = false;
329329
try {
330-
LookupShardContext shardContext = createShardContext.create(request.shardId);
330+
LookupShardContext shardContext = lookupShardContextFactory.create(request.shardId);
331331
releasables.add(shardContext.release);
332332
final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker(
333333
blockFactory.breaker(),
@@ -680,10 +680,10 @@ public boolean hasReferences() {
680680
/**
681681
* Create a {@link LookupShardContext} for a locally allocated {@link ShardId}.
682682
*/
683-
public interface CreateShardContext {
683+
public interface LookupShardContextFactory {
684684
LookupShardContext create(ShardId shardId) throws IOException;
685685

686-
static CreateShardContext fromSearchService(SearchService searchService) {
686+
static LookupShardContextFactory fromSearchService(SearchService searchService) {
687687
return shardId -> {
688688
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(shardId, 0, AliasFilter.EMPTY);
689689
return LookupShardContext.fromSearchContext(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,15 @@ public class EnrichLookupService extends AbstractLookupService<EnrichLookupServi
4747

4848
public EnrichLookupService(
4949
ClusterService clusterService,
50-
CreateShardContext createShardContext,
50+
LookupShardContextFactory lookupShardContextFactory,
5151
TransportService transportService,
5252
BigArrays bigArrays,
5353
BlockFactory blockFactory
5454
) {
5555
super(
5656
LOOKUP_ACTION_NAME,
5757
clusterService,
58-
createShardContext,
58+
lookupShardContextFactory,
5959
transportService,
6060
bigArrays,
6161
blockFactory,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.List;
3333
import java.util.Objects;
3434
import java.util.Optional;
35+
import java.util.function.Function;
3536

3637
// TODO rename package
3738
public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndexOperator.OngoingJoin> {
@@ -40,7 +41,7 @@ public record Factory(
4041
CancellableTask parentTask,
4142
int maxOutstandingRequests,
4243
int inputChannel,
43-
LookupFromIndexService lookupService,
44+
Function<DriverContext, LookupFromIndexService> lookupService,
4445
DataType inputDataType,
4546
String lookupIndex,
4647
String matchField,
@@ -51,6 +52,8 @@ public record Factory(
5152
public String describe() {
5253
return "LookupOperator[index="
5354
+ lookupIndex
55+
+ " input_type="
56+
+ inputDataType
5457
+ " match_field="
5558
+ matchField
5659
+ " load_fields="
@@ -68,7 +71,7 @@ public Operator get(DriverContext driverContext) {
6871
parentTask,
6972
maxOutstandingRequests,
7073
inputChannel,
71-
lookupService,
74+
lookupService.apply(driverContext),
7275
inputDataType,
7376
lookupIndex,
7477
matchField,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
4646

4747
public LookupFromIndexService(
4848
ClusterService clusterService,
49-
CreateShardContext createShardContext,
49+
LookupShardContextFactory lookupShardContextFactory,
5050
TransportService transportService,
5151
BigArrays bigArrays,
5252
BlockFactory blockFactory
5353
) {
5454
super(
5555
LOOKUP_ACTION_NAME,
5656
clusterService,
57-
createShardContext,
57+
lookupShardContextFactory,
5858
transportService,
5959
bigArrays,
6060
blockFactory,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
589589
parentTask,
590590
context.queryPragmas().enrichMaxWorkers(),
591591
matchFields.get(0).channel(),
592-
lookupFromIndexService,
592+
ctx -> lookupFromIndexService,
593593
matchFields.get(0).type(),
594594
indexName,
595595
join.leftFields().get(0).name(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,19 +108,18 @@ public TransportEsqlQueryAction(
108108
exchangeService.registerTransportHandler(transportService);
109109
this.exchangeService = exchangeService;
110110
this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
111-
AbstractLookupService.CreateShardContext lookupCreateShardContext = AbstractLookupService.CreateShardContext.fromSearchService(
112-
searchService
113-
);
111+
AbstractLookupService.LookupShardContextFactory lookupLookupShardContextFactory = AbstractLookupService.LookupShardContextFactory
112+
.fromSearchService(searchService);
114113
this.enrichLookupService = new EnrichLookupService(
115114
clusterService,
116-
lookupCreateShardContext,
115+
lookupLookupShardContextFactory,
117116
transportService,
118117
bigArrays,
119118
blockFactory
120119
);
121120
this.lookupFromIndexService = new LookupFromIndexService(
122121
clusterService,
123-
lookupCreateShardContext,
122+
lookupLookupShardContextFactory,
124123
transportService,
125124
bigArrays,
126125
blockFactory

0 commit comments

Comments
 (0)