Skip to content

Commit c02b484

Browse files
authored
Use mock block factory in breaker tests (#122965)
We have two leaks reported in breaker tests, but we do not have their traces. This PR integrates MockBlockFactory for breaker tests to help identify these potential leaks. Relates #122810
1 parent 28f4d87 commit c02b484

File tree

8 files changed

+72
-14
lines changed

8 files changed

+72
-14
lines changed

muted-tests.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,6 @@ tests:
295295
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
296296
method: testSearchWithRandomDisconnects
297297
issue: https://github.com/elastic/elasticsearch/issues/122707
298-
- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
299-
issue: https://github.com/elastic/elasticsearch/issues/122810
300298
- class: org.elasticsearch.snapshots.DedicatedClusterSnapshotRestoreIT
301299
method: testRestoreShrinkIndex
302300
issue: https://github.com/elastic/elasticsearch/issues/121717
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.data;
9+
10+
public record BlockFactoryProvider(BlockFactory blockFactory) {
11+
12+
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import org.elasticsearch.common.unit.ByteSizeValue;
1818
import org.elasticsearch.common.util.CollectionUtils;
1919
import org.elasticsearch.compute.data.BlockFactory;
20+
import org.elasticsearch.compute.data.BlockFactoryProvider;
2021
import org.elasticsearch.compute.operator.exchange.ExchangeService;
22+
import org.elasticsearch.compute.test.MockBlockFactory;
2123
import org.elasticsearch.core.TimeValue;
2224
import org.elasticsearch.core.Tuple;
2325
import org.elasticsearch.health.node.selection.HealthNode;
@@ -60,6 +62,22 @@ public void ensureExchangesAreReleased() throws Exception {
6062
}
6163

6264
public void ensureBlocksReleased() {
65+
for (String node : internalCluster().getNodeNames()) {
66+
BlockFactoryProvider blockFactoryProvider = internalCluster().getInstance(BlockFactoryProvider.class, node);
67+
try {
68+
if (blockFactoryProvider.blockFactory() instanceof MockBlockFactory mockBlockFactory) {
69+
assertBusy(() -> {
70+
try {
71+
mockBlockFactory.ensureAllBlocksAreReleased();
72+
} catch (Exception e) {
73+
throw new AssertionError(e);
74+
}
75+
});
76+
}
77+
} catch (Exception e) {
78+
throw new RuntimeException("failed to check mock factory", e);
79+
}
80+
}
6381
for (String node : internalCluster().getNodeNames()) {
6482
CircuitBreakerService breakerService = internalCluster().getInstance(CircuitBreakerService.class, node);
6583
CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST);

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,23 @@
1212
import org.elasticsearch.action.DocWriteResponse;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414
import org.elasticsearch.common.Strings;
15+
import org.elasticsearch.common.breaker.CircuitBreaker;
1516
import org.elasticsearch.common.breaker.CircuitBreakingException;
1617
import org.elasticsearch.common.settings.Setting;
1718
import org.elasticsearch.common.settings.Settings;
1819
import org.elasticsearch.common.unit.ByteSizeValue;
20+
import org.elasticsearch.common.util.BigArrays;
1921
import org.elasticsearch.compute.data.BlockFactory;
22+
import org.elasticsearch.compute.data.BlockFactoryProvider;
2023
import org.elasticsearch.compute.operator.exchange.ExchangeService;
24+
import org.elasticsearch.compute.test.MockBlockFactory;
2125
import org.elasticsearch.core.TimeValue;
2226
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
2327
import org.elasticsearch.plugins.Plugin;
2428
import org.elasticsearch.rest.RestStatus;
2529
import org.elasticsearch.test.junit.annotations.TestLogging;
2630
import org.elasticsearch.xpack.esql.EsqlTestUtils;
31+
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
2732

2833
import java.util.ArrayList;
2934
import java.util.Collection;
@@ -50,6 +55,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
5055
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
5156
plugins.add(InternalExchangePlugin.class);
5257
plugins.add(InternalTransportSettingPlugin.class);
58+
assertTrue(plugins.removeIf(p -> p.isAssignableFrom(EsqlPlugin.class)));
59+
plugins.add(EsqlTestPluginWithMockBlockFactory.class);
5360
return plugins;
5461
}
5562

@@ -79,6 +86,17 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
7986
.build();
8087
}
8188

89+
public static class EsqlTestPluginWithMockBlockFactory extends EsqlPlugin {
90+
@Override
91+
protected BlockFactoryProvider blockFactoryProvider(
92+
CircuitBreaker breaker,
93+
BigArrays bigArrays,
94+
ByteSizeValue maxPrimitiveArraySize
95+
) {
96+
return new BlockFactoryProvider(new MockBlockFactory(breaker, bigArrays, maxPrimitiveArraySize));
97+
}
98+
}
99+
82100
private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws CircuitBreakingException {
83101
setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 2048)));
84102
try {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.util.FeatureFlag;
2323
import org.elasticsearch.common.util.concurrent.EsExecutors;
2424
import org.elasticsearch.compute.data.BlockFactory;
25+
import org.elasticsearch.compute.data.BlockFactoryProvider;
2526
import org.elasticsearch.compute.data.BlockWritables;
2627
import org.elasticsearch.compute.lucene.LuceneOperator;
2728
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
@@ -111,15 +112,24 @@ public Collection<?> createComponents(PluginServices services) {
111112
BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE
112113
);
113114
BigArrays bigArrays = services.indicesService().getBigArrays().withCircuitBreaking();
114-
BlockFactory blockFactory = new BlockFactory(circuitBreaker, bigArrays, maxPrimitiveArrayBlockSize);
115+
var blockFactoryProvider = blockFactoryProvider(circuitBreaker, bigArrays, maxPrimitiveArrayBlockSize);
115116
setupSharedSecrets();
116117
return List.of(
117118
new PlanExecutor(new IndexResolver(services.client()), services.telemetryProvider().getMeterRegistry(), getLicenseState()),
118-
new ExchangeService(services.clusterService().getSettings(), services.threadPool(), ThreadPool.Names.SEARCH, blockFactory),
119-
blockFactory
119+
new ExchangeService(
120+
services.clusterService().getSettings(),
121+
services.threadPool(),
122+
ThreadPool.Names.SEARCH,
123+
blockFactoryProvider.blockFactory()
124+
),
125+
blockFactoryProvider
120126
);
121127
}
122128

