|
21 | 21 | import org.elasticsearch.cluster.node.DiscoveryNode; |
22 | 22 | import org.elasticsearch.cluster.node.DiscoveryNodeUtils; |
23 | 23 | import org.elasticsearch.cluster.node.DiscoveryNodes; |
| 24 | +import org.elasticsearch.cluster.routing.RerouteService; |
| 25 | +import org.elasticsearch.cluster.routing.allocation.NodeUsageStatsForThreadPoolsMonitor; |
24 | 26 | import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; |
25 | 27 | import org.elasticsearch.cluster.service.ClusterApplierService; |
26 | 28 | import org.elasticsearch.cluster.service.ClusterService; |
27 | 29 | import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; |
28 | 30 | import org.elasticsearch.cluster.service.MasterService; |
| 31 | +import org.elasticsearch.common.Priority; |
29 | 32 | import org.elasticsearch.common.settings.ClusterSettings; |
30 | 33 | import org.elasticsearch.common.settings.Settings; |
31 | 34 | import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; |
@@ -95,6 +98,18 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { |
95 | 98 | mockEstimatedHeapUsageCollector, |
96 | 99 | nodeUsageStatsForThreadPoolsCollector |
97 | 100 | ); |
| 101 | + final NodeUsageStatsForThreadPoolsMonitor usageMonitor = spy( |
| 102 | + new NodeUsageStatsForThreadPoolsMonitor( |
| 103 | + clusterService.getClusterSettings(), |
| 104 | + threadPool.relativeTimeInMillisSupplier(), |
| 105 | + clusterService::state, |
| 106 | + new RerouteService() { |
| 107 | + @Override |
| 108 | + public void reroute(String reason, Priority priority, ActionListener<Void> listener) {} |
| 109 | + } |
| 110 | + ) |
| 111 | + ); |
| 112 | + clusterInfoService.addListener(usageMonitor::onNewInfo); |
98 | 113 | clusterService.addListener(clusterInfoService); |
99 | 114 | clusterInfoService.addListener(ignored -> {}); |
100 | 115 |
|
@@ -132,13 +147,15 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { |
132 | 147 | for (int i = 0; i < 3; i++) { |
133 | 148 | Mockito.clearInvocations(mockEstimatedHeapUsageCollector); |
134 | 149 | Mockito.clearInvocations(nodeUsageStatsForThreadPoolsCollector); |
| 150 | + Mockito.clearInvocations(usageMonitor); |
135 | 151 | final int initialRequestCount = client.requestCount; |
136 | 152 | final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); |
137 | 153 | runFor(deterministicTaskQueue, duration); |
138 | 154 | deterministicTaskQueue.runAllRunnableTasks(); |
139 | 155 | assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval |
140 | 156 | verify(mockEstimatedHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval |
141 | 157 | verify(nodeUsageStatsForThreadPoolsCollector).collectUsageStats(any(), any(), any()); |
| 158 | + verify(usageMonitor).onNewInfo(any()); |
142 | 159 | } |
143 | 160 |
|
144 | 161 | final AtomicBoolean failMaster2 = new AtomicBoolean(); |
|
0 commit comments