Skip to content

Commit c9438b4

Browse files
committed
Fix test and throttle indexing only on the primary shard
1 parent 371dfbe commit c9438b4

File tree

2 files changed

+23
-13
lines changed

2 files changed

+23
-13
lines changed

server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import java.util.concurrent.Semaphore;
7878
import java.util.concurrent.TimeUnit;
7979
import java.util.concurrent.atomic.AtomicBoolean;
80+
import java.util.concurrent.locks.LockSupport;
8081
import java.util.stream.Collectors;
8182
import java.util.stream.Stream;
8283

@@ -206,6 +207,7 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception {
206207
// Verify that indexing is paused for the throttled shard
207208
Engine engine = shard.getEngineOrNull();
208209
assertThat(engine != null && engine.isThrottled(), equalTo(true));
210+
209211
// Try to index a document into the "test" index which is currently throttled
210212
logger.info("--> Try to index a doc while indexing is paused");
211213
IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20);
@@ -229,6 +231,12 @@ public void testSimpleRelocationWithIndexingPaused() throws Exception {
229231
.get();
230232
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
231233

234+
logger.info("--> verifying shard primary has relocated ...");
235+
indicesService = internalCluster().getInstance(IndicesService.class, node_2);
236+
shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
237+
assertThat(shard.routingEntry().primary(), equalTo(true));
238+
engine = shard.getEngineOrNull();
239+
assertThat(engine != null && engine.isThrottled(), equalTo(false));
232240
logger.info("--> verifying count after relocation ...");
233241
future.actionGet();
234242
indicesAdmin().prepareRefresh().get();
@@ -258,6 +266,7 @@ public void testRelocationWhileIndexingRandom() throws Exception {
258266
logger.info("--> creating test index ...");
259267
prepareCreate("test", indexSettings(1, numberOfReplicas)).get();
260268

269+
// Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless
261270
for (int i = 2; i <= numberOfNodes; i++) {
262271
logger.info("--> starting [node{}] ...", i);
263272
nodes[i - 1] = internalCluster().startNode(
@@ -280,15 +289,21 @@ public void testRelocationWhileIndexingRandom() throws Exception {
280289
logger.info("--> {} docs indexed", numDocs);
281290

282291
logger.info("--> starting relocations...");
283-
int nodeShiftBased = numberOfReplicas; // if we have replicas shift those
292+
293+
// When we have a replica, the primary is on node 0 and the replica is on node 1. We cannot move the primary
294+
// to a node containing the replica, so relocation of the primary needs to happen between nodes 0 and 2.
295+
// When there is no replica, we only have 2 nodes and the primary relocates back and forth between nodes 0 and 1.
284296
for (int i = 0; i < numberOfRelocations; i++) {
285297
int fromNode = (i % 2);
286298
int toNode = fromNode == 0 ? 1 : 0;
287-
fromNode += nodeShiftBased;
288-
toNode += nodeShiftBased;
299+
if (numberOfReplicas == 1) {
300+
fromNode = fromNode == 1 ? 2 : 0;
301+
toNode = toNode == 1 ? 2 : 0;
302+
}
303+
289304
numDocs = scaledRandomIntBetween(200, 1000);
290305

291-
// Throttle indexing on source shard
306+
// Throttle indexing on primary shard
292307
if (throttleIndexing) {
293308
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[fromNode]);
294309
IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
@@ -303,8 +318,7 @@ public void testRelocationWhileIndexingRandom() throws Exception {
303318
indexer.continueIndexing(numDocs);
304319
logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);
305320

306-
updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", nodes[toNode]), "test");
307-
ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test");
321+
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, nodes[fromNode], nodes[toNode]));
308322

309323
if (rarely()) {
310324
logger.debug("--> flushing");
@@ -314,18 +328,13 @@ public void testRelocationWhileIndexingRandom() throws Exception {
314328
.setWaitForEvents(Priority.LANGUID)
315329
.setWaitForNoRelocatingShards(true)
316330
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
331+
.setWaitForGreenStatus()
317332
.get();
318333
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
319334
indexer.pauseIndexing();
320335
logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
321-
// Deactivate throttle on source shard
322-
if (throttleIndexing) {
323-
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[fromNode]);
324-
IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
325-
logger.info("--> deactivate throttling for shard on node {}...", nodes[fromNode]);
326-
shard.deactivateThrottling();
327-
}
328336
}
337+
329338
logger.info("--> done relocations");
330339
logger.info("--> waiting for indexing threads to stop ...");
331340
indexer.stopAndAwaitStopped();

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2754,6 +2754,7 @@ public IndexEventListener getIndexEventListener() {
27542754
* setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread.
27552755
*/
27562756
public void activateThrottling() {
2757+
assert shardRouting.primary(): "only primaries can be throttled: " + shardRouting;
27572758
try {
27582759
getEngine().activateThrottling();
27592760
} catch (AlreadyClosedException ex) {

0 commit comments

Comments
 (0)