Skip to content

Commit f4b90b5

Browse files
committed
Load ShardHeapUsageSupplier via SPI
1 parent 55637b6 commit f4b90b5

File tree

7 files changed

+59
-36
lines changed

7 files changed

+59
-36
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.lucene.index.DirectoryReader;
1212
import org.apache.lucene.store.LockObtainFailedException;
13+
import org.apache.lucene.util.SetOnce;
1314
import org.elasticsearch.ExceptionsHelper;
1415
import org.elasticsearch.action.ActionListener;
1516
import org.elasticsearch.action.index.IndexRequest;
@@ -816,36 +817,43 @@ private static void assertAllIndicesRemovedAndDeletionCompleted(Iterable<Indices
816817
}
817818
}
818819

819-
private static class BogusShardShardHeapUsageSupplier implements ShardHeapUsageSupplier {
820+
public static class BogusShardShardHeapUsageSupplier implements ShardHeapUsageSupplier {
820821

821-
private final ClusterService clusterService;
822+
private final BogusShardHeapUsagePlugin plugin;
822823

823-
private BogusShardShardHeapUsageSupplier(ClusterService clusterService) {
824-
this.clusterService = clusterService;
824+
public BogusShardShardHeapUsageSupplier(BogusShardHeapUsagePlugin plugin) {
825+
this.plugin = plugin;
825826
}
826827

827828
@Override
828829
public void getClusterHeapUsage(ActionListener<Map<String, ShardHeapUsage>> listener) {
829830
ActionListener.completeWith(
830831
listener,
831-
() -> clusterService.state().nodes().stream().collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> {
832-
final long maxHeap = randomNonNegativeLong();
833-
final long usedHeap = (long) (randomFloat() * maxHeap);
834-
return new ShardHeapUsage(node.getId(), node.getName(), maxHeap, usedHeap);
835-
}))
832+
() -> plugin.getClusterService()
833+
.state()
834+
.nodes()
835+
.stream()
836+
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> {
837+
final long maxHeap = randomNonNegativeLong();
838+
final long usedHeap = (long) (randomFloat() * maxHeap);
839+
return new ShardHeapUsage(node.getId(), node.getName(), maxHeap, usedHeap);
840+
}))
836841
);
837842
}
838843
}
839844

840845
public static class BogusShardHeapUsagePlugin extends Plugin implements ClusterPlugin {
841846

842-
public BogusShardHeapUsagePlugin() {}
847+
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
843848

844849
@Override
845850
public Collection<?> createComponents(PluginServices services) {
846-
BogusShardShardHeapUsageSupplier bogusHeapUsageSupplier = new BogusShardShardHeapUsageSupplier(services.clusterService());
847-
services.allocationService().setHeapUsageSupplier(bogusHeapUsageSupplier);
851+
clusterService.set(services.clusterService());
848852
return List.of();
849853
}
854+
855+
public ClusterService getClusterService() {
856+
return clusterService.get();
857+
}
850858
}
851859
}

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,19 +97,26 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
9797

9898
private final Object mutex = new Object();
9999
private final List<ActionListener<ClusterInfo>> nextRefreshListeners = new ArrayList<>();
100+
private final ShardHeapUsageSupplier shardHeapUsageSupplier;
100101

101-
private ShardHeapUsageSupplier shardHeapUsageSupplier = ShardHeapUsageSupplier.EMPTY;
102102
private AsyncRefresh currentRefresh;
103103
private RefreshScheduler refreshScheduler;
104104

