Skip to content

Commit 4e7b99c

Browse files
Add cancellation support in TransportGetAllocationStatsAction (elastic#127371)
Replaces the SingleResultDeduplicator with a cache implemented as a subclass of CancellableSingleObjectCache. Refactors AllocationStatsService and NodeAllocationStatsAndWeightsCalculator to accept the Runnable used to check for cancellation. Closes elastic#123248
1 parent 4c0c9b6 commit 4e7b99c

File tree

10 files changed

+241
-77
lines changed

10 files changed

+241
-77
lines changed

docs/changelog/127371.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 127371
2+
summary: Add cancellation support in `TransportGetAllocationStatsAction`
3+
area: Allocation
4+
type: feature
5+
issues:
6+
- 123248

server/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportGetAllocationStatsAction.java

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.action.ActionResponse;
1616
import org.elasticsearch.action.ActionRunnable;
1717
import org.elasticsearch.action.ActionType;
18-
import org.elasticsearch.action.SingleResultDeduplicator;
1918
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric;
2019
import org.elasticsearch.action.support.ActionFilters;
2120
import org.elasticsearch.action.support.SubscribableListener;
@@ -31,10 +30,12 @@
3130
import org.elasticsearch.common.io.stream.StreamInput;
3231
import org.elasticsearch.common.io.stream.StreamOutput;
3332
import org.elasticsearch.common.settings.Setting;
33+
import org.elasticsearch.common.util.CancellableSingleObjectCache;
3434
import org.elasticsearch.common.util.concurrent.EsExecutors;
3535
import org.elasticsearch.core.Nullable;
3636
import org.elasticsearch.core.TimeValue;
3737
import org.elasticsearch.injection.guice.Inject;
38+
import org.elasticsearch.tasks.CancellableTask;
3839
import org.elasticsearch.tasks.Task;
3940
import org.elasticsearch.tasks.TaskId;
4041
import org.elasticsearch.threadpool.ThreadPool;
@@ -43,7 +44,8 @@
4344
import java.io.IOException;
4445
import java.util.EnumSet;
4546
import java.util.Map;
46-
import java.util.concurrent.atomic.AtomicReference;
47+
import java.util.concurrent.Executor;
48+
import java.util.function.BooleanSupplier;
4749

4850
public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAction<
4951
TransportGetAllocationStatsAction.Request,
@@ -62,7 +64,6 @@ public class TransportGetAllocationStatsAction extends TransportMasterNodeReadAc
6264
);
6365

6466
private final AllocationStatsCache allocationStatsCache;
65-
private final SingleResultDeduplicator<Map<String, NodeAllocationStats>> allocationStatsSupplier;
6667
private final DiskThresholdSettings diskThresholdSettings;
6768

6869
@Inject
@@ -85,21 +86,7 @@ public TransportGetAllocationStatsAction(
8586
// very cheaply.
8687
EsExecutors.DIRECT_EXECUTOR_SERVICE
8788
);
88-
final var managementExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
89-
this.allocationStatsCache = new AllocationStatsCache(threadPool, DEFAULT_CACHE_TTL);
90-
this.allocationStatsSupplier = new SingleResultDeduplicator<>(threadPool.getThreadContext(), l -> {
91-
final var cachedStats = allocationStatsCache.get();
92-
if (cachedStats != null) {
93-
l.onResponse(cachedStats);
94-
return;
95-
}
96-
97-
managementExecutor.execute(ActionRunnable.supply(l, () -> {
98-
final var stats = allocationStatsService.stats();
99-
allocationStatsCache.put(stats);
100-
return stats;
101-
}));
102-
});
89+
this.allocationStatsCache = new AllocationStatsCache(threadPool, allocationStatsService, DEFAULT_CACHE_TTL);
10390
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
10491
clusterService.getClusterSettings().initializeAndWatch(CACHE_TTL_SETTING, this.allocationStatsCache::setTTL);
10592
}
@@ -118,8 +105,11 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
118105
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
119106
// NB we are still on a transport thread here - if adding more functionality here make sure to fork to a different pool
120107

108+
assert task instanceof CancellableTask;
109+
final var cancellableTask = (CancellableTask) task;
110+
121111
final SubscribableListener<Map<String, NodeAllocationStats>> allocationStatsStep = request.metrics().contains(Metric.ALLOCATIONS)
122-
? SubscribableListener.newForked(allocationStatsSupplier::execute)
112+
? SubscribableListener.newForked(l -> allocationStatsCache.get(cancellableTask::isCancelled, l))
123113
: SubscribableListener.newSucceeded(Map.of());
124114

