
Commit 074f070

Suspend Index throttling when relocating (#128797)
If index throttling is enabled in a mode that pauses every indexing thread that tries to index into a shard, it can starve other tasks, such as relocation, that need to acquire all of the shard's indexing permits. This PR addresses that by suspending throttling while the permits are being acquired, so the paused indexing threads that currently hold permits can complete and release them. Addresses ES-11770.
1 parent 830076e commit 074f070
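
To make the interaction concrete, here is a minimal, self-contained Java sketch of the coordination problem and of the fix described above. All names (ThrottleSuspensionSketch, PausableThrottle, awaitIfThrottled, a plain Semaphore standing in for the shard's indexing permits) are hypothetical illustrations, not the Elasticsearch Engine/IndexShard classes: an indexing thread acquires a permit and then parks on a paused throttle, and a "relocation" that needs every permit would wait forever unless it first suspends the pause so the parked thread can finish and release its permit.

import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical names throughout; not the Elasticsearch implementation.
public class ThrottleSuspensionSketch {

    static class PausableThrottle {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition unpaused = lock.newCondition();
        private boolean throttled = false;
        private boolean suspended = false;

        // Indexing threads call this before doing work; they park while throttled and not suspended.
        void awaitIfThrottled() throws InterruptedException {
            lock.lock();
            try {
                while (throttled && suspended == false) {
                    unpaused.await();
                }
            } finally {
                lock.unlock();
            }
        }

        void activate() {
            lock.lock();
            try {
                throttled = true;
            } finally {
                lock.unlock();
            }
        }

        // Called by a task that needs all permits: wake parked indexers so they can finish and release.
        void suspend() {
            lock.lock();
            try {
                suspended = true;
                unpaused.signalAll();
            } finally {
                lock.unlock();
            }
        }

        void resume() {
            lock.lock();
            try {
                suspended = false;
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        final int totalPermits = 4;
        Semaphore indexingPermits = new Semaphore(totalPermits);
        PausableThrottle throttle = new PausableThrottle();

        // Indexing thread: acquires one permit, then hits the (paused) throttle.
        Thread indexer = new Thread(() -> {
            try {
                indexingPermits.acquire();
                try {
                    throttle.awaitIfThrottled(); // parks until the throttle is suspended
                    System.out.println("indexer: indexed one doc");
                } finally {
                    indexingPermits.release();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        throttle.activate(); // memory pressure pauses indexing on this shard
        indexer.start();
        Thread.sleep(200); // let the indexer park while still holding its permit

        // "Relocation": suspend throttling, acquire every permit, then resume throttling.
        throttle.suspend();
        indexingPermits.acquire(totalPermits);
        System.out.println("relocation: acquired all " + totalPermits + " permits");
        indexingPermits.release(totalPermits);
        throttle.resume();

        indexer.join();
    }
}

The actual change follows the same shape: the new IndexShard#blockOperations below calls suspendThrottling() before asking for all permits and resumes throttling once the listener completes or the shard turns out to be closed.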

File tree: 6 files changed (+218, -14 lines)


server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 115 additions & 5 deletions

@@ -11,6 +11,7 @@
 
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.tests.util.English;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -36,12 +37,15 @@
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.seqno.ReplicationTracker;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.indices.IndexingMemoryController;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
 import org.elasticsearch.plugins.Plugin;
@@ -161,10 +165,88 @@ public void testSimpleRelocationNoIndexing() {
         assertHitCount(prepareSearch("test").setSize(0), 20);
     }
 
+    // This tests that relocation can successfully suspend index throttling to grab
+    // indexing permits required for relocation to succeed.
+    public void testSimpleRelocationWithIndexingPaused() throws Exception {
+        logger.info("--> starting [node1] ...");
+        // Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate
+        // index throttling for a shard on this node, it will pause indexing for that shard until throttling
+        // is deactivated.
+        final String node_1 = internalCluster().startNode(
+            Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)
+        );
+
+        logger.info("--> creating test index ...");
+        prepareCreate("test", indexSettings(1, 0)).get();
+
+        logger.info("--> index docs");
+        int numDocs = between(1, 10);
+        for (int i = 0; i < numDocs; i++) {
+            prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get();
+        }
+        logger.info("--> flush so we have an actual index");
+        indicesAdmin().prepareFlush().get();
+
+        logger.info("--> verifying count");
+        indicesAdmin().prepareRefresh().get();
+        assertHitCount(prepareSearch("test").setSize(0), numDocs);
+
+        logger.info("--> start another node");
+        final String node_2 = internalCluster().startNode();
+        ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNodes("2")
+            .get();
+        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+
+        // Activate index throttling on "test" index primary shard
+        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1);
+        IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
+        shard.activateThrottling();
+        // Verify that indexing is paused for the throttled shard
+        Engine engine = shard.getEngineOrNull();
+        assertThat(engine != null && engine.isThrottled(), equalTo(true));
+
+        // Try to index a document into the "test" index which is currently throttled
+        logger.info("--> Try to index a doc while indexing is paused");
+        IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20);
+        var future = indexRequestBuilder.execute();
+        expectThrows(ElasticsearchException.class, () -> future.actionGet(500, TimeUnit.MILLISECONDS));
+        // Verify that the new document has not been indexed indicating that the indexing thread is paused.
+        logger.info("--> verifying count is unchanged...");
+        indicesAdmin().prepareRefresh().get();
+        assertHitCount(prepareSearch("test").setSize(0), numDocs);
+
+        logger.info("--> relocate the shard from node1 to node2");
+        updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", node_2), "test");
+        ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test");
+
+        // Relocation will suspend throttling for the paused shard, allow the indexing thread to proceed, thereby releasing
+        // the indexing permit it holds, in turn allowing relocation to acquire the permits and proceed.
+        clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
+            .setWaitForEvents(Priority.LANGUID)
+            .setWaitForNoRelocatingShards(true)
+            .setTimeout(ACCEPTABLE_RELOCATION_TIME)
+            .get();
+        assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
+
+        logger.info("--> verifying shard primary has relocated ...");
+        indicesService = internalCluster().getInstance(IndicesService.class, node_2);
+        shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
+        assertThat(shard.routingEntry().primary(), equalTo(true));
+        engine = shard.getEngineOrNull();
+        assertThat(engine != null && engine.isThrottled(), equalTo(false));
+        logger.info("--> verifying count after relocation ...");
+        future.actionGet();
+        indicesAdmin().prepareRefresh().get();
+        assertHitCount(prepareSearch("test").setSize(0), numDocs + 1);
+    }
+
     public void testRelocationWhileIndexingRandom() throws Exception {
         int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
         int numberOfReplicas = randomBoolean() ? 0 : 1;
         int numberOfNodes = numberOfReplicas == 0 ? 2 : 3;
+        boolean throttleIndexing = randomBoolean();
 
         logger.info(
             "testRelocationWhileIndexingRandom(numRelocations={}, numberOfReplicas={}, numberOfNodes={})",
@@ -173,16 +255,22 @@ public void testRelocationWhileIndexingRandom() throws Exception {
             numberOfNodes
         );
 
+        // Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless
         String[] nodes = new String[numberOfNodes];
         logger.info("--> starting [node1] ...");
-        nodes[0] = internalCluster().startNode();
+        nodes[0] = internalCluster().startNode(
+            Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean())
+        );
 
         logger.info("--> creating test index ...");
         prepareCreate("test", indexSettings(1, numberOfReplicas)).get();
 
+        // Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless
         for (int i = 2; i <= numberOfNodes; i++) {
            logger.info("--> starting [node{}] ...", i);
-            nodes[i - 1] = internalCluster().startNode();
+            nodes[i - 1] = internalCluster().startNode(
+                Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean())
+            );
             if (i != numberOfNodes) {
                 ClusterHealthResponse healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
                     .setWaitForEvents(Priority.LANGUID)
@@ -200,17 +288,37 @@ public void testRelocationWhileIndexingRandom() throws Exception {
         logger.info("--> {} docs indexed", numDocs);
 
         logger.info("--> starting relocations...");
-        int nodeShiftBased = numberOfReplicas; // if we have replicas shift those
+
+        // When we have a replica, the primary is on node 0 and replica is on node 1. We cannot move primary
+        // to a node containing the replica, so relocation of primary needs to happen between node 0 and 2.
+        // When there is no replica, we only have 2 nodes and primary relocates back and forth between node 0 and 1.
         for (int i = 0; i < numberOfRelocations; i++) {
             int fromNode = (i % 2);
             int toNode = fromNode == 0 ? 1 : 0;
-            fromNode += nodeShiftBased;
-            toNode += nodeShiftBased;
+            if (numberOfReplicas == 1) {
+                fromNode = fromNode == 1 ? 2 : 0;
+                toNode = toNode == 1 ? 2 : 0;
+            }
+
             numDocs = scaledRandomIntBetween(200, 1000);
+
+            // Throttle indexing on primary shard
+            if (throttleIndexing) {
+                IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[fromNode]);
+                IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
+                // Activate index throttling on "test" index primary shard
+                logger.info("--> activate throttling for shard on node {}...", nodes[fromNode]);
+                shard.activateThrottling();
+                // Verify that indexing is throttled for this shard
+                Engine engine = shard.getEngineOrNull();
+                assertThat(engine != null && engine.isThrottled(), equalTo(true));
+            }
             logger.debug("--> Allow indexer to index [{}] documents", numDocs);
             indexer.continueIndexing(numDocs);
             logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
+
             ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, nodes[fromNode], nodes[toNode]));
+
             if (rarely()) {
                 logger.debug("--> flushing");
                 indicesAdmin().prepareFlush().get();
@@ -219,11 +327,13 @@ public void testRelocationWhileIndexingRandom() throws Exception {
                 .setWaitForEvents(Priority.LANGUID)
                 .setWaitForNoRelocatingShards(true)
                 .setTimeout(ACCEPTABLE_RELOCATION_TIME)
+                .setWaitForGreenStatus()
                 .get();
             assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
             indexer.pauseIndexing();
             logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
         }
+
         logger.info("--> done relocations");
         logger.info("--> waiting for indexing threads to stop ...");
         indexer.stopAndAwaitStopped();
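
The new test relies on a timed get to prove that the blocked write is parked rather than failed: actionGet(500, TimeUnit.MILLISECONDS) must throw while the shard is paused, and the plain actionGet() later must succeed once relocation has suspended throttling. Below is a framework-free sketch of that assertion style, assuming a plain CompletableFuture stands in for the ActionFuture returned by execute(); note that a raw future throws TimeoutException where ActionFuture wraps the timeout in an ElasticsearchException.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative only; not the Elasticsearch test harness.
public class BlockedWriteAssertionSketch {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> write = new CompletableFuture<>(); // the "paused" index request

        // While the shard is paused, a bounded wait must time out: the write is parked, not failed.
        boolean stillBlocked = false;
        try {
            write.get(500, TimeUnit.MILLISECONDS);
        } catch (TimeoutException expected) {
            stillBlocked = true;
        }
        System.out.println("write still blocked: " + stillBlocked);

        // Once "relocation" suspends throttling, the parked write completes and an unbounded wait succeeds.
        write.complete("indexed");
        System.out.println("write result after unblocking: " + write.get());
    }
}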

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 25 additions & 4 deletions

@@ -147,6 +147,9 @@ public abstract class Engine implements Closeable {
     protected final ReentrantLock failEngineLock = new ReentrantLock();
     protected final SetOnce<Exception> failedEngine = new SetOnce<>();
     protected final boolean enableRecoverySource;
+    // This should only be enabled in serverless. In stateful clusters, where we have
+    // indexing replicas, if pause throttling gets enabled on replicas, it will indirectly
+    // pause the primary as well which might prevent us from relocating the primary shard.
     protected final boolean pauseIndexingOnThrottle;
 
     private final AtomicBoolean isClosing = new AtomicBoolean();
@@ -483,7 +486,10 @@ protected static final class IndexThrottle {
         private final Condition pauseCondition = pauseIndexingLock.newCondition();
         private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
         private volatile AtomicBoolean suspendThrottling = new AtomicBoolean();
-        private final boolean pauseWhenThrottled; // Should throttling pause indexing ?
+
+        // Should throttling pause indexing ? This is decided by the
+        // IndexingMemoryController#PAUSE_INDEXING_ON_THROTTLE setting for this node.
+        private final boolean pauseWhenThrottled;
         private volatile ReleasableLock lock = NOOP_LOCK;
 
         public IndexThrottle(boolean pause) {
@@ -514,7 +520,6 @@ public Releasable acquireThrottle() {
         /** Activate throttling, which switches the lock to be a real lock */
         public void activate() {
             assert lock == NOOP_LOCK : "throttling activated while already active";
-
             startOfThrottleNS = System.nanoTime();
             if (pauseWhenThrottled) {
                 lock = pauseLockReference;
@@ -562,10 +567,14 @@ boolean isThrottled() {
             return lock != NOOP_LOCK;
         }
 
+        boolean isIndexingPaused() {
+            return (lock == pauseLockReference);
+        }
+
         /** Suspend throttling to allow another task such as relocation to acquire all indexing permits */
         public void suspendThrottle() {
             if (pauseWhenThrottled) {
-                try (Releasable releasableLock = pauseLockReference.acquire()) {
+                try (Releasable ignored = pauseLockReference.acquire()) {
                     suspendThrottling.setRelease(true);
                     pauseCondition.signalAll();
                 }
@@ -575,7 +584,7 @@ public void suspendThrottle() {
         /** Reverse what was done in {@link #suspendThrottle()} */
         public void resumeThrottle() {
             if (pauseWhenThrottled) {
-                try (Releasable releasableLock = pauseLockReference.acquire()) {
+                try (Releasable ignored = pauseLockReference.acquire()) {
                     suspendThrottling.setRelease(false);
                     pauseCondition.signalAll();
                 }
@@ -2297,6 +2306,18 @@ public interface Warmer {
      */
    public abstract void deactivateThrottling();
 
+    /**
+     * If indexing is throttled to the point where it is paused completely,
+     * another task trying to get indexing permits might want to pause throttling
+     * by letting one thread pass at a time so that it does not get starved.
+     */
+    public abstract void suspendThrottling();
+
+    /**
+     * Reverses a previous {@link #suspendThrottling} call.
+     */
+    public abstract void resumeThrottling();
+
     /**
      * This method replays translog to restore the Lucene index which might be reverted previously.
     * This ensures that all acknowledged writes are restored correctly when this engine is promoted.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 10 additions & 0 deletions

@@ -2883,6 +2883,16 @@ public void deactivateThrottling() {
         }
     }
 
+    @Override
+    public void suspendThrottling() {
+        throttle.suspendThrottle();
+    }
+
+    @Override
+    public void resumeThrottling() {
+        throttle.resumeThrottle();
+    }
+
     @Override
     public boolean isThrottled() {
         return throttle.isThrottled();

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 6 additions & 0 deletions

@@ -500,6 +500,12 @@ public void activateThrottling() {}
     @Override
     public void deactivateThrottling() {}
 
+    @Override
+    public void suspendThrottling() {}
+
+    @Override
+    public void resumeThrottling() {}
+
     @Override
     public void trimUnreferencedTranslogFiles() {}
 

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 58 additions & 4 deletions

@@ -810,7 +810,7 @@ public void relocated(
     ) throws IllegalIndexShardStateException, IllegalStateException {
         assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
         try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
-            indexShardOperationPermits.blockOperations(new ActionListener<>() {
+            blockOperations(new ActionListener<>() {
                 @Override
                 public void onResponse(Releasable releasable) {
                     boolean success = false;
@@ -888,8 +888,13 @@ public void onFailure(Exception e) {
                         listener.onFailure(e);
                     }
                 }
-            }, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by
-            // CancellableThreads and we want to be able to interrupt it
+            },
+                30L,
+                TimeUnit.MINUTES,
+                // Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
+                EsExecutors.DIRECT_EXECUTOR_SERVICE
+            );
+
         }
     }
 
@@ -2765,6 +2770,22 @@ public void deactivateThrottling() {
         }
     }
 
+    private void suspendThrottling() {
+        try {
+            getEngine().suspendThrottling();
+        } catch (AlreadyClosedException ex) {
+            // ignore
+        }
+    }
+
+    private void resumeThrottling() {
+        try {
+            getEngine().resumeThrottling();
+        } catch (AlreadyClosedException ex) {
+            // ignore
+        }
+    }
+
     private void handleRefreshException(Exception e) {
         if (e instanceof AlreadyClosedException) {
             // ignore
@@ -3823,6 +3844,39 @@ private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final Acti
         });
     }
 
+    /**
+     * Immediately delays operations and uses the {@code executor} to wait for in-flight operations to finish and then acquires all
+     * permits. When all permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are
+     * started. Delayed operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in
+     * this case the {@code onFailure} handler will be invoked after delayed operations are released.
+     *
+     * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed. This listener should not throw.
+     * @param timeout    the maximum time to wait for the in-flight operations block
+     * @param timeUnit   the time unit of the {@code timeout} argument
+     * @param executor   executor on which to wait for in-flight operations to finish and acquire all permits
+     */
+    public void blockOperations(
+        final ActionListener<Releasable> onAcquired,
+        final long timeout,
+        final TimeUnit timeUnit,
+        final Executor executor
+    ) {
+        // In case indexing is paused on the shard, suspend throttling so that any currently paused task can
+        // go ahead and release the indexing permit it holds.
+        suspendThrottling();
+        try {
+            indexShardOperationPermits.blockOperations(
+                ActionListener.runAfter(onAcquired, this::resumeThrottling),
+                timeout,
+                timeUnit,
+                executor
+            );
+        } catch (IndexShardClosedException e) {
+            resumeThrottling();
+            throw e;
+        }
+    }
+
     private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
         final Releasable forceRefreshes = refreshListeners.forceRefreshes();
         final ActionListener<Releasable> wrappedListener = ActionListener.wrap(r -> {
@@ -3833,7 +3887,7 @@ private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, l
             onPermitAcquired.onFailure(e);
         });
         try {
-            indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic());
+            blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic());
         } catch (Exception e) {
             forceRefreshes.close();
             throw e;
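
The new blockOperations wrapper has to make resumeThrottling() run both on the asynchronous completion path (via ActionListener.runAfter, whether the listener succeeds or fails) and on the synchronous path where indexShardOperationPermits.blockOperations throws IndexShardClosedException before the listener is ever registered. Below is a hedged, generic sketch of that shape; Throttle and startAsync are stand-ins for the Elasticsearch types, and the catch clause is deliberately broader than the real code, which only handles IndexShardClosedException.

import java.util.function.Consumer;

// Illustrative skeleton only; not the IndexShard implementation.
public class SuspendResumeSketch {

    interface Throttle {
        void suspend();

        void resume();
    }

    // startAsync models indexShardOperationPermits.blockOperations: it either throws
    // synchronously or eventually invokes the supplied callback exactly once.
    static void blockOperations(Throttle throttle, Runnable onAllPermitsAcquired, Consumer<Runnable> startAsync) {
        throttle.suspend(); // let any paused indexing thread finish and release its permit
        try {
            // Mirror ActionListener.runAfter: run the caller's callback, then always resume throttling.
            startAsync.accept(() -> {
                try {
                    onAllPermitsAcquired.run();
                } finally {
                    throttle.resume();
                }
            });
        } catch (RuntimeException e) {
            throttle.resume(); // synchronous failure: the callback will never run, so resume here
            throw e;
        }
    }

    public static void main(String[] args) {
        Throttle throttle = new Throttle() {
            @Override
            public void suspend() {
                System.out.println("throttling suspended");
            }

            @Override
            public void resume() {
                System.out.println("throttling resumed");
            }
        };
        // Happy path: the async operation "completes" immediately and the wrapped callback resumes throttling.
        blockOperations(throttle, () -> System.out.println("all permits acquired"), Runnable::run);
    }
}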
