|
10 | 10 |
|
11 | 11 | import org.apache.lucene.index.DirectoryReader; |
12 | 12 | import org.apache.lucene.store.LockObtainFailedException; |
| 13 | +import org.apache.lucene.util.SetOnce; |
13 | 14 | import org.elasticsearch.ExceptionsHelper; |
14 | 15 | import org.elasticsearch.action.ActionListener; |
15 | 16 | import org.elasticsearch.action.index.IndexRequest; |
|
19 | 20 | import org.elasticsearch.cluster.ClusterInfoServiceUtils; |
20 | 21 | import org.elasticsearch.cluster.ClusterState; |
21 | 22 | import org.elasticsearch.cluster.InternalClusterInfoService; |
| 23 | +import org.elasticsearch.cluster.ShardHeapUsage; |
| 24 | +import org.elasticsearch.cluster.ShardHeapUsageCollector; |
22 | 25 | import org.elasticsearch.cluster.metadata.IndexMetadata; |
23 | 26 | import org.elasticsearch.cluster.node.DiscoveryNode; |
24 | 27 | import org.elasticsearch.cluster.node.DiscoveryNodeUtils; |
|
62 | 65 | import org.elasticsearch.indices.IndicesService; |
63 | 66 | import org.elasticsearch.indices.breaker.CircuitBreakerService; |
64 | 67 | import org.elasticsearch.indices.recovery.RecoveryState; |
| 68 | +import org.elasticsearch.plugins.ClusterPlugin; |
65 | 69 | import org.elasticsearch.plugins.Plugin; |
66 | 70 | import org.elasticsearch.search.builder.SearchSourceBuilder; |
67 | 71 | import org.elasticsearch.test.DummyShardLock; |
|
82 | 86 | import java.util.Collections; |
83 | 87 | import java.util.List; |
84 | 88 | import java.util.Locale; |
| 89 | +import java.util.Map; |
85 | 90 | import java.util.Optional; |
86 | 91 | import java.util.concurrent.BrokenBarrierException; |
87 | 92 | import java.util.concurrent.CountDownLatch; |
|
90 | 95 | import java.util.concurrent.atomic.AtomicBoolean; |
91 | 96 | import java.util.concurrent.atomic.AtomicReference; |
92 | 97 | import java.util.function.Predicate; |
| 98 | +import java.util.stream.Collectors; |
93 | 99 | import java.util.stream.Stream; |
94 | 100 |
|
95 | 101 | import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength; |
|
111 | 117 | import static org.hamcrest.Matchers.equalTo; |
112 | 118 | import static org.hamcrest.Matchers.greaterThan; |
113 | 119 | import static org.hamcrest.Matchers.instanceOf; |
| 120 | +import static org.hamcrest.Matchers.lessThanOrEqualTo; |
114 | 121 |
|
115 | 122 | public class IndexShardIT extends ESSingleNodeTestCase { |
116 | 123 |
|
117 | 124 | @Override |
118 | 125 | protected Collection<Class<? extends Plugin>> getPlugins() { |
119 | | - return pluginList(InternalSettingsPlugin.class); |
| 126 | + return pluginList(InternalSettingsPlugin.class, BogusShardHeapUsagePlugin.class); |
120 | 127 | } |
121 | 128 |
|
122 | 129 | public void testLockTryingToDelete() throws Exception { |
@@ -254,6 +261,20 @@ public void testExpectedShardSizeIsPresent() throws InterruptedException { |
254 | 261 | assertThat(dataSetSize.get(), greaterThan(0L)); |
255 | 262 | } |
256 | 263 |
|
| 264 | + public void testHeapUsageEstimateIsPresent() { |
| 265 | + InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); |
| 266 | + ClusterInfoServiceUtils.refresh(clusterInfoService); |
| 267 | + ClusterState state = getInstanceFromNode(ClusterService.class).state(); |
| 268 | + Map<String, ShardHeapUsage> shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages(); |
| 269 | + assertNotNull(shardHeapUsages); |
| 270 | + assertEquals(state.nodes().size(), shardHeapUsages.size()); |
| 271 | + for (DiscoveryNode node : state.nodes()) { |
| 272 | + assertTrue(shardHeapUsages.containsKey(node.getId())); |
| 273 | + ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId()); |
| 274 | + assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes())); |
| 275 | + } |
| 276 | + } |
| 277 | + |
257 | 278 | public void testIndexCanChangeCustomDataPath() throws Exception { |
258 | 279 | final String index = "test-custom-data-path"; |
259 | 280 | final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10)); |
@@ -797,4 +818,40 @@ private static void assertAllIndicesRemovedAndDeletionCompleted(Iterable<Indices |
797 | 818 | assertBusy(() -> assertFalse(indicesService.hasUncompletedPendingDeletes()), 1, TimeUnit.MINUTES); |
798 | 819 | } |
799 | 820 | } |
| 821 | + |
| 822 | + public static class BogusShardShardHeapUsageCollector implements ShardHeapUsageCollector { |
| 823 | + |
| 824 | + private final BogusShardHeapUsagePlugin plugin; |
| 825 | + |
| 826 | + public BogusShardShardHeapUsageCollector(BogusShardHeapUsagePlugin plugin) { |
| 827 | + this.plugin = plugin; |
| 828 | + } |
| 829 | + |
| 830 | + @Override |
| 831 | + public void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener) { |
| 832 | + ActionListener.completeWith( |
| 833 | + listener, |
| 834 | + () -> plugin.getClusterService() |
| 835 | + .state() |
| 836 | + .nodes() |
| 837 | + .stream() |
| 838 | + .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> randomNonNegativeLong())) |
| 839 | + ); |
| 840 | + } |
| 841 | + } |
| 842 | + |
| 843 | + public static class BogusShardHeapUsagePlugin extends Plugin implements ClusterPlugin { |
| 844 | + |
| 845 | + private final SetOnce<ClusterService> clusterService = new SetOnce<>(); |
| 846 | + |
| 847 | + @Override |
| 848 | + public Collection<?> createComponents(PluginServices services) { |
| 849 | + clusterService.set(services.clusterService()); |
| 850 | + return List.of(); |
| 851 | + } |
| 852 | + |
| 853 | + public ClusterService getClusterService() { |
| 854 | + return clusterService.get(); |
| 855 | + } |
| 856 | + } |
800 | 857 | } |
0 commit comments