Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
dfe639f
pause indexing and race condition diags
ankikuma May 23, 2025
2601960
commit
ankikuma May 30, 2025
ec91a19
commit
ankikuma May 30, 2025
3ddb78b
refresh branch
ankikuma May 30, 2025
f12949e
commit
ankikuma Jun 2, 2025
90670f3
commit
ankikuma Jun 2, 2025
45e3799
commit
ankikuma Jun 3, 2025
cd43ab3
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 3, 2025
bf91cab
commit
ankikuma Jun 3, 2025
e642fea
address review comments
ankikuma Jun 4, 2025
a249357
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 4, 2025
a11d2fd
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 4, 2025
82a37f5
test failure
ankikuma Jun 4, 2025
560a035
remove commented code
ankikuma Jun 4, 2025
1bb089e
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 4, 2025
048f944
minor changes
ankikuma Jun 4, 2025
3e044bf
modified testRelocationWhileIndexingRandom
ankikuma Jun 4, 2025
7abedf2
[CI] Auto commit changes from spotless
Jun 4, 2025
76f59b6
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 4, 2025
4afc8c5
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 5, 2025
c5cce6e
update index settings
ankikuma Jun 5, 2025
8485a8e
test
ankikuma Jun 5, 2025
6315189
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 5, 2025
5e81c31
address comments
ankikuma Jun 5, 2025
661fa12
test
ankikuma Jun 5, 2025
77058ad
[CI] Auto commit changes from spotless
Jun 5, 2025
1d872c1
test
ankikuma Jun 5, 2025
129622a
Merge branch '05192025/UnpauseIndexingForPermits' of github.com:ankik…
ankikuma Jun 5, 2025
610bc0f
old changes
ankikuma Jun 5, 2025
e50c200
pull changes
ankikuma Jun 5, 2025
343d9f2
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 5, 2025
94f212a
Merge branch '05192025/UnpauseIndexingForPermits' of github.com:ankik…
ankikuma Jun 5, 2025
6a2bf2b
fix test
ankikuma Jun 6, 2025
ee22887
[CI] Auto commit changes from spotless
Jun 6, 2025
8e9d456
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 6, 2025
371dfbe
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jul 9, 2025
c9438b4
fix test + throttle only for primary
ankikuma Jul 11, 2025
31fe364
[CI] Auto commit changes from spotless
Jul 11, 2025
9729ebf
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jul 11, 2025
1134a5e
Merge branch '05192025/UnpauseIndexingForPermits' of github.com:ankik…
ankikuma Jul 11, 2025
8d6fc17
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jul 30, 2025
a1bf5a3
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jul 30, 2025
3571c46
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jul 30, 2025
2ee8dfb
relax assert that throttled shard is primary
ankikuma Jul 30, 2025
f492df1
add comment
ankikuma Jul 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.plugins.Plugin;
Expand Down Expand Up @@ -162,10 +164,82 @@ public void testSimpleRelocationNoIndexing() {
assertHitCount(prepareSearch("test").setSize(0), 20);
}

// This tests that relocation can successfully suspend index throttling to grab
// indexing permits required for relocation to succeed.
public void testSimpleRelocationWithIndexingPaused() throws Exception {
logger.info("--> starting [node1] ...");
// Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate
// index throttling for a shard on this node, it will pause indexing for that shard until throttling
// is deactivated.
final String node_1 = internalCluster().startNode(
Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)
);

logger.info("--> creating test index ...");
prepareCreate("test", indexSettings(1, 0)).get();

logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us randomize the doc count to signal that 10 is not a special number:

Suggested change
for (int i = 0; i < 10; i++) {
int numDocs = between(1,10);
for (int i = 0; i < numDocs; i++) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get();
}
logger.info("--> flush so we have an actual index");
indicesAdmin().prepareFlush().get();
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not folllow why this is important to the test?

Copy link
Contributor Author

@ankikuma ankikuma Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not. I wrote this test by modifying testRelocationWhileIndexingRandom() so it's just a carry over from there. Removed it.


logger.info("--> verifying count");
indicesAdmin().prepareRefresh().get();
assertHitCount(prepareSearch("test").setSize(0), 20L);

logger.info("--> start another node");
final String node_2 = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

// Activate index throttling on "test" index primary shard
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1);
IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
shard.activateThrottling();
// Verify that indexing is paused for the throttled shard
assertBusy(() -> { assertThat(shard.isIndexingPaused(), equalTo(true)); });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need assertBusy here?

Also, I prefer to use shard.getEngine().isIndexingPaused() to avoid exposing the boolean. In fact, I may prefer not to have this check and instead observe a blocked thread if we can.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, we shouldn't need the assertBusy() because activateThrottling happens inline, not asynchronously. I am not sure why I thought I needed it.

I also changed the check for indexingPaused to a wait for the future to complete, which times out.

