-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Suspend Index throttling when relocating #128797
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
ankikuma
merged 45 commits into
elastic:main
from
ankikuma:05192025/UnpauseIndexingForPermits
Jul 30, 2025
Merged
Changes from 43 commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
dfe639f
pause indexing and race condition diags
ankikuma 2601960
commit
ankikuma ec91a19
commit
ankikuma 3ddb78b
refresh branch
ankikuma f12949e
commit
ankikuma 90670f3
commit
ankikuma 45e3799
commit
ankikuma cd43ab3
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma bf91cab
commit
ankikuma e642fea
address review comments
ankikuma a249357
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma a11d2fd
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma 82a37f5
test failure
ankikuma 560a035
remove commented code
ankikuma 1bb089e
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma 048f944
minor changes
ankikuma 3e044bf
modified testRelocationWhileIndexingRandom
ankikuma 7abedf2
[CI] Auto commit changes from spotless
76f59b6
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma 4afc8c5
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma c5cce6e
update index settings
ankikuma 8485a8e
test
ankikuma 6315189
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma 5e81c31
address comments
ankikuma 661fa12
test
ankikuma 77058ad
[CI] Auto commit changes from spotless
1d872c1
test
ankikuma 129622a
Merge branch '05192025/UnpauseIndexingForPermits' of github.com:ankik…
ankikuma 610bc0f
old changes
ankikuma e50c200
pull changes
ankikuma 343d9f2
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma 94f212a
Merge branch '05192025/UnpauseIndexingForPermits' of github.com:ankik…
ankikuma 6a2bf2b
fix test
ankikuma ee22887
[CI] Auto commit changes from spotless
8e9d456
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma 371dfbe
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma c9438b4
fix test + throttle only for primary
ankikuma 31fe364
[CI] Auto commit changes from spotless
9729ebf
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma 1134a5e
Merge branch '05192025/UnpauseIndexingForPermits' of github.com:ankik…
ankikuma 8d6fc17
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma a1bf5a3
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma 3571c46
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma 2ee8dfb
relax assert that throttled shard is primary
ankikuma f492df1
add comment
ankikuma File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -810,7 +810,7 @@ public void relocated( | |||||
| ) throws IllegalIndexShardStateException, IllegalStateException { | ||||||
| assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; | ||||||
| try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { | ||||||
| indexShardOperationPermits.blockOperations(new ActionListener<>() { | ||||||
| blockOperations(new ActionListener<>() { | ||||||
| @Override | ||||||
| public void onResponse(Releasable releasable) { | ||||||
| boolean success = false; | ||||||
|
|
@@ -888,8 +888,13 @@ public void onFailure(Exception e) { | |||||
| listener.onFailure(e); | ||||||
| } | ||||||
| } | ||||||
| }, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by | ||||||
| // CancellableThreads and we want to be able to interrupt it | ||||||
| }, | ||||||
| 30L, | ||||||
| TimeUnit.MINUTES, | ||||||
| // Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it | ||||||
| EsExecutors.DIRECT_EXECUTOR_SERVICE | ||||||
| ); | ||||||
|
|
||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -2750,6 +2755,7 @@ public IndexEventListener getIndexEventListener() { | |||||
| * setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread. | ||||||
| */ | ||||||
| public void activateThrottling() { | ||||||
| assert shardRouting.primary() : "only primaries can be throttled: " + shardRouting; | ||||||
| try { | ||||||
| getEngine().activateThrottling(); | ||||||
| } catch (AlreadyClosedException ex) { | ||||||
|
|
@@ -2765,6 +2771,22 @@ public void deactivateThrottling() { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| private void suspendThrottling() { | ||||||
| try { | ||||||
| getEngine().suspendThrottling(); | ||||||
| } catch (AlreadyClosedException ex) { | ||||||
| // ignore | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private void resumeThrottling() { | ||||||
| try { | ||||||
| getEngine().resumeThrottling(); | ||||||
| } catch (AlreadyClosedException ex) { | ||||||
| // ignore | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private void handleRefreshException(Exception e) { | ||||||
| if (e instanceof AlreadyClosedException) { | ||||||
| // ignore | ||||||
|
|
@@ -3823,6 +3845,39 @@ private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final Acti | |||||
| }); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Immediately delays operations and uses the {@code executor} to wait for in-flight operations to finish and then acquires all | ||||||
| * permits. When all permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are | ||||||
| * started. Delayed operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in | ||||||
| * this case the {@code onFailure} handler will be invoked after delayed operations are released. | ||||||
| * | ||||||
| * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed. This listener should not throw. | ||||||
| * @param timeout the maximum time to wait for the in-flight operations block | ||||||
| * @param timeUnit the time unit of the {@code timeout} argument | ||||||
| * @param executor executor on which to wait for in-flight operations to finish and acquire all permits | ||||||
| */ | ||||||
| public void blockOperations( | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be private?
Suggested change
|
||||||
| final ActionListener<Releasable> onAcquired, | ||||||
| final long timeout, | ||||||
| final TimeUnit timeUnit, | ||||||
| final Executor executor | ||||||
| ) { | ||||||
| // In case indexing is paused on the shard, suspend throttling so that any currently paused task can | ||||||
| // go ahead and release the indexing permit it holds. | ||||||
| suspendThrottling(); | ||||||
| try { | ||||||
| indexShardOperationPermits.blockOperations( | ||||||
| ActionListener.runAfter(onAcquired, this::resumeThrottling), | ||||||
| timeout, | ||||||
| timeUnit, | ||||||
| executor | ||||||
| ); | ||||||
| } catch (IndexShardClosedException e) { | ||||||
| resumeThrottling(); | ||||||
| throw e; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) { | ||||||
| final Releasable forceRefreshes = refreshListeners.forceRefreshes(); | ||||||
| final ActionListener<Releasable> wrappedListener = ActionListener.wrap(r -> { | ||||||
|
|
@@ -3833,7 +3888,7 @@ private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, l | |||||
| onPermitAcquired.onFailure(e); | ||||||
| }); | ||||||
| try { | ||||||
| indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic()); | ||||||
| blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic()); | ||||||
| } catch (Exception e) { | ||||||
| forceRefreshes.close(); | ||||||
| throw e; | ||||||
|
|
||||||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we wait for the thread to be blocked on the condition here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a change to wait for the future to complete, which times out. Were you thinking of something different ?