Skip to content

Commit 5e81c31

Browse files
committed
address comments
1 parent 6315189 commit 5e81c31

File tree

6 files changed

+29
-45
lines changed

6 files changed

+29
-45
lines changed

server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.env.NodeEnvironment;
3939
import org.elasticsearch.index.IndexService;
4040
import org.elasticsearch.index.IndexSettings;
41+
import org.elasticsearch.index.engine.Engine;
4142
import org.elasticsearch.index.seqno.ReplicationTracker;
4243
import org.elasticsearch.index.seqno.RetentionLease;
4344
import org.elasticsearch.index.shard.IndexEventListener;
@@ -179,16 +180,17 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception {
179180
logger.info("--> creating test index ...");
180181
prepareCreate("test", indexSettings(1, 0)).get();
181182

182-
logger.info("--> index 10 docs");
183-
for (int i = 0; i < 10; i++) {
183+
logger.info("--> index docs");
184+
int numDocs = between(1,10);
185+
for (int i = 0; i < numDocs; i++) {
184186
prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get();
185187
}
186188
logger.info("--> flush so we have an actual index");
187189
indicesAdmin().prepareFlush().get();
188190

189191
logger.info("--> verifying count");
190192
indicesAdmin().prepareRefresh().get();
191-
assertHitCount(prepareSearch("test").setSize(0), 10L);
193+
assertHitCount(prepareSearch("test").setSize(0), numDocs);
192194

193195
logger.info("--> start another node");
194196
final String node_2 = internalCluster().startNode();
@@ -203,20 +205,21 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception {
203205
IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
204206
shard.activateThrottling();
205207
// Verify that indexing is paused for the throttled shard
206-
assertThat(shard.isIndexingPaused(), equalTo(true));
208+
Engine engine = shard.getEngineOrNull();
209+
assertThat(engine != null && engine.isThrottled(), equalTo(true));
207210
// Try to index a document into the "test" index which is currently throttled
208211
logger.info("--> Try to index a doc while indexing is paused");
209212
IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20);
210213
var future = indexRequestBuilder.execute();
211-
expectThrows(ElasticsearchException.class, () -> future.actionGet(10, TimeUnit.SECONDS));
214+
expectThrows(ElasticsearchException.class, () -> future.actionGet(500, TimeUnit.MILLISECONDS));
212215
// Verify that the new document has not been indexed indicating that the indexing thread is paused.
213216
logger.info("--> verifying count is unchanged...");
214217
indicesAdmin().prepareRefresh().get();
215-
assertHitCount(prepareSearch("test").setSize(0), 10);
218+
assertHitCount(prepareSearch("test").setSize(0), numDocs);
216219

217220
logger.info("--> relocate the shard from node1 to node2");
218-
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, node_1, node_2));
219-
// updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", node_2), "test");
221+
updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", node_2), "test");
222+
ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test");
220223

221224
// Relocation will suspend throttling for the paused shard, allow the indexing thread to proceed, thereby releasing
222225
// the indexing permit it holds, in turn allowing relocation to acquire the permits and proceed.
@@ -230,7 +233,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception {
230233
logger.info("--> verifying count after relocation ...");
231234
future.actionGet();
232235
indicesAdmin().prepareRefresh().get();
233-
assertHitCount(prepareSearch("test").setSize(0), 11);
236+
assertHitCount(prepareSearch("test").setSize(0), numDocs + 1);
234237
}
235238

236239
public void testRelocationWhileIndexingRandom() throws Exception {
@@ -283,7 +286,8 @@ public void testRelocationWhileIndexingRandom() throws Exception {
283286
// Activate index throttling on "test" index primary shard
284287
shard.activateThrottling();
285288
// Verify that indexing is throttled for this shard
286-
assertThat(shard.getEngineOrNull().isThrottled(), equalTo(true));
289+
Engine engine = shard.getEngineOrNull();
290+
assertThat(engine != null && engine.isThrottled(), equalTo(true));
287291
}
288292
logger.info("--> starting relocations...");
289293
int nodeShiftBased = numberOfReplicas; // if we have replicas shift those
@@ -296,10 +300,9 @@ public void testRelocationWhileIndexingRandom() throws Exception {
296300
logger.debug("--> Allow indexer to index [{}] documents", numDocs);
297301
indexer.continueIndexing(numDocs);
298302
logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
299-
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, nodes[fromNode], nodes[toNode]));
300303