// Try to index a document into the "test" index which is currently throttled
logger.info("--> Try to index a doc while indexing is paused");
IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20);
var future = indexRequestBuilder.execute();
// Verify that the new document has not been indexed indicating that the indexing thread is paused.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we wait for the thread to be blocked on the condition here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a change to wait for the future to complete, which times out. Were you thinking of something different ?

logger.info("--> verifying count is unchanged...");
indicesAdmin().prepareRefresh().get();
assertHitCount(prepareSearch("test").setSize(0), 20);

logger.info("--> relocate the shard from node1 to node2");
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to set an allocation rule through index settings. Someting like index.routing.allocation.include._id = node_2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I follow this comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe I do. Like this ?
updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", node_2), "test");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to change this everywhere in this file ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how to make that work. I tried this:
updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", nodes[toNode]), "test");
ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test");

But it looks like this is not enough to ensure that the shard has moved to the target node.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to use ._name if you use node_2, like done here (though that one excludes, you can do that too - or use include, both should work).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!


// Relocation will suspend throttling for the paused shard, allow the indexing thread to proceed, thereby releasing
// the indexing permit it holds, in turn allowing relocation to acquire the permits and proceed.
clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

// Relocated shard is not throttled
assertThat(shard.isIndexingPaused(), equalTo(false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems surprising, why is it not throttled?

Copy link
Contributor Author

@ankikuma ankikuma Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially thought it might be because the node that we relocate the shard to does not have PAUSE_THROTTLING enabled. But that doesn't help either. So I am guessing it has to do with how we do the relocation, wouldn't we have to recreate the engine on the new node and it probably will not transfer throttling ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But wait, this is the original source shard we are talking about, not the relocated target shard, so it should have throttling enabled after we resume throttling. I will need to look into this a bit more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I figured it out, it's because the engine is null for the source shard. I will just get rid of this check, I don't think it is useful.

logger.info("--> verifying count after relocation ...");
indicesAdmin().prepareRefresh().get();
assertHitCount(prepareSearch("test").setSize(0), 21);
}

public void testRelocationWhileIndexingRandom() throws Exception {
int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
int numberOfReplicas = randomBoolean() ? 0 : 1;
int numberOfNodes = numberOfReplicas == 0 ? 2 : 3;
boolean throttleIndexing = randomBoolean();

logger.info(
"testRelocationWhileIndexingRandom(numRelocations={}, numberOfReplicas={}, numberOfNodes={})",
Expand All @@ -174,9 +248,12 @@ public void testRelocationWhileIndexingRandom() throws Exception {
numberOfNodes
);

// Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate
// index throttling for a shard on this node, it will pause indexing for that shard until throttling
// is deactivated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate
// index throttling for a shard on this node, it will pause indexing for that shard until throttling
// is deactivated.
// Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

String[] nodes = new String[numberOfNodes];
logger.info("--> starting [node1] ...");
nodes[0] = internalCluster().startNode();
nodes[0] = internalCluster().startNode(Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true));

logger.info("--> creating test index ...");
prepareCreate("test", indexSettings(1, numberOfReplicas)).get();
Expand All @@ -200,6 +277,14 @@ public void testRelocationWhileIndexingRandom() throws Exception {
waitForDocs(numDocs, indexer);
logger.info("--> {} docs indexed", numDocs);

if (throttleIndexing) {
// Activate index throttling on "test" index primary shard
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[0]);
IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
shard.activateThrottling();
// Verify that indexing is paused for the throttled shard
assertBusy(() -> { assertThat(shard.isIndexingPaused(), equalTo(true)); });
}
logger.info("--> starting relocations...");
int nodeShiftBased = numberOfReplicas; // if we have replicas shift those
for (int i = 0; i < numberOfRelocations; i++) {
Expand Down
28 changes: 24 additions & 4 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,10 @@ protected static final class IndexThrottle {
private final Condition pauseCondition = pauseIndexingLock.newCondition();
private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
private volatile AtomicBoolean suspendThrottling = new AtomicBoolean();
private final boolean pauseWhenThrottled; // Should throttling pause indexing ?

// Should throttling pause indexing ? This is decided by the
// IndexingMemoryController#PAUSE_INDEXING_ON_THROTTLE setting for this node.
private final boolean pauseWhenThrottled;
private volatile ReleasableLock lock = NOOP_LOCK;

public IndexThrottle(boolean pause) {
Expand Down Expand Up @@ -491,7 +494,6 @@ public Releasable acquireThrottle() {
/** Activate throttling, which switches the lock to be a real lock */
public void activate() {
assert lock == NOOP_LOCK : "throttling activated while already active";

startOfThrottleNS = System.nanoTime();
if (pauseWhenThrottled) {
lock = pauseLockReference;
Expand Down Expand Up @@ -539,10 +541,14 @@ boolean isThrottled() {
return lock != NOOP_LOCK;
}

boolean isIndexingPaused() {
return (lock == pauseLockReference);
}

/** Suspend throttling to allow another task such as relocation to acquire all indexing permits */
public void suspendThrottle() {
if (pauseWhenThrottled) {
try (Releasable releasableLock = pauseLockReference.acquire()) {
try (Releasable ignored = pauseLockReference.acquire()) {
suspendThrottling.setRelease(true);
pauseCondition.signalAll();
}
Expand All @@ -552,7 +558,7 @@ public void suspendThrottle() {
/** Reverse what was done in {@link #suspendThrottle()} */
public void resumeThrottle() {
if (pauseWhenThrottled) {
try (Releasable releasableLock = pauseLockReference.acquire()) {
try (Releasable ignored = pauseLockReference.acquire()) {
suspendThrottling.setRelease(false);
pauseCondition.signalAll();
}
Expand Down Expand Up @@ -2258,6 +2264,20 @@ public interface Warmer {
*/
public abstract void deactivateThrottling();

/**
* If indexing is throttled to the point where it is paused completely,
* another task trying to get indexing permits might want to pause throttling
* by letting one thread pass at a time so that it does not get starved.
*/
public abstract void suspendThrottling();

/**
* Reverses a previous {@link #suspendThrottling} call.
*/
public abstract void resumeThrottling();

public abstract boolean isIndexingPaused();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe my comment above could remove the last usage of this - in that case we should remove the method too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


/**
* This method replays translog to restore the Lucene index which might be reverted previously.
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2852,11 +2852,26 @@ public void deactivateThrottling() {
}
}

@Override
public void suspendThrottling() {
throttle.suspendThrottle();
}

@Override
public void resumeThrottling() {
throttle.resumeThrottle();
}

@Override
public boolean isThrottled() {
return throttle.isThrottled();
}

@Override
public boolean isIndexingPaused() {
return throttle.isIndexingPaused();
}

boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
return throttle.throttleLockIsHeldByCurrentThread();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,17 @@ public void activateThrottling() {}
@Override
public void deactivateThrottling() {}

@Override
public void suspendThrottling() {}

@Override
public void resumeThrottling() {}

@Override
public boolean isIndexingPaused() {
return (false);
}

@Override
public void trimUnreferencedTranslogFiles() {}

Expand Down
46 changes: 43 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -879,8 +879,14 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
}
}, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by
// CancellableThreads and we want to be able to interrupt it
},
30L,
TimeUnit.MINUTES,
// Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
EsExecutors.DIRECT_EXECUTOR_SERVICE,
this
);

}
}

Expand Down Expand Up @@ -2752,6 +2758,40 @@ public void deactivateThrottling() {
}
}

public boolean isIndexingPaused() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seem necessary to expose outside IndexShard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was calling it from RelocationIT. I can remove it. Just for my understanding, why is it risky to expose this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed in the latest upload

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It exposes internal state from the engine. As such it is not "risky", but exposing more than necessary breaks encapsulation. In particular this one is only there for testing and can be fetched just as easily without this. The IndexShard interface is huge and I'd like to keep the surface it has down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining Henning

Engine engine = getEngineOrNull();
final boolean indexingPaused;
if (engine == null) {
indexingPaused = false;
} else {
indexingPaused = engine.isIndexingPaused();
}
return (indexingPaused);
}

public boolean suspendThrottling() {
Engine engine = getEngineOrNull();
final boolean indexingPaused;
if (engine == null) {
indexingPaused = false;
} else {
indexingPaused = engine.isIndexingPaused();
}
if (indexingPaused) {
engine.suspendThrottling();
return (true);
}
return (false);
}

public void resumeThrottling() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void resumeThrottling() {
private void resumeThrottling() {

try {
getEngine().resumeThrottling();
} catch (AlreadyClosedException ex) {
// ignore
}
}

private void handleRefreshException(Exception e) {
if (e instanceof AlreadyClosedException) {
// ignore
Expand Down Expand Up @@ -3811,7 +3851,7 @@ private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, l
onPermitAcquired.onFailure(e);
});
try {
indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic());
indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic(), this);
} catch (Exception e) {
forceRefreshes.close();
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,16 @@ public void blockOperations(
final ActionListener<Releasable> onAcquired,
final long timeout,
final TimeUnit timeUnit,
final Executor executor
final Executor executor,
@Nullable IndexShard indexShard
) {
delayOperations();
// In case indexing is paused on the shard, suspend throttling so that any currently paused task can
// go ahead and release the indexing permit it holds.
indexShard.suspendThrottling();
waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor);
// TODO: Does this do anything ? Looks like the relocated shard does not have throttling enabled
indexShard.resumeThrottling();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would prefer to handle this outside this class, we can make a method in IndexShard that wraps blockOperations and does this, avoiding sending an object to this method and the effect on testing etc.

Also, notice that this is sort of incorrect as is in that we sometimes call this with the executor set to the generic thread pool. We should instead resume throttling when the listener is called, that will handle all cases.

}

private void waitUntilBlocked(ActionListener<Releasable> onAcquired, long timeout, TimeUnit timeUnit, Executor executor) {
Expand Down
Loading