Skip to content

Commit b8f704b

Browse files
committed
Deduplicate allocation stats calls
Computing allocation stats can be quite expensive, and there is no need to recompute them in parallel across all management threads, as is done today. This commit adds a deduplicator to avoid the redundant work. Backport of elastic#123246 to `8.x`.
1 parent 2edbcb8 commit b8f704b

File tree

4 files changed

+82
-7
lines changed

4 files changed

+82
-7
lines changed

docs/changelog/123246.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 123246
2+
summary: Deduplicate allocation stats calls
3+
area: Allocation
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.ActionRequestValidationException;
1515
import org.elasticsearch.action.ActionResponse;
16+
import org.elasticsearch.action.ActionRunnable;
1617
import org.elasticsearch.action.ActionType;
18+
import org.elasticsearch.action.SingleResultDeduplicator;
1719
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
1820
import org.elasticsearch.action.support.ActionFilters;
21+
import org.elasticsearch.action.support.SubscribableListener;
1922
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
2023
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
2124
import org.elasticsearch.cluster.ClusterState;
@@ -28,6 +31,7 @@
2831
import org.elasticsearch.cluster.service.ClusterService;
2932
import org.elasticsearch.common.io.stream.StreamInput;
3033
import org.elasticsearch.common.io.stream.StreamOutput;
34+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3135
import org.elasticsearch.core.Nullable;
3236
import org.elasticsearch.core.TimeValue;
3337
import org.elasticsearch.features.FeatureService;
@@ -47,7 +51,7 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
4751

4852
public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");
4953

50-
private final AllocationStatsService allocationStatsService;
54+
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
5155
private final DiskThresholdSettings diskThresholdSettings;
5256
private final FeatureService featureService;
5357

@@ -69,9 +73,15 @@ public TransportGetAllocationStatsAction(
6973
actionFilters,
7074
TransportGetAllocationStatsAction.Request::new,
7175
TransportGetAllocationStatsAction.Response::new,
72-
threadPool.executor(ThreadPool.Names.MANAGEMENT)
76+
// DIRECT is ok here because we fork the allocation stats computation onto a MANAGEMENT thread if needed, or else we return
77+
// very cheaply.
78+
EsExecutors.DIRECT_EXECUTOR_SERVICE
79+
);
80+
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
81+
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
82+
threadPool.getThreadContext(),
83+
l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats))
7384
);
74-
this.allocationStatsService = allocationStatsService;
7585
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
7686
this.featureService = featureService;
7787
}
@@ -88,15 +98,21 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
8898

8999
@Override
90100
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
91-
listener.onResponse(
92-
new Response(
93-
request.metrics().contains(Metric.ALLOCATIONS) ? allocationStatsService.stats() : Map.of(),
101+
// NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
102+
103+
final SubscribableListener<Map<String, NodeAllocationStats>> allocationStatsStep = request.metrics().contains(Metric.ALLOCATIONS)
104+
? SubscribableListener.newForked(allocationStatsSupplier::execute)
105+
: SubscribableListener.newSucceeded(Map.of());
106+
107+
allocationStatsStep.andThenApply(
108+
allocationStats -> new Response(
109+
allocationStats,
94110
request.metrics().contains(Metric.FS)
95111
&& featureService.clusterHasFeature(clusterService.state(), AllocationStatsFeatures.INCLUDE_DISK_THRESHOLD_SETTINGS)
96112
? diskThresholdSettings
97113
: null
98114
)
99-
);
115+
).addListener(listener);
100116
}
101117

102118
@Override

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
1919
import org.elasticsearch.cluster.service.ClusterService;
2020
import org.elasticsearch.common.util.Maps;
21+
import org.elasticsearch.transport.Transports;
2122

2223
import java.util.Map;
2324

@@ -41,6 +42,8 @@ public AllocationStatsService(
4142
}
4243

4344
public Map<String, NodeAllocationStats> stats() {
45+
assert Transports.assertNotTransportThread("too expensive for a transport worker");
46+
4447
var state = clusterService.state();
4548
var info = clusterInfoService.getClusterInfo();
4649
var desiredBalance = desiredBalanceShardsAllocator != null ? desiredBalanceShardsAllocator.getDesiredBalance() : null;

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,13 @@
3232
import java.util.EnumSet;
3333
import java.util.Map;
3434
import java.util.Set;
35+
import java.util.concurrent.CyclicBarrier;
36+
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.concurrent.atomic.AtomicInteger;
3538

3639
import static org.hamcrest.Matchers.anEmptyMap;
40+
import static org.hamcrest.Matchers.containsString;
41+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3742
import static org.hamcrest.Matchers.not;
3843
import static org.mockito.ArgumentMatchers.any;
3944
import static org.mockito.ArgumentMatchers.eq;
@@ -120,4 +125,50 @@ public void testReturnsOnlyRequestedStats() throws Exception {
120125
assertNull(response.getDiskThresholdSettings());
121126
}
122127
}
128+
129+
public void testDeduplicatesStatsComputations() throws InterruptedException {
130+
final var requestCounter = new AtomicInteger();
131+
final var isExecuting = new AtomicBoolean();
132+
when(allocationStatsService.stats()).thenAnswer(invocation -> {
133+
try {
134+
assertTrue(isExecuting.compareAndSet(false, true));
135+
assertThat(Thread.currentThread().getName(), containsString("[management]"));
136+
return Map.of(Integer.toString(requestCounter.incrementAndGet()), NodeAllocationStatsTests.randomNodeAllocationStats());
137+
} finally {
138+
Thread.yield();
139+
assertTrue(isExecuting.compareAndSet(true, false));
140+
}
141+
});
142+
143+
final var threads = new Thread[between(1, 5)];
144+
final var startBarrier = new CyclicBarrier(threads.length);
145+
for (int i = 0; i < threads.length; i++) {
146+
threads[i] = new Thread(() -> {
147+
safeAwait(startBarrier);
148+
149+
final var minRequestIndex = requestCounter.get();
150+
151+
final TransportGetAllocationStatsAction.Response response = safeAwait(
152+
l -> action.masterOperation(
153+
mock(Task.class),
154+
new TransportGetAllocationStatsAction.Request(
155+
TEST_REQUEST_TIMEOUT,
156+
TaskId.EMPTY_TASK_ID,
157+
EnumSet.of(Metric.ALLOCATIONS)
158+
),
159+
ClusterState.EMPTY_STATE,
160+
l
161+
)
162+
);
163+
164+
final var requestIndex = Integer.valueOf(response.getNodeAllocationStats().keySet().iterator().next());
165+
assertThat(requestIndex, greaterThanOrEqualTo(minRequestIndex)); // did not get a stale result
166+
}, "thread-" + i);
167+
threads[i].start();
168+
}
169+
170+
for (final var thread : threads) {
171+
thread.join();
172+
}
173+
}
123174
}

0 commit comments

Comments
 (0)