125115
allocationStatsStep.andThenApply(
@@ -167,6 +157,11 @@ public EnumSet<Metric> metrics() {
167157
public ActionRequestValidationException validate() {
168158
return null;
169159
}
160+
161+
@Override
162+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
163+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
164+
}
170165
}
171166

172167
public static class Response extends ActionResponse {
@@ -209,39 +204,60 @@ public DiskThresholdSettings getDiskThresholdSettings() {
209204
}
210205
}
211206

212-
private record CachedAllocationStats(Map<String, NodeAllocationStats> stats, long timestampMillis) {}
213-
214-
private static class AllocationStatsCache {
207+
private static class AllocationStatsCache extends CancellableSingleObjectCache<Long, Long, Map<String, NodeAllocationStats>> {
215208
private volatile long ttlMillis;
216209
private final ThreadPool threadPool;
217-
private final AtomicReference<CachedAllocationStats> cachedStats;
210+
private final Executor executor;
211+
private final AllocationStatsService allocationStatsService;
218212

219-
AllocationStatsCache(ThreadPool threadPool, TimeValue ttl) {
213+
AllocationStatsCache(ThreadPool threadPool, AllocationStatsService allocationStatsService, TimeValue ttl) {
214+
super(threadPool.getThreadContext());
220215
this.threadPool = threadPool;
221-
this.cachedStats = new AtomicReference<>();
216+
this.executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
217+
this.allocationStatsService = allocationStatsService;
222218
setTTL(ttl);
223219
}
224220

225221
void setTTL(TimeValue ttl) {
226222
ttlMillis = ttl.millis();
227-
if (ttlMillis == 0L) {
228-
cachedStats.set(null);
229-
}
223+
clearCacheIfDisabled();
230224
}
231225

232-
Map<String, NodeAllocationStats> get() {
233-
if (ttlMillis == 0L) {
234-
return null;
226+
void get(BooleanSupplier isCancelled, ActionListener<Map<String, NodeAllocationStats>> listener) {
227+
get(threadPool.relativeTimeInMillis(), isCancelled, listener);
228+
}
229+
230+
@Override
231+
protected void refresh(
232+
Long aLong,
233+
Runnable ensureNotCancelled,
234+
BooleanSupplier supersedeIfStale,
235+
ActionListener<Map<String, NodeAllocationStats>> listener
236+
) {
237+
if (supersedeIfStale.getAsBoolean() == false) {
238+
executor.execute(
239+
ActionRunnable.supply(
240+
// If caching is disabled, the item is cached only long enough to prevent duplicate concurrent requests.
241+
ActionListener.runBefore(listener, this::clearCacheIfDisabled),
242+
() -> allocationStatsService.stats(ensureNotCancelled)
243+
)
244+
);
235245
}
246+
}
236247

237-
// We don't set the atomic ref to null here upon expiration since we know it is about to be replaced with a fresh instance.
238-
final var stats = cachedStats.get();
239-
return stats == null || threadPool.relativeTimeInMillis() - stats.timestampMillis > ttlMillis ? null : stats.stats;
248+
@Override
249+
protected Long getKey(Long timestampMillis) {
250+
return timestampMillis;
251+
}
252+
253+
@Override
254+
protected boolean isFresh(Long currentKey, Long newKey) {
255+
return ttlMillis == 0 || newKey - currentKey <= ttlMillis;
240256
}
241257

242-
void put(Map<String, NodeAllocationStats> stats) {
243-
if (ttlMillis > 0L) {
244-
cachedStats.set(new CachedAllocationStats(stats, threadPool.relativeTimeInMillis()));
258+
private void clearCacheIfDisabled() {
259+
if (ttlMillis == 0) {
260+
clearCurrentCachedItem();
245261
}
246262
}
247263
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,22 @@ public AllocationStatsService(
4747
* Returns a map of node IDs to node allocation stats.
4848
*/
4949
public Map<String, NodeAllocationStats> stats() {
50+
return stats(() -> {});
51+
}
52+
53+
/**
54+
* Returns a map of node IDs to node allocation stats, promising to execute the provided {@link Runnable} during the computation to
55+
* test for cancellation.
56+
*/
57+
public Map<String, NodeAllocationStats> stats(Runnable ensureNotCancelled) {
5058
assert Transports.assertNotTransportThread("too expensive for a transport worker");
5159

5260
var clusterState = clusterService.state();
5361
var nodesStatsAndWeights = nodeAllocationStatsAndWeightsCalculator.nodesAllocationStatsAndWeights(
5462
clusterState.metadata(),
5563
clusterState.getRoutingNodes(),
5664
clusterInfoService.getClusterInfo(),
65+
ensureNotCancelled,
5766
desiredBalanceSupplier.get()
5867
);
5968
return nodesStatsAndWeights.entrySet()

server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
5858
Metadata metadata,
5959
RoutingNodes routingNodes,
6060
ClusterInfo clusterInfo,
61+
Runnable ensureNotCancelled,
6162
@Nullable DesiredBalance desiredBalance
6263
) {
6364
if (metadata.hasAnyIndices()) {
@@ -78,6 +79,7 @@ public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
7879
long forecastedDiskUsage = 0;
7980
long currentDiskUsage = 0;
8081
for (ShardRouting shardRouting : node) {
82+
ensureNotCancelled.run();
8183
if (shardRouting.relocating()) {
8284
// Skip the shard if it is moving off this node. The node running recovery will count it.
8385
continue;

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,13 @@ public void resetDesiredBalance() {
391391
resetCurrentDesiredBalance = true;
392392
}
393393

394+
/**
395+
* Used as the argument for the {@code ensureNotCancelled} {@code Runnable} when calling the
396+
* {@code nodeAllocationStatsAndWeightsCalculator} since there is no cancellation mechanism when called from
397+
* {@code updateDesireBalanceMetrics()}.
398+
*/
399+
private static final Runnable NEVER_CANCELLED = () -> {};
400+
394401
private void updateDesireBalanceMetrics(
395402
DesiredBalance desiredBalance,
396403
RoutingAllocation routingAllocation,
@@ -400,6 +407,7 @@ private void updateDesireBalanceMetrics(
400407
routingAllocation.metadata(),
401408
routingAllocation.routingNodes(),
402409
routingAllocation.clusterInfo(),
410+
NEVER_CANCELLED,
403411
desiredBalance
404412
);
405413
Map<DiscoveryNode, NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight> filteredNodeAllocationStatsAndWeights =

server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ protected boolean isFresh(Key currentKey, Key newKey) {
100100
return currentKey.equals(newKey);
101101
}
102102

103+
/**
104+
* Sets the currently cached item reference to {@code null}, which will result in a {@code refresh()} on the next {@code get()} call.
105+
*/
106+
protected final void clearCurrentCachedItem() {
107+
this.currentCachedItemRef.set(null);
108+
}
109+
103110
/**
104111
* Start a retrieval for the value associated with the given {@code input}, and pass it to the given {@code listener}.
105112
* <p>
@@ -110,7 +117,8 @@ protected boolean isFresh(Key currentKey, Key newKey) {
110117
*
111118
* @param input The input to compute the desired value, converted to a {@link Key} to determine if the value that's currently
112119
* cached or pending is fresh enough.
113-
* @param isCancelled Returns {@code true} if the listener no longer requires the value being computed.
120+
* @param isCancelled Returns {@code true} if the listener no longer requires the value being computed. The listener is expected to be
121+
* completed as soon as possible when cancellation is detected.
114122
* @param listener The listener to notify when the desired value becomes available.
115123
*/
116124
public final void get(Input input, BooleanSupplier isCancelled, ActionListener<Value> listener) {
@@ -230,11 +238,15 @@ boolean addListener(ActionListener<Value> listener, BooleanSupplier isCancelled)
230238
ActionListener.completeWith(listener, future::actionResult);
231239
} else {
232240
// Refresh is still pending; it's not cancelled because there are still references.
233-
future.addListener(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));
241+
final var cancellableListener = ActionListener.notifyOnce(
242+
ContextPreservingActionListener.wrapPreservingContext(listener, threadContext)
243+
);
244+
future.addListener(cancellableListener);
234245
final AtomicBoolean released = new AtomicBoolean();
235246
cancellationChecks.add(() -> {
236247
if (released.get() == false && isCancelled.getAsBoolean() && released.compareAndSet(false, true)) {
237248
decRef();
249+
cancellableListener.onFailure(new TaskCancelledException("task cancelled"));
238250
}
239251
});
240252
}

0 commit comments

Comments
 (0)