129+
protected BlockFactoryProvider blockFactoryProvider(CircuitBreaker breaker, BigArrays bigArrays, ByteSizeValue maxPrimitiveArraySize) {
130+
return new BlockFactoryProvider(new BlockFactory(breaker, bigArrays, maxPrimitiveArraySize));
131+
}
132+
123133
private void setupSharedSecrets() {
124134
try {
125135
// EsqlQueryRequestBuilder.<clinit> initializes the shared secret access

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.common.io.stream.Writeable;
1919
import org.elasticsearch.common.util.BigArrays;
2020
import org.elasticsearch.compute.data.BlockFactory;
21+
import org.elasticsearch.compute.data.BlockFactoryProvider;
2122
import org.elasticsearch.injection.guice.Inject;
2223
import org.elasticsearch.tasks.Task;
2324
import org.elasticsearch.threadpool.ThreadPool;
@@ -44,7 +45,7 @@ public TransportEsqlAsyncGetResultsAction(
4445
Client client,
4546
ThreadPool threadPool,
4647
BigArrays bigArrays,
47-
BlockFactory blockFactory
48+
BlockFactoryProvider blockFactoryProvider
4849
) {
4950
super(
5051
EsqlAsyncGetResultAction.NAME,
@@ -57,7 +58,7 @@ public TransportEsqlAsyncGetResultsAction(
5758
bigArrays,
5859
EsqlQueryTask.class
5960
);
60-
this.blockFactory = blockFactory;
61+
this.blockFactory = blockFactoryProvider.blockFactory();
6162
}
6263

6364
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.service.ClusterService;
1919
import org.elasticsearch.common.util.concurrent.EsExecutors;
2020
import org.elasticsearch.compute.data.BlockFactory;
21+
import org.elasticsearch.compute.data.BlockFactoryProvider;
2122
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2223
import org.elasticsearch.injection.guice.Inject;
2324
import org.elasticsearch.tasks.Task;
@@ -64,13 +65,13 @@ public TransportEsqlAsyncStopAction(
6465
TransportEsqlAsyncGetResultsAction getResultsAction,
6566
Client client,
6667
ExchangeService exchangeService,
67-
BlockFactory blockFactory
68+
BlockFactoryProvider blockFactoryProvider
6869
) {
6970
super(EsqlAsyncStopAction.NAME, transportService, actionFilters, AsyncStopRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
7071
this.queryAction = queryAction;
7172
this.getResultsAction = getResultsAction;
7273
this.exchangeService = exchangeService;
73-
this.blockFactory = blockFactory;
74+
this.blockFactory = blockFactoryProvider.blockFactory();
7475
this.transportService = transportService;
7576
this.clusterService = clusterService;
7677
this.security = new AsyncSearchSecurity(

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.elasticsearch.common.io.stream.StreamInput;
2121
import org.elasticsearch.common.util.BigArrays;
2222
import org.elasticsearch.common.util.concurrent.EsExecutors;
23-
import org.elasticsearch.compute.data.BlockFactory;
23+
import org.elasticsearch.compute.data.BlockFactoryProvider;
2424
import org.elasticsearch.compute.operator.exchange.ExchangeService;
2525
import org.elasticsearch.core.Nullable;
2626
import org.elasticsearch.injection.guice.Inject;
@@ -92,7 +92,7 @@ public TransportEsqlQueryAction(
9292
ClusterService clusterService,
9393
ThreadPool threadPool,
9494
BigArrays bigArrays,
95-
BlockFactory blockFactory,
95+
BlockFactoryProvider blockFactoryProvider,
9696
Client client,
9797
NamedWriteableRegistry registry,
9898
IndexNameExpressionResolver indexNameExpressionResolver,
@@ -114,14 +114,14 @@ public TransportEsqlQueryAction(
114114
lookupLookupShardContextFactory,
115115
transportService,
116116
bigArrays,
117-
blockFactory
117+
blockFactoryProvider.blockFactory()
118118
);
119119
this.lookupFromIndexService = new LookupFromIndexService(
120120
clusterService,
121121
lookupLookupShardContextFactory,
122122
transportService,
123123
bigArrays,
124-
blockFactory
124+
blockFactoryProvider.blockFactory()
125125
);
126126
this.computeService = new ComputeService(
127127
searchService,
@@ -132,7 +132,7 @@ public TransportEsqlQueryAction(
132132
clusterService,
133133
threadPool,
134134
bigArrays,
135-
blockFactory
135+
blockFactoryProvider.blockFactory()
136136
);
137137
this.asyncTaskManagementService = new AsyncTaskManagementService<>(
138138
XPackPlugin.ASYNC_RESULTS_INDEX,

0 commit comments

Comments
 (0)