301-
// updateIndexSettings(Settings.builder().put("index.routing.allocation.include._id", nodes[toNode]), "test");
302-
// ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test");
304+
updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", nodes[toNode]), "test");
305+
ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test");
303306

304307
if (rarely()) {
305308
logger.debug("--> flushing");
@@ -321,7 +324,8 @@ public void testRelocationWhileIndexingRandom() throws Exception {
321324
shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
322325
shard.activateThrottling();
323326
// Verify that indexing is throttled for this shard
324-
assertThat(shard.getEngineOrNull().isThrottled(), equalTo(true));
327+
Engine engine = shard.getEngineOrNull();
328+
assertThat(engine != null && engine.isThrottled(), equalTo(true));
325329
}
326330
}
327331
logger.info("--> done relocations");

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2276,8 +2276,6 @@ public interface Warmer {
22762276
*/
22772277
public abstract void resumeThrottling();
22782278

2279-
public abstract boolean isIndexingPaused();
2280-
22812279
/**
22822280
* This method replays translog to restore the Lucene index which might be reverted previously.
22832281
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2867,11 +2867,6 @@ public boolean isThrottled() {
28672867
return throttle.isThrottled();
28682868
}
28692869

2870-
@Override
2871-
public boolean isIndexingPaused() {
2872-
return throttle.isIndexingPaused();
2873-
}
2874-
28752870
boolean throttleLockIsHeldByCurrentThread() { // to be used in assertions and tests only
28762871
return throttle.throttleLockIsHeldByCurrentThread();
28772872
}

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -513,11 +513,6 @@ public void suspendThrottling() {}
513513
@Override
514514
public void resumeThrottling() {}
515515

516-
@Override
517-
public boolean isIndexingPaused() {
518-
return (false);
519-
}
520-
521516
@Override
522517
public void trimUnreferencedTranslogFiles() {}
523518

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2757,26 +2757,15 @@ public void deactivateThrottling() {
27572757
}
27582758
}
27592759

2760-
public boolean isIndexingPaused() {
2761-
Engine engine = getEngineOrNull();
2762-
final boolean indexingPaused;
2763-
if (engine == null) {
2764-
indexingPaused = false;
2765-
} else {
2766-
indexingPaused = engine.isIndexingPaused();
2767-
}
2768-
return (indexingPaused);
2769-
}
2770-
2771-
public void suspendThrottling() {
2760+
private void suspendThrottling() {
27722761
try {
27732762
getEngine().suspendThrottling();
27742763
} catch (AlreadyClosedException ex) {
27752764
// ignore
27762765
}
27772766
}
27782767

2779-
public void resumeThrottling() {
2768+
private void resumeThrottling() {
27802769
try {
27812770
getEngine().resumeThrottling();
27822771
} catch (AlreadyClosedException ex) {
@@ -3850,13 +3839,16 @@ public void blockOperations(
38503839
final TimeUnit timeUnit,
38513840
final Executor executor
38523841
) {
3853-
indexShardOperationPermits.delayOperations();
38543842
// In case indexing is paused on the shard, suspend throttling so that any currently paused task can
38553843
// go ahead and release the indexing permit it holds.
38563844
suspendThrottling();
3857-
indexShardOperationPermits.waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor);
3858-
// TODO: Does this do anything ? Looks like the relocated shard does not have throttling enabled
3859-
resumeThrottling();
3845+
try {
3846+
indexShardOperationPermits.blockOperations(ActionListener.runAfter(onAcquired, this::resumeThrottling),
3847+
timeout, timeUnit, executor);
3848+
} catch (IndexShardClosedException e) {
3849+
resumeThrottling();
3850+
throw e;
3851+
}
38603852
}
38613853

38623854
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void blockOperations(
9090
waitUntilBlocked(ActionListener.assertOnce(onAcquired), timeout, timeUnit, executor);
9191
}
9292

93-
protected void waitUntilBlocked(ActionListener<Releasable> onAcquired, long timeout, TimeUnit timeUnit, Executor executor) {
93+
private void waitUntilBlocked(ActionListener<Releasable> onAcquired, long timeout, TimeUnit timeUnit, Executor executor) {
9494
executor.execute(new AbstractRunnable() {
9595

9696
final Releasable released = Releasables.releaseOnce(() -> releaseDelayedOperations());
@@ -126,7 +126,7 @@ protected void doRun() {
126126
});
127127
}
128128

129-
protected void delayOperations() {
129+
private void delayOperations() {
130130
if (closed) {
131131
throw new IndexShardClosedException(shardId);
132132
}

0 commit comments

Comments
 (0)