Skip to content

Commit cc3c387

Browse files
authored
Deduplicate allocation stats calls (#123267) (#123280)
These things can be quite expensive and there's no need to recompute them in parallel across all management threads as done today. This commit adds a deduplicator to avoid redundant work. Backport of #123246 to `8.x`
1 parent 9cc7573 commit cc3c387

File tree

4 files changed

+82
-7
lines changed

4 files changed

+82
-7
lines changed

docs/changelog/123246.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 123246
2+
summary: Deduplicate allocation stats calls
3+
area: Allocation
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.ActionRequestValidationException;
1515
import org.elasticsearch.action.ActionResponse;
16+
import org.elasticsearch.action.ActionRunnable;
1617
import org.elasticsearch.action.ActionType;
18+
import org.elasticsearch.action.SingleResultDeduplicator;
1719
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
1820
import org.elasticsearch.action.support.ActionFilters;
21+
import org.elasticsearch.action.support.SubscribableListener;
1922
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
2023
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
2124
import org.elasticsearch.cluster.ClusterState;
@@ -28,6 +31,7 @@
2831
import org.elasticsearch.cluster.service.ClusterService;
2932
import org.elasticsearch.common.io.stream.StreamInput;
3033
import org.elasticsearch.common.io.stream.StreamOutput;
34+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3135
import org.elasticsearch.core.Nullable;
3236
import org.elasticsearch.core.TimeValue;
3337
import org.elasticsearch.features.FeatureService;
@@ -47,7 +51,7 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
4751

4852
public static final ActionType<TransportGetAllocationStatsAction.Response> TYPE = new ActionType<>("cluster:monitor/allocation/stats");
4953

50-
private final AllocationStatsService allocationStatsService;
54+
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
5155
private final DiskThresholdSettings diskThresholdSettings;
5256
private final FeatureService featureService;
5357

@@ -70,9 +74,15 @@ public TransportGetAllocationStatsAction(
7074
TransportGetAllocationStatsAction.Request::new,
7175
indexNameExpressionResolver,
7276
TransportGetAllocationStatsAction.Response::new,
73-
threadPool.executor(ThreadPool.Names.MANAGEMENT)
77+
// DIRECT is ok here because we fork the allocation stats computation onto a MANAGEMENT thread if needed, or else we return
78+
// very cheaply.
79+
EsExecutors.DIRECT_EXECUTOR_SERVICE
80+
);
81+
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
82+
this.allocationStatsSupplier = new SingleResultDeduplicator<>(
83+
threadPool.getThreadContext(),
84+
l -> managementExecutor.execute(ActionRunnable.supply(l, allocationStatsService::stats))
7485
);
75-
this.allocationStatsService = allocationStatsService;
7686
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
7787
this.featureService = featureService;
7888
}
@@ -89,15 +99,21 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
8999

90100
@Override
91101
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
92-
listener.onResponse(
93-
new Response(
94-
request.metrics().contains(Metric.ALLOCATIONS) ? allocationStatsService.stats() : Map.of(),
102+
// NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
103+
104+
final SubscribableListener<Map<String, NodeAllocationStats>> allocationStatsStep = request.metrics().contains(Metric.ALLOCATIONS)
105+
? SubscribableListener.newForked(allocationStatsSupplier::execute)
106+
: SubscribableListener.newSucceeded(Map.of());
107+
108+
allocationStatsStep.andThenApply(
109+
allocationStats -> new Response(
110+
allocationStats,
95111
request.metrics().contains(Metric.FS)
96112
&& featureService.clusterHasFeature(clusterService.state(), AllocationStatsFeatures.INCLUDE_DISK_THRESHOLD_SETTINGS)
97113
? diskThresholdSettings
98114
: null
99115
)
100-
);
116+
).addListener(listener);
101117
}
102118

103119
@Override

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
1919
import org.elasticsearch.cluster.service.ClusterService;
2020
import org.elasticsearch.common.util.Maps;
21+
import org.elasticsearch.transport.Transports;
2122

2223
import java.util.Map;
2324

@@ -41,6 +42,8 @@ public AllocationStatsService(
4142
}
4243

4344
public Map<String, NodeAllocationStats> stats() {
45+
assert Transports.assertNotTransportThread("too expensive for a transport worker");
46+
4447
var state = clusterService.state();
4548
var info = clusterInfoService.getClusterInfo();
4649
var desiredBalance = desiredBalanceShardsAllocator != null ? desiredBalanceShardsAllocator.getDesiredBalance() : null;

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsActionTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,13 @@
3232
import java.util.EnumSet;
3333
import java.util.Map;
3434
import java.util.Set;
35+
import java.util.concurrent.CyclicBarrier;
36+
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.concurrent.atomic.AtomicInteger;
3538

3639
import static org.hamcrest.Matchers.anEmptyMap;
40+
import static org.hamcrest.Matchers.containsString;
41+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
3742
import static org.hamcrest.Matchers.not;
3843
import static org.mockito.ArgumentMatchers.any;
3944
import static org.mockito.ArgumentMatchers.eq;
@@ -120,4 +125,50 @@ public void testReturnsOnlyRequestedStats() throws Exception {
120125
assertNull(response.getDiskThresholdSettings());
121126
}
122127
}
128+
129+
public void testDeduplicatesStatsComputations() throws InterruptedException {
130+
final var requestCounter = new AtomicInteger();
131+
final var isExecuting = new AtomicBoolean();
132+
when(allocationStatsService.stats()).thenAnswer(invocation -> {
133+
try {
134+
assertTrue(isExecuting.compareAndSet(false, true));
135+
assertThat(Thread.currentThread().getName(), containsString("[management]"));
136+
return Map.of(Integer.toString(requestCounter.incrementAndGet()), NodeAllocationStatsTests.randomNodeAllocationStats());
137+
} finally {
138+
Thread.yield();
139+
assertTrue(isExecuting.compareAndSet(true, false));
140+
}
141+
});
142+
143+
final var threads = new Thread[between(1, 5)];
144+
final var startBarrier = new CyclicBarrier(threads.length);
145+
for (int i = 0; i < threads.length; i++) {
146+
threads[i] = new Thread(() -> {
147+
safeAwait(startBarrier);
148+
149+
final var minRequestIndex = requestCounter.get();
150+
151+
final TransportGetAllocationStatsAction.Response response = safeAwait(
152+
l -> action.masterOperation(
153+
mock(Task.class),
154+
new TransportGetAllocationStatsAction.Request(
155+
TEST_REQUEST_TIMEOUT,
156+
TaskId.EMPTY_TASK_ID,
157+
EnumSet.of(Metric.ALLOCATIONS)
158+
),
159+
ClusterState.EMPTY_STATE,
160+
l
161+
)
162+
);
163+
164+
final var requestIndex = Integer.valueOf(response.getNodeAllocationStats().keySet().iterator().next());
165+
assertThat(requestIndex, greaterThanOrEqualTo(minRequestIndex)); // did not get a stale result
166+
}, "thread-" + i);
167+
threads[i].start();
168+
}
169+
170+
for (final var thread : threads) {
171+
thread.join();
172+
}
173+
}
123174
}

0 commit comments

Comments
 (0)