Skip to content

Commit 90670f3

Browse files
committed
commit
1 parent f12949e commit 90670f3

File tree

4 files changed

+52
-30
lines changed

4 files changed

+52
-30
lines changed

server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,9 @@ public Settings indexSettings() {
120120
.build();
121121
}
122122

123-
public void testSimpleRelocationIndexingPaused() {
123+
public void testSimpleRelocationNoIndexing() {
124124
logger.info("--> starting [node1] ...");
125-
final String node_1 = internalCluster().startNode(
126-
Settings.builder()
127-
.put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true));
125+
final String node_1 = internalCluster().startNode();
128126

129127
logger.info("--> creating test index ...");
130128
prepareCreate("test", indexSettings(1, 0)).get();
@@ -152,16 +150,6 @@ public void testSimpleRelocationIndexingPaused() {
152150
.get();
153151
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
154152

155-
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1);
156-
IndexService indexService = indicesService.indexService(resolveIndex("test"));
157-
IndexShard shard = indexService.getShard(0);
158-
shard.activateThrottling();
159-
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
160-
logger.info("--> index 1 more doc");
161-
IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20);
162-
var future = indexRequestBuilder.execute();
163-
//future.actionGet();
164-
165153
logger.info("--> relocate the shard from node1 to node2");
166154
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2));
167155

@@ -170,17 +158,23 @@ public void testSimpleRelocationIndexingPaused() {
170158
.setWaitForNoRelocatingShards(true)
171159
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
172160
.get();
173-
//assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
174-
assertThat(clusterHealthResponse.isTimedOut(), equalTo(true));
161+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
175162

176163
logger.info("--> verifying count again...");
177164
indicesAdmin().prepareRefresh().get();
178165
assertHitCount(prepareSearch("test").setSize(0), 20);
179166
}
180167

181-
public void testSimpleRelocationNoIndexing() {
168+
// This tests that relocation can successfully suspend index throttling to grab
169+
// indexing permits required for relocation to succeed.
170+
public void testSimpleRelocationWithIndexingPaused() throws Exception {
182171
logger.info("--> starting [node1] ...");
183-
final String node_1 = internalCluster().startNode();
172+
// Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate
173+
// index throttling for a shard on this node, it will pause indexing for that shard until throttling
174+
// is deactivated.
175+
final String node_1 = internalCluster().startNode(
176+
Settings.builder()
177+
.put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true));
184178

185179
logger.info("--> creating test index ...");
186180
prepareCreate("test", indexSettings(1, 0)).get();
@@ -208,19 +202,38 @@ public void testSimpleRelocationNoIndexing() {
208202
.get();
209203
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
210204

205+
// Activate index throttling on "test" index primary shard
206+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1);
207+
IndexService indexService = indicesService.indexService(resolveIndex("test"));
208+
IndexShard shard = indexService.getShard(0);
209+
shard.activateThrottling();
210+
// Verify that indexing is paused for the throttled shard
211+
assertBusy(() -> { assertThat(shard.isIndexingPaused(), equalTo(true)); });
212+
// Try to index a document into the "test" index which is currently throttled
213+
logger.info("--> Try to index a doc while indexing is paused");
214+
IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20);
215+
var future = indexRequestBuilder.execute();
216+
// Verify that the new document has not been indexed indicating that the indexing thread is paused.
217+
logger.info("--> verifying count is unchanged...");
218+
indicesAdmin().prepareRefresh().get();
219+
assertHitCount(prepareSearch("test").setSize(0), 20);
220+
211221
logger.info("--> relocate the shard from node1 to node2");
212222
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2));
213223

224+
// Relocation will suspend throttling for the paused shard, allow the indexing thread to proceed, thereby releasing
225+
// the indexing permit it holds, in turn allowing relocation to acquire the permits and proceed.
214226
clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
215227
.setWaitForEvents(Priority.LANGUID)
216228
.setWaitForNoRelocatingShards(true)
217229
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
218230
.get();
219231
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
220232

221-
logger.info("--> verifying count again...");
233+
logger.info("--> verifying count after relocation ...");
222234
indicesAdmin().prepareRefresh().get();
223-
assertHitCount(prepareSearch("test").setSize(0), 20);
235+
assertHitCount(prepareSearch("test").setSize(0), 21);
236+
logger.info("--> Test finished ...");
224237
}
225238

226239
public void testRelocationWhileIndexingRandom() throws Exception {

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,10 @@ protected static final class IndexThrottle {
461461
private final Condition pauseCondition = pauseIndexingLock.newCondition();
462462
private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
463463
private volatile AtomicBoolean suspendThrottling = new AtomicBoolean();
464-
private final boolean pauseWhenThrottled; // Should throttling pause indexing ?
464+
465+
// Should throttling pause indexing ? This is decided by the
466+
// IndexingMemoryController#PAUSE_INDEXING_ON_THROTTLE setting for this node.
467+
private final boolean pauseWhenThrottled;
465468
private volatile ReleasableLock lock = NOOP_LOCK;
466469

467470
public IndexThrottle(boolean pause) {
@@ -547,7 +550,7 @@ boolean isIndexingPaused() {
547550
/** Suspend throttling to allow another task such as relocation to acquire all indexing permits */
548551
public void suspendThrottle() {
549552
if (pauseWhenThrottled) {
550-
try (Releasable releasableLock = pauseLockReference.acquire()) {
553+
try (Releasable ignored = pauseLockReference.acquire()) {
551554
suspendThrottling.setRelease(true);
552555
pauseCondition.signalAll();
553556
}
@@ -557,7 +560,7 @@ public void suspendThrottle() {
557560
/** Reverse what was done in {@link #suspendThrottle()} */
558561
public void resumeThrottle() {
559562
if (pauseWhenThrottled) {
560-
try (Releasable releasableLock = pauseLockReference.acquire()) {
563+
try (Releasable ignored = pauseLockReference.acquire()) {
561564
suspendThrottling.setRelease(false);
562565
pauseCondition.signalAll();
563566
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2758,6 +2758,16 @@ public void deactivateThrottling() {
27582758
}
27592759
}
27602760

2761+
public boolean isIndexingPaused() {
2762+
Engine engine = getEngineOrNull();
2763+
final boolean indexingPaused;
2764+
if (engine == null) {
2765+
indexingPaused = false;
2766+
} else {
2767+
indexingPaused = engine.isIndexingPaused();
2768+
}
2769+
return (indexingPaused);
2770+
}
27612771
public boolean suspendThrottling() {
27622772
Engine engine = getEngineOrNull();
27632773
final boolean indexingPaused;

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,11 @@ public void blockOperations(
8888
@Nullable IndexShard indexShard
8989
) {
9090
delayOperations();
91-
// If indexing is paused on the shard, suspend it so that any currently paused task can
91+
// In case indexing is paused on the shard, suspend throttling so that any currently paused task can
9292
// go ahead and release the indexing permit it holds.
93-
//boolean throttlingPaused = indexShard.suspendThrottling();
93+
indexShard.suspendThrottling();
9494
waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor);
95-
/*
96-
if (throttlingPaused) {
97-
indexShard.resumeThrottling();
98-
}
99-
*/
95+
indexShard.resumeThrottling();
10096
}
10197

10298
private void waitUntilBlocked(ActionListener<Releasable> onAcquired, long timeout, TimeUnit timeUnit, Executor executor) {

0 commit comments

Comments
 (0)