diff --git a/docs/changelog/133398.yaml b/docs/changelog/133398.yaml
new file mode 100644
index 0000000000000..178ccddae9d4f
--- /dev/null
+++ b/docs/changelog/133398.yaml
@@ -0,0 +1,5 @@
+pr: 133398
+summary: "Aggs: Fix CB on reduction phase"
+area: Aggregations
+type: bug
+issues: []
diff --git a/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java
new file mode 100644
index 0000000000000..df789dcf69927
--- /dev/null
+++ b/modules/aggregations/src/internalClusterTest/java/org/elasticsearch/aggregations/bucket/AggregationReductionCircuitBreakingIT.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.aggregations.bucket;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.aggregations.AggregationIntegTestCase;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING;
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_TYPE_SETTING;
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING;
+import static org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService.USE_REAL_MEMORY_USAGE_SETTING;
+import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.composite;
+import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
+
+@ESIntegTestCase.ClusterScope(minNumDataNodes = 1, maxNumDataNodes = 2, numClientNodes = 1)
+public class AggregationReductionCircuitBreakingIT extends AggregationIntegTestCase {
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+ // Most of the settings here exist to make the search as stable and deterministic as possible
+ var settings = Settings.builder()
+ .put(super.nodeSettings(nodeOrdinal, otherSettings))
+ .put(REQUEST_CIRCUIT_BREAKER_TYPE_SETTING.getKey(), "memory")
+ // More threads may lead to more memory consumption and the test failing on the data nodes
+ .put("thread_pool.search.size", 1);
+ if (NODE_ROLES_SETTING.get(otherSettings).isEmpty()) {
+ // Coordinating-only node: it has no roles configured
+ settings.put(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "7MB");
+ } else {
+ // Data node
+ // Use the real-memory breaker to avoid OOMs
+ settings.put(USE_REAL_MEMORY_USAGE_SETTING.getKey(), true).put(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "80%");
+ }
+ return settings.build();
+ }
+
+ /**
+ * Expect the breaker to trip in {@code QueryPhaseResultConsumer#reduce()}, when reducing {@code MergeResult}s.
+ *
+ * Empirically, the serialized agg size is around 5MB.
+ * The CB is set to 7MB, as the reduction is expected to add an extra 50% overhead.
+ *
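+ * That is, ~5MB of serialized aggs plus the extra 0.5x reduce estimate (~2.5MB) comes to ~7.5MB,
+ * exceeding the 7MB limit.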
+ */
+ public void testCBTrippingOnReduction() throws IOException {
+ createIndex();
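+ // 100 docs with 100 values per field: empirically this yields the roughly 5MB of serialized aggs described above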
+ addDocs(100, 100, 100);
+
+ // Some leaks (see ESTestCase#loggedLeaks) aren't logged unless we run the test twice.
+ // So we run it multiple times to ensure everything gets collected before the final test checks.
+ for (int i = 0; i < 10; i++) {
+ assertCBTrip(
+ () -> internalCluster().coordOnlyNodeClient()
+ .prepareSearch("index")
+ .setSize(0)
+ .addAggregation(
+ composite(
+ "composite",
+ List.of(
+ new TermsValuesSourceBuilder("integer").field("integer"),
+ new TermsValuesSourceBuilder("long").field("long")
+ )
+ ).size(5000).subAggregation(topHits("top_hits").size(10))
+ )
+ .setBatchedReduceSize(randomIntBetween(2, 5)),
+ e -> {
+ var completeException = ExceptionsHelper.stackTrace(e);
+ // If a shard fails, we can't check reduction
+ assumeTrue(completeException, e.shardFailures().length == 0);
+ assertThat(e.getCause(), instanceOf(CircuitBreakingException.class));
+ assertThat(completeException, containsString("QueryPhaseResultConsumer.reduce"));
+ }
+ );
+ }
+ }
+
+ public void assertCBTrip(Supplier<SearchRequestBuilder> requestSupplier, Consumer<SearchPhaseExecutionException> exceptionCallback) {
+ try {
+ requestSupplier.get().get().decRef();
+
+ fail("Expected the breaker to trip");
+ } catch (SearchPhaseExecutionException e) {
+ exceptionCallback.accept(e);
+ }
+ }
+
+ private void createIndex() throws IOException {
+ XContentBuilder mappingBuilder = XContentFactory.jsonBuilder();
+ mappingBuilder.startObject();
+ mappingBuilder.startObject("properties");
+ {
+ mappingBuilder.startObject("integer");
+ mappingBuilder.field("type", "integer");
+ mappingBuilder.endObject();
+ }
+ {
+ mappingBuilder.startObject("long");
+ mappingBuilder.field("type", "long");
+ mappingBuilder.endObject();
+ }
+
+ mappingBuilder.endObject(); // properties
+ mappingBuilder.endObject();
+
+ assertAcked(
+ prepareCreate("index").setSettings(Settings.builder().put("index.number_of_shards", randomIntBetween(1, 10)).build())
+ .setMapping(mappingBuilder)
+ );
+ }
+
+ private void addDocs(int docCount, int integerFieldMvCount, int longFieldMvCount) throws IOException {
+ List<IndexRequestBuilder> docs = new ArrayList<>();
+ for (int i = 0; i < docCount; i++) {
+ XContentBuilder docSource = XContentFactory.jsonBuilder();
+ docSource.startObject();
+ final int docNumber = i;
+ List<Integer> integerValues = IntStream.range(0, integerFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
+ List<Long> longValues = LongStream.range(0, longFieldMvCount).map(x -> docNumber + x * 100).boxed().toList();
+ docSource.field("integer", integerValues);
+ docSource.field("long", longValues);
+ docSource.endObject();
+
+ docs.add(prepareIndex("index").setOpType(DocWriteRequest.OpType.CREATE).setSource(docSource));
+ }
+ indexRandom(true, false, false, false, docs);
+ forceMerge(false);
+ flushAndRefresh("index");
+ }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
index ec63d38616153..bee0004199596 100644
--- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
+++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
@@ -222,31 +222,35 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
 final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
 final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
- // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
- // execution for shards on the coordinating node itself
- if (mergeResult != null) {
- consumePartialMergeResult(mergeResult, topDocsList, aggsList);
- }
- Tuple<TopDocsStats, MergeResult> batchedResult;
- while ((batchedResult = batchedResults.poll()) != null) {
- topDocsStats.add(batchedResult.v1());
- consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
- }
- for (QuerySearchResult result : buffer) {
- topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
- if (topDocsList != null) {
- TopDocsAndMaxScore topDocs = result.consumeTopDocs();
- setShardIndex(topDocs.topDocs, result.getShardIndex());
- topDocsList.add(topDocs.topDocs);
- }
- }
+
SearchPhaseController.ReducedQueryPhase reducePhase;
long breakerSize = circuitBreakerBytes;
final InternalAggregations aggs;
try {
+ // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
+ // execution for shards on the coordinating node itself
+ if (mergeResult != null) {
+ consumePartialMergeResult(mergeResult, topDocsList, aggsList);
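+ // Account for the estimated serialized agg size up front, so the breaker can trip before the aggs are deserialized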
+ breakerSize = addEstimateAndMaybeBreak(mergeResult.estimatedSize);
+ }
+ Tuple<TopDocsStats, MergeResult> batchedResult;
+ while ((batchedResult = batchedResults.poll()) != null) {
+ topDocsStats.add(batchedResult.v1());
+ consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
+ // Add the estimate of the agg size
+ breakerSize = addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
+ }
+ for (QuerySearchResult result : buffer) {
+ topDocsStats.add(result.topDocs(), result.searchTimedOut(), result.terminatedEarly());
+ if (topDocsList != null) {
+ TopDocsAndMaxScore topDocs = result.consumeTopDocs();
+ setShardIndex(topDocs.topDocs, result.getShardIndex());
+ topDocsList.add(topDocs.topDocs);
+ }
+ }
if (aggsList != null) {
// Add an estimate of the final reduce size
- breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
+ breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(circuitBreakerBytes));
aggs = aggregate(buffer.iterator(), new Iterator<>() {
@Override
public boolean hasNext() {
@@ -275,7 +279,13 @@ public DelayableWriteable<InternalAggregations> next() {
);
buffer = null;
} finally {
- releaseAggs(buffer);
+ // buffer is only non-null here if an exception was thrown before it was nulled out in the try block
+ if (buffer != null) {
+ releaseAggs(buffer);
+ if (aggsList != null) {
+ Releasables.close(aggsList);
+ }
+ }
}
if (hasAggs
// reduced aggregations can be null if all shards failed
@@ -335,7 +345,7 @@ private MergeResult partialReduce(
 List<QuerySearchResult> toConsume,
 List<SearchShard> processedShards,
TopDocsStats topDocsStats,
- MergeResult lastMerge,
+ @Nullable MergeResult lastMerge,
int numReducePhases
) {
// ensure consistent ordering
@@ -392,7 +402,7 @@ private MergeResult partialReduce(
return new MergeResult(
processedShards,
newTopDocs,
- newAggs == null ? null : DelayableWriteable.referencing(newAggs),
+ newAggs != null ? DelayableWriteable.referencing(newAggs) : null,
newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0
);
}
@@ -457,12 +467,13 @@ private long ramBytesUsedQueryResult(QuerySearchResult result) {
* Returns an estimation of the size that a reduce of the provided size
* would take on memory.
* This size is estimated as roughly 1.5 times the size of the serialized
- * aggregations that need to be reduced. This estimation can be completely
- * off for some aggregations but it is corrected with the real size after
- * the reduce completes.
+ * aggregations that need to be reduced.
+ * The given size is expected to have been accounted for already, so only the extra 0.5x is returned.
+ * This estimation can be completely off for some aggregations
+ * but it is corrected with the real size after the reduce completes.
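+ * For example, given 10MB of already-accounted serialized aggs, this returns an extra 5MB,
+ * for a total estimate of 15MB (1.5x).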
*/
private static long estimateRamBytesUsedForReduce(long size) {
- return Math.round(1.5d * size - size);
+ return Math.round(0.5d * size);
}
private void consume(QuerySearchResult result, Runnable next) {
@@ -639,7 +650,7 @@ public void onFailure(Exception exc) {
});
}
- private static void releaseAggs(List<QuerySearchResult> toConsume) {
+ private static void releaseAggs(@Nullable List<QuerySearchResult> toConsume) {
if (toConsume != null) {
for (QuerySearchResult result : toConsume) {
result.releaseAggs();
diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java
index a2b6be6ab0363..ba66c2e9ae898 100644
--- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java
@@ -75,7 +75,7 @@ public static <T extends Writeable> DelayableWriteable<T> referencing(Writeable.
}
}
- private DelayableWriteable() {}
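+ // Protected to allow test stubs to subclass (e.g. DelegatingDelayableWriteable in QueryPhaseResultConsumerTests)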
+ protected DelayableWriteable() {}
/**
* Returns a {@linkplain DelayableWriteable} that stores its contents
diff --git a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
index e0b68647289b2..7c2d8fe8d0166 100644
--- a/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
+++ b/server/src/test/java/org/elasticsearch/action/search/QueryPhaseResultConsumerTests.java
@@ -13,8 +13,14 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.common.io.stream.DelayableWriteable;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
+import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig;
@@ -25,7 +31,9 @@
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
+import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
@@ -40,7 +48,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.mock;
public class QueryPhaseResultConsumerTests extends ESTestCase {
@@ -148,6 +159,142 @@ public void testProgressListenerExceptionsAreCaught() throws Exception {
}
}
+ /**
+ * Adds batches with a high simulated size, expecting the CB to trip before deserialization.
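+ *
+ * Each batch's estimate is about {@code 1.05 / aggCount} of the breaker limit, so accounting the
+ * last one in {@code reduce()} pushes the total past the limit before any aggregation is expanded.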
+ */
+ public void testBatchedEstimateSizeTooBig() throws Exception {
+ SearchRequest searchRequest = new SearchRequest("index");
+ searchRequest.source(new SearchSourceBuilder().aggregation(new SumAggregationBuilder("sum")));
+
+ var aggCount = randomIntBetween(1, 10);
+ var circuitBreakerLimit = ByteSizeValue.ofMb(256);
+ var circuitBreaker = newLimitedBreaker(circuitBreakerLimit);
+ // Together the batches claim slightly more (~1.05x) than the circuit breaker allows
+ long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 1.05 / aggCount);
+
+ try (
+ QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(
+ searchRequest,
+ executor,
+ circuitBreaker,
+ searchPhaseController,
+ () -> false,
+ new SearchProgressListener() {
+ },
+ 10,
+ e -> {}
+ )
+ ) {
+ for (int i = 0; i < aggCount; i++) {
+ // Add a dummy merge result with a high estimated size
+ var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
+ fail("This shouldn't be called");
+ return null;
+ }), aggregationEstimatedSize);
+ queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+ }
+
+ try {
+ queryPhaseResultConsumer.reduce();
+ fail("Expecting a circuit breaking exception to be thrown");
+ } catch (CircuitBreakingException e) {
+ // The last merge result estimate should break
+ assertThat(e.getBytesWanted(), equalTo(aggregationEstimatedSize));
+ }
+ }
+ }
+
+ /**
+ * Adds batches with a high simulated size, expecting the CB to trip before deserialization.
+ *
+ * Similar to {@link #testBatchedEstimateSizeTooBig()}, but here the per-batch estimates fit within
+ * the limit and the breaker trips on the extra size estimated for the final reduce.
+ *
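+ * With per-batch estimates totalling ~0.75x the limit, the extra 0.5x reduce overhead (~0.375x)
+ * brings the total to ~1.125x the limit, tripping the breaker.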
+ */
+ public void testBatchedEstimateSizeTooBigAfterDeserialization() throws Exception {
+ SearchRequest searchRequest = new SearchRequest("index");
+ searchRequest.source(new SearchSourceBuilder().aggregation(new SumAggregationBuilder("sum")));
+
+ var aggCount = randomIntBetween(1, 10);
+ var circuitBreakerLimit = ByteSizeValue.ofMb(256);
+ var circuitBreaker = newLimitedBreaker(circuitBreakerLimit);
+ // Below the CB limit on its own, but above it once the extra 0.5x reduce overhead is added
+ long aggregationEstimatedSize = (long) (circuitBreakerLimit.getBytes() * 0.75 / aggCount);
+ long totalAggregationsEstimatedSize = aggregationEstimatedSize * aggCount;
+
+ try (
+ QueryPhaseResultConsumer queryPhaseResultConsumer = new QueryPhaseResultConsumer(
+ searchRequest,
+ executor,
+ circuitBreaker,
+ searchPhaseController,
+ () -> false,
+ new SearchProgressListener() {
+ },
+ 10,
+ e -> {}
+ )
+ ) {
+ for (int i = 0; i < aggCount; i++) {
+ // Add a dummy merge result with a high estimated size
+ var mergeResult = new QueryPhaseResultConsumer.MergeResult(List.of(), null, new DelegatingDelayableWriteable<>(() -> {
+ fail("This shouldn't be called");
+ return null;
+ }), aggregationEstimatedSize);
+ queryPhaseResultConsumer.addBatchedPartialResult(new SearchPhaseController.TopDocsStats(0), mergeResult);
+ }
+
+ try {
+ queryPhaseResultConsumer.reduce();
+ fail("Expecting a circuit breaking exception to be thrown");
+ } catch (CircuitBreakingException e) {
+ assertThat(circuitBreaker.getUsed(), greaterThanOrEqualTo(aggregationEstimatedSize));
+ // A final +0.5x is added to account for the serialized->deserialized extra size
+ assertThat(e.getBytesWanted(), equalTo(Math.round(totalAggregationsEstimatedSize * 0.5)));
+ }
+ }
+ }
+
+ /**
+ * DelayableWriteable that delegates expansion to a supplier.
+ */
+ private static class DelegatingDelayableWriteable<T extends Writeable> extends DelayableWriteable<T> {
+ private final Supplier<T> supplier;
+
+ private DelegatingDelayableWriteable(Supplier<T> supplier) {
+ this.supplier = supplier;
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) {
+ throw new UnsupportedOperationException("Not to be called");
+ }
+
+ @Override
+ public T expand() {
+ return supplier.get();
+ }
+
+ @Override
+ public Serialized<T> asSerialized(Writeable.Reader<T> reader, NamedWriteableRegistry registry) {
+ throw new UnsupportedOperationException("Not to be called");
+ }
+
+ @Override
+ public boolean isSerialized() {
+ return true;
+ }
+
+ @Override
+ public long getSerializedSize() {
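+ // Unused in these tests: the estimated size is supplied separately via MergeResult#estimatedSize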
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ // noop
+ }
+ }
+
private static class ThrowingSearchProgressListener extends SearchProgressListener {
private final AtomicInteger onQueryResult = new AtomicInteger(0);
private final AtomicInteger onPartialReduce = new AtomicInteger(0);
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 1bf5e7d307777..4cd9ddaf88abe 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -263,7 +263,7 @@
*
* A test cluster creates a set of nodes in the background before the test starts. The number of nodes in the cluster is
* determined at random and can change across tests. The {@link ClusterScope} allows configuring the initial number of nodes
- * that are created before the tests start.
+ * that are created before the tests start. More information about node configurations and settings can be found in {@link InternalTestCluster}.
*
* {@literal @}NodeScope(scope=Scope.SUITE, numDataNodes=3)
* public class SomeIT extends ESIntegTestCase {
@@ -2149,7 +2149,7 @@ public enum Scope {
/**
* Indicates whether the cluster can have dedicated master nodes. If {@code false} means data nodes will serve as master nodes
- * and there will be no dedicated master (and data) nodes. Default is {@code false} which means
+ * and there will be no dedicated master (and data) nodes. Default is {@code true} which means
* dedicated master nodes will be randomly used.
*/
boolean supportsDedicatedMasters() default true;
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index 453cddd504364..dbf88cee69778 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -176,6 +176,20 @@
* to the async nature of Elasticsearch in combination with randomized testing. Once Threads and asynchronous calls
* are involved reproducibility is very limited. This class should only be used through {@link ESIntegTestCase}.
*
+ * <p>{@link NodeConfigurationSource#nodeSettings} ordinals</p>
+ * The supplied node ordinal for this node settings getter is generated as follows:
+ * <ol>
+ * <li>
+ * The first ordinals correspond to the dedicated master nodes, enabled with the
+ * {@code randomlyAddDedicatedMasters} setting. Their number can't currently be configured.
+ * </li>
+ * <li>
+ * Then come the data nodes, controlled by the {@code minNumDataNodes} and {@code maxNumDataNodes} settings.
+ * </li>
+ * <li>
+ * Finally, the coordinating-only nodes, configured with the {@code numClientNodes} setting.
+ * </li>
+ * </ol>
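+ * For example, with one dedicated master node, two data nodes and one client node, ordinal 0 is the
+ * master, ordinals 1 and 2 are the data nodes, and ordinal 3 is the coordinating-only node.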
*/
public final class InternalTestCluster extends TestCluster {