Skip to content

Commit f12949e

Browse files
committed
commit
1 parent 3ddb78b commit f12949e

File tree

3 files changed

+65
-1
lines changed

3 files changed

+65
-1
lines changed

server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.elasticsearch.index.shard.IndexShard;
4444
import org.elasticsearch.index.shard.IndexShardState;
4545
import org.elasticsearch.index.shard.ShardId;
46+
import org.elasticsearch.indices.IndexingMemoryController;
47+
import org.elasticsearch.indices.IndicesService;
4648
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
4749
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
4850
import org.elasticsearch.plugins.Plugin;
@@ -74,6 +76,7 @@
7476
import java.util.concurrent.Semaphore;
7577
import java.util.concurrent.TimeUnit;
7678
import java.util.concurrent.atomic.AtomicBoolean;
79+
import java.util.concurrent.locks.LockSupport;
7780
import java.util.stream.Collectors;
7881
import java.util.stream.Stream;
7982

@@ -117,6 +120,64 @@ public Settings indexSettings() {
117120
.build();
118121
}
119122

123+
public void testSimpleRelocationIndexingPaused() {
124+
logger.info("--> starting [node1] ...");
125+
final String node_1 = internalCluster().startNode(
126+
Settings.builder()
127+
.put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true));
128+
129+
logger.info("--> creating test index ...");
130+
prepareCreate("test", indexSettings(1, 0)).get();
131+
132+
logger.info("--> index 10 docs");
133+
for (int i = 0; i < 10; i++) {
134+
prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get();
135+
}
136+
logger.info("--> flush so we have an actual index");
137+
indicesAdmin().prepareFlush().get();
138+
logger.info("--> index more docs so we have something in the translog");
139+
for (int i = 10; i < 20; i++) {
140+
prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get();
141+
}
142+
143+
logger.info("--> verifying count");
144+
indicesAdmin().prepareRefresh().get();
145+
assertHitCount(prepareSearch("test").setSize(0), 20L);
146+
147+
logger.info("--> start another node");
148+
final String node_2 = internalCluster().startNode();
149+
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
150+
.setWaitForEvents(Priority.LANGUID)
151+
.setWaitForNodes("2")
152+
.get();
153+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
154+
155+
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1);
156+
IndexService indexService = indicesService.indexService(resolveIndex("test"));
157+
IndexShard shard = indexService.getShard(0);
158+
shard.activateThrottling();
159+
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
160+
logger.info("--> index 1 more doc");
161+
IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20);
162+
var future = indexRequestBuilder.execute();
163+
//future.actionGet();
164+
165+
logger.info("--> relocate the shard from node1 to node2");
166+
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2));
167+
168+
clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
169+
.setWaitForEvents(Priority.LANGUID)
170+
.setWaitForNoRelocatingShards(true)
171+
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
172+
.get();
173+
//assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
174+
assertThat(clusterHealthResponse.isTimedOut(), equalTo(true));
175+
176+
logger.info("--> verifying count again...");
177+
indicesAdmin().prepareRefresh().get();
178+
assertHitCount(prepareSearch("test").setSize(0), 20);
179+
}
180+
120181
public void testSimpleRelocationNoIndexing() {
121182
logger.info("--> starting [node1] ...");
122183
final String node_1 = internalCluster().startNode();

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,7 @@ public void activate() {
495495
startOfThrottleNS = System.nanoTime();
496496
if (pauseWhenThrottled) {
497497
lock = pauseLockReference;
498+
System.out.println("Activated index throttling pause");
498499
logger.trace("Activated index throttling pause");
499500
} else {
500501
lock = lockReference;

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,13 @@ public void blockOperations(
9090
delayOperations();
9191
// If indexing is paused on the shard, suspend it so that any currently paused task can
9292
// go ahead and release the indexing permit it holds.
93-
boolean throttlingPaused = indexShard.suspendThrottling();
93+
//boolean throttlingPaused = indexShard.suspendThrottling();
9494
waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor);
95+
/*
9596
if (throttlingPaused) {
9697
indexShard.resumeThrottling();
9798
}
99+
*/
98100
}
99101

100102
private void waitUntilBlocked(ActionListener<Releasable> onAcquired, long timeout, TimeUnit timeUnit, Executor executor) {

0 commit comments

Comments
 (0)