105105
@SuppressWarnings("this-escape")
106-
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
106+
public InternalClusterInfoService(
107+
Settings settings,
108+
ClusterService clusterService,
109+
ThreadPool threadPool,
110+
Client client,
111+
ShardHeapUsageSupplier shardHeapUsageSupplier
112+
) {
107113
this.leastAvailableSpaceUsages = Map.of();
108114
this.mostAvailableSpaceUsages = Map.of();
109115
this.shardHeapUsages = Map.of();
110116
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
111117
this.threadPool = threadPool;
112118
this.client = client;
119+
this.shardHeapUsageSupplier = shardHeapUsageSupplier;
113120
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
114121
this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
115122
this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
@@ -134,16 +141,6 @@ void setUpdateFrequency(TimeValue updateFrequency) {
134141
this.updateFrequency = updateFrequency;
135142
}
136143

137-
/**
138-
* This can be provided by plugins, which are initialised long after the ClusterInfoService is created
139-
*
140-
* @param shardHeapUsageSupplier The HeapUsageSupplier to use
141-
*/
142-
public void setShardHeapUsageSupplier(ShardHeapUsageSupplier shardHeapUsageSupplier) {
143-
assert this.shardHeapUsageSupplier == ShardHeapUsageSupplier.EMPTY;
144-
this.shardHeapUsageSupplier = shardHeapUsageSupplier;
145-
}
146-
147144
@Override
148145
public void clusterChanged(ClusterChangedEvent event) {
149146
final Runnable newRefresh;

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
import org.elasticsearch.cluster.ClusterChangedEvent;
1616
import org.elasticsearch.cluster.ClusterInfoService;
1717
import org.elasticsearch.cluster.ClusterState;
18-
import org.elasticsearch.cluster.InternalClusterInfoService;
1918
import org.elasticsearch.cluster.RestoreInProgress;
20-
import org.elasticsearch.cluster.ShardHeapUsageSupplier;
2119
import org.elasticsearch.cluster.health.ClusterHealthStatus;
2220
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
2321
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -639,12 +637,6 @@ public void addAllocFailuresResetListenerTo(ClusterService clusterService) {
639637
});
640638
}
641639

642-
public void setHeapUsageSupplier(ShardHeapUsageSupplier shardHeapUsageSupplier) {
643-
if (clusterInfoService instanceof InternalClusterInfoService internalClusterInfoService) {
644-
internalClusterInfoService.setShardHeapUsageSupplier(shardHeapUsageSupplier);
645-
}
646-
}
647-
648640
/**
649641
* We should reset allocation/relocation failure count to allow further retries when:
650642
*

server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.client.internal.node.NodeClient;
1414
import org.elasticsearch.cluster.ClusterInfoService;
1515
import org.elasticsearch.cluster.InternalClusterInfoService;
16+
import org.elasticsearch.cluster.ShardHeapUsageSupplier;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.service.ClusterService;
1819
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -74,7 +75,17 @@ ClusterInfoService newClusterInfoService(
7475
ThreadPool threadPool,
7576
NodeClient client
7677
) {
77-
final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client);
78+
final ShardHeapUsageSupplier shardHeapUsageSupplier = pluginsService.loadSingletonServiceProvider(
79+
ShardHeapUsageSupplier.class,
80+
() -> ShardHeapUsageSupplier.EMPTY
81+
);
82+
final InternalClusterInfoService service = new InternalClusterInfoService(
83+
settings,
84+
clusterService,
85+
threadPool,
86+
client,
87+
shardHeapUsageSupplier
88+
);
7889
if (DiscoveryNode.isMasterNode(settings)) {
7990
// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
8091
clusterService.addListener(service);

server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,14 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
7676
final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
7777

7878
final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool);
79-
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client);
8079
final ShardHeapUsageSupplier mockShardHeapUsageSupplier = spy(new StubShardShardHeapUsageSupplier());
81-
clusterInfoService.setShardHeapUsageSupplier(mockShardHeapUsageSupplier);
80+
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(
81+
settings,
82+
clusterService,
83+
threadPool,
84+
client,
85+
mockShardHeapUsageSupplier
86+
);
8287
clusterService.addListener(clusterInfoService);
8388
clusterInfoService.addListener(ignored -> {});
8489

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#
2+
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
# or more contributor license agreements. Licensed under the "Elastic License
4+
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
# Public License v 1"; you may not use this file except in compliance with, at
6+
# your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
# License v3.0 only", or the "Server Side Public License, v 1".
8+
#
9+
10+
org.elasticsearch.index.shard.IndexShardIT$BogusShardShardHeapUsageSupplier

test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static class TestPlugin extends Plugin {}
4343
private volatile BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction;
4444

4545
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
46-
super(settings, clusterService, threadPool, client);
46+
super(settings, clusterService, threadPool, client, ShardHeapUsageSupplier.EMPTY);
4747
}
4848

4949
public void setDiskUsageFunctionAndRefresh(BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFn) {

0 commit comments

Comments
 (0)