-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Add cancellation support in TransportGetAllocationStatsAction
#127371
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
620f23c
8d6f7cc
b423b7b
91df717
ccae434
e42fe33
001a2b7
7511df2
2e7fa50
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 127371 | ||
| summary: Add cancellation support in `TransportGetAllocationStatsAction` | ||
| area: Allocation | ||
| type: feature | ||
| issues: | ||
| - 123248 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -400,6 +400,7 @@ private void updateDesireBalanceMetrics( | |
| routingAllocation.metadata(), | ||
| routingAllocation.routingNodes(), | ||
| routingAllocation.clusterInfo(), | ||
| () -> {}, | ||
|
||
| desiredBalance | ||
| ); | ||
| Map<DiscoveryNode, NodeAllocationStatsAndWeightsCalculator.NodeAllocationStatsAndWeight> filteredNodeAllocationStatsAndWeights = | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -100,6 +100,13 @@ protected boolean isFresh(Key currentKey, Key newKey) { | |
| return currentKey.equals(newKey); | ||
| } | ||
|
|
||
| /** | ||
| * Sets the currently cached item reference to {@code null}, which will result in a {@code refresh()} on the next {@code get()} call. | ||
| */ | ||
| protected void clearCurrentCachedItem() { | ||
|
||
| this.currentCachedItemRef.set(null); | ||
| } | ||
|
|
||
| /** | ||
| * Start a retrieval for the value associated with the given {@code input}, and pass it to the given {@code listener}. | ||
| * <p> | ||
|
|
@@ -110,7 +117,8 @@ protected boolean isFresh(Key currentKey, Key newKey) { | |
| * | ||
| * @param input The input to compute the desired value, converted to a {@link Key} to determine if the value that's currently | ||
| * cached or pending is fresh enough. | ||
| * @param isCancelled Returns {@code true} if the listener no longer requires the value being computed. | ||
| * @param isCancelled Returns {@code true} if the listener no longer requires the value being computed. The listener is expected to be | ||
| * completed as soon as possible when cancellation is detected. | ||
| * @param listener The listener to notify when the desired value becomes available. | ||
| */ | ||
| public final void get(Input input, BooleanSupplier isCancelled, ActionListener<Value> listener) { | ||
|
|
@@ -230,11 +238,15 @@ boolean addListener(ActionListener<Value> listener, BooleanSupplier isCancelled) | |
| ActionListener.completeWith(listener, future::actionResult); | ||
| } else { | ||
| // Refresh is still pending; it's not cancelled because there are still references. | ||
| future.addListener(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext)); | ||
| final var cancellableListener = ActionListener.notifyOnce( | ||
| ContextPreservingActionListener.wrapPreservingContext(listener, threadContext) | ||
| ); | ||
| future.addListener(cancellableListener); | ||
| final AtomicBoolean released = new AtomicBoolean(); | ||
| cancellationChecks.add(() -> { | ||
| if (released.get() == false && isCancelled.getAsBoolean() && released.compareAndSet(false, true)) { | ||
| decRef(); | ||
| cancellableListener.onFailure(new TaskCancelledException("task cancelled")); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could this be an
Executor? No need for it to beAutoCloseable(it's a disaster if you try and close one of the threadpool's executors)