Skip to content

Commit b0834ea

Browse files
committed
Deduplicate allocation stats calls (#123246)
These things can be quite expensive and there's no need to recompute them in parallel across all management threads as done today. This commit adds a deduplicator to avoid redundant work.
1 parent e716be1 commit b0834ea

File tree

4 files changed

+81
-9
lines changed

4 files changed

+81
-9
lines changed

docs/changelog/123246.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 123246
2+
summary: Deduplicate allocation stats calls
3+
area: Allocation
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.ActionRequestValidationException;
1515
import org.elasticsearch.action.ActionResponse;
16+
import org.elasticsearch.action.ActionRunnable;
1617
import org.elasticsearch.action.ActionType;
18+
import org.elasticsearch.action.SingleResultDeduplicator;
1719
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
1820
import org.elasticsearch.action.support.ActionFilters;
21+
import org.elasticsearch.action.support.SubscribableListener;
1922
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
2023
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
2124
import org.elasticsearch.cluster.ClusterState;
@@ -28,6 +31,7 @@
2831
import org.elasticsearch.cluster.service.ClusterService;
2932
import org.elasticsearch.common.io.stream.StreamInput;
3033
import org.elasticsearch.common.io.stream.StreamOutput;
34+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3135
import org.elasticsearch.core.Nullable;
3236
import org.elasticsearch.core.TimeValue;
3337
import org.elasticsearch.injection.guice.Inject;
@@ -46,7 +50,7 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
4650

4751
public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");
4852

49-
private final AllocationStatsService allocationStatsService;
53+
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
5054
private final DiskThresholdSettings diskThresholdSettings;
5155

5256
@Inject
@@ -66,9 +70,15 @@ public TransportGetAllocationStatsAction(
6670
actionFilters,
6771
TransportGetAllocationStatsAction.Request::new,
6872
TransportGetAllocationStatsAction.Response::new,
69-
threadPool.executor(ThreadPool.Names.MANAGEMENT)
73+
// DIRECT is ok here because we fork the allocation stats computation onto a MANAGEMENT thread if needed, or else we return
74+
// very cheaply.
75+
EsExecutors.DIRECT_EXECUTOR_SERVICE
76+
);
77+
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
78+
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
79+
threadPool.getThreadContext(),
80+
l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats))
7081
);
71-
this.allocationStatsService = allocationStatsService;
7282
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
7383
}
7484

@@ -84,12 +94,15 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
8494

8595
@Override
8696
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
87-
listener.onResponse(
88-
new Response(
89-
request.metrics().contains(Metric.ALLOCATIONS) ? allocationStatsService.stats() : Map.of(),
90-
request.metrics().contains(Metric.FS) ? diskThresholdSettings : null
91-
)
92-
);
97+
// NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
98+
99+
final SubscribableListener<Map<String, NodeAllocationStats>> allocationStatsStep = request.metrics().contains(Metric.ALLOCATIONS)
100+
? SubscribableListener.newForked(allocationStatsSupplier::execute)
101+
: SubscribableListener.newSucceeded(Map.of());
102+
103+
allocationStatsStep.andThenApply(
104+
allocationStats -> new Response(allocationStats, request.metrics().contains(Metric.FS) ? diskThresholdSettings : null)
105+
).addListener(listener);
93106
}
94107

95108
@Override

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
1515
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
1616
import org.elasticsearch.cluster.service.ClusterService;
17+
import org.elasticsearch.transport.Transports;
1718

1819
import java.util.Map;
1920
import java.util.function.Supplier;
@@ -46,6 +47,8 @@ public AllocationStatsService(
4647
* Returns a map of node IDs to node allocation stats.
4748
*/
4849
public Map<String, NodeAllocationStats> stats() {
50+
assert Transports.assertNotTransportThread("too expensive for a transport worker");
51+
4952
var clusterState = clusterService.state();
5053
var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights(
5154
clusterState.metadata(),

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,13 @@
3131
import java.util.EnumSet;
3232
import java.util.Map;
3333
import java.util.Set;
34+
import java.util.concurrent.CyclicBarrier;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicInteger;
3437

3538
import static org.hamcrest.Matchers.anEmptyMap;
39+
import static org.hamcrest.Matchers.containsString;
40+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3641
import static org.hamcrest.Matchers.not;
3742
import static org.mockito.Mockito.mock;
3843
import static org.mockito.Mockito.never;
@@ -112,4 +117,50 @@ public void testReturnsOnlyRequestedStats() throws Exception {
112117
assertNull(response.getDiskThresholdSettings());
113118
}
114119
}
120+
121+
public void testDeduplicatesStatsComputations() throws InterruptedException {
122+
final var requestCounter = new AtomicInteger();
123+
final var isExecuting = new AtomicBoolean();
124+
when(allocationStatsService.stats()).thenAnswer(invocation -> {
125+
try {
126+
assertTrue(isExecuting.compareAndSet(false, true));
127+
assertThat(Thread.currentThread().getName(), containsString("[management]"));
128+
return Map.of(Integer.toString(requestCounter.incrementAndGet()), NodeAllocationStatsTests.randomNodeAllocationStats());
129+
} finally {
130+
Thread.yield();
131+
assertTrue(isExecuting.compareAndSet(true, false));
132+
}
133+
});
134+
135+
final var threads = new Thread[between(1, 5)];
136+
final var startBarrier = new CyclicBarrier(threads.length);
137+
for (int i = 0; i < threads.length; i++) {
138+
threads[i] = new Thread(() -> {
139+
safeAwait(startBarrier);
140+
141+
final var minRequestIndex = requestCounter.get();
142+
143+
final TransportGetAllocationStatsAction.Response response = safeAwait(
144+
l -> action.masterOperation(
145+
mock(Task.class),
146+
new TransportGetAllocationStatsAction.Request(
147+
TEST_REQUEST_TIMEOUT,
148+
TaskId.EMPTY_TASK_ID,
149+
EnumSet.of(Metric.ALLOCATIONS)
150+
),
151+
ClusterState.EMPTY_STATE,
152+
l
153+
)
154+
);
155+
156+
final var requestIndex = Integer.valueOf(response.getNodeAllocationStats().keySet().iterator().next());
157+
assertThat(requestIndex, greaterThanOrEqualTo(minRequestIndex)); // did not get a stale result
158+
}, "thread-" + i);
159+
threads[i].start();
160+
}
161+
162+
for (final var thread : threads) {
163+
thread.join();
164+
}
165+
}
115166
}

0 commit comments

Comments
 (0)