diff --git a/muted-tests.yml b/muted-tests.yml index cae0cd42dc6a3..c6a573246d15f 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -309,8 +309,6 @@ tests: - class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT method: testSearchWithRandomDisconnects issue: https://github.com/elastic/elasticsearch/issues/122707 -- class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT - issue: https://github.com/elastic/elasticsearch/issues/122810 - class: org.elasticsearch.snapshots.DedicatedClusterSnapshotRestoreIT method: testRestoreShrinkIndex issue: https://github.com/elastic/elasticsearch/issues/121717 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactoryProvider.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactoryProvider.java new file mode 100644 index 0000000000000..4c851cc226d35 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockFactoryProvider.java @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +public record BlockFactoryProvider(BlockFactory blockFactory) { + +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java index 90bf34b499390..971a2a7705c18 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java @@ -17,7 +17,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.compute.test.MockBlockFactory; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.health.node.selection.HealthNode; @@ -60,6 +62,22 @@ public void ensureExchangesAreReleased() throws Exception { } public void ensureBlocksReleased() { + for (String node : internalCluster().getNodeNames()) { + BlockFactoryProvider blockFactoryProvider = internalCluster().getInstance(BlockFactoryProvider.class, node); + try { + if (blockFactoryProvider.blockFactory() instanceof MockBlockFactory mockBlockFactory) { + assertBusy(() -> { + try { + mockBlockFactory.ensureAllBlocksAreReleased(); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + } + } catch (Exception e) { + throw new RuntimeException("failed to check mock factory", e); + } + } for (String node : internalCluster().getNodeNames()) { CircuitBreakerService breakerService = internalCluster().getInstance(CircuitBreakerService.class, node); CircuitBreaker reqBreaker = breakerService.getBreaker(CircuitBreaker.REQUEST); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java index 57f6b55d31845..ef4a2c1e00369 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java @@ -12,18 +12,23 @@ import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.operator.exchange.ExchangeService; +import org.elasticsearch.compute.test.MockBlockFactory; import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import java.util.ArrayList; import java.util.Collection; @@ -50,6 +55,8 @@ protected Collection> nodePlugins() { List> plugins = new ArrayList<>(super.nodePlugins()); plugins.add(InternalExchangePlugin.class); plugins.add(InternalTransportSettingPlugin.class); + assertTrue(plugins.removeIf(p -> p.isAssignableFrom(EsqlPlugin.class))); + plugins.add(EsqlTestPluginWithMockBlockFactory.class); return plugins; } @@ -79,6 +86,17 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { .build(); } + public static class EsqlTestPluginWithMockBlockFactory extends EsqlPlugin { + @Override + protected BlockFactoryProvider blockFactoryProvider( + CircuitBreaker breaker, + BigArrays bigArrays, + ByteSizeValue maxPrimitiveArraySize + ) { + return new BlockFactoryProvider(new MockBlockFactory(breaker, bigArrays, maxPrimitiveArraySize)); + } + } + private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws CircuitBreakingException { setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 2048))); try { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 4379e2e8041ae..7b7b056741e27 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.util.FeatureFlag; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.data.BlockWritables; import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; @@ -111,15 +112,24 @@ public Collection createComponents(PluginServices services) { BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE ); BigArrays bigArrays = services.indicesService().getBigArrays().withCircuitBreaking(); - BlockFactory blockFactory = new BlockFactory(circuitBreaker, bigArrays, maxPrimitiveArrayBlockSize); + var blockFactoryProvider = blockFactoryProvider(circuitBreaker, bigArrays, maxPrimitiveArrayBlockSize); setupSharedSecrets(); return List.of( new PlanExecutor(new IndexResolver(services.client()), services.telemetryProvider().getMeterRegistry(), getLicenseState()), - new ExchangeService(services.clusterService().getSettings(), services.threadPool(), ThreadPool.Names.SEARCH, blockFactory), - blockFactory + new ExchangeService( + services.clusterService().getSettings(), + services.threadPool(), + ThreadPool.Names.SEARCH, + blockFactoryProvider.blockFactory() + ), + blockFactoryProvider ); } + protected BlockFactoryProvider blockFactoryProvider(CircuitBreaker breaker, BigArrays bigArrays, ByteSizeValue maxPrimitiveArraySize) { + return new BlockFactoryProvider(new BlockFactory(breaker, bigArrays, maxPrimitiveArraySize)); + } + private void setupSharedSecrets() { try { // EsqlQueryRequestBuilder. initializes the shared secret access diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java index 5658db0599186..cc917dfa7a30c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -44,7 +45,7 @@ public TransportEsqlAsyncGetResultsAction( Client client, ThreadPool threadPool, BigArrays bigArrays, - BlockFactory blockFactory + BlockFactoryProvider blockFactoryProvider ) { super( EsqlAsyncGetResultAction.NAME, @@ -57,7 +58,7 @@ public TransportEsqlAsyncGetResultsAction( bigArrays, EsqlQueryTask.class ); - this.blockFactory = blockFactory; + this.blockFactory = blockFactoryProvider.blockFactory(); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java index a1a3072c69b8c..d5670d9876f1e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncStopAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -64,13 +65,13 @@ public TransportEsqlAsyncStopAction( TransportEsqlAsyncGetResultsAction getResultsAction, Client client, ExchangeService exchangeService, - BlockFactory blockFactory + BlockFactoryProvider blockFactoryProvider ) { super(EsqlAsyncStopAction.NAME, transportService, actionFilters, AsyncStopRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); this.queryAction = queryAction; this.getResultsAction = getResultsAction; this.exchangeService = exchangeService; - this.blockFactory = blockFactory; + this.blockFactory = blockFactoryProvider.blockFactory(); this.transportService = transportService; this.clusterService = clusterService; this.security = new AsyncSearchSecurity( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index baf351c27107c..c0e6704ff65ee 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -20,7 +20,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.Nullable; import org.elasticsearch.injection.guice.Inject; @@ -92,7 +92,7 @@ public TransportEsqlQueryAction( ClusterService clusterService, ThreadPool threadPool, BigArrays bigArrays, - BlockFactory blockFactory, + BlockFactoryProvider blockFactoryProvider, Client client, NamedWriteableRegistry registry, IndexNameExpressionResolver indexNameExpressionResolver, @@ -114,14 +114,14 @@ public TransportEsqlQueryAction( lookupLookupShardContextFactory, transportService, bigArrays, - blockFactory + blockFactoryProvider.blockFactory() ); this.lookupFromIndexService = new LookupFromIndexService( clusterService, lookupLookupShardContextFactory, transportService, bigArrays, - blockFactory + blockFactoryProvider.blockFactory() ); this.computeService = new ComputeService( searchService, @@ -132,7 +132,7 @@ public TransportEsqlQueryAction( clusterService, threadPool, bigArrays, - blockFactory + blockFactoryProvider.blockFactory() ); this.asyncTaskManagementService = new AsyncTaskManagementService<>( XPackPlugin.ASYNC_RESULTS_INDEX,