Skip to content

Commit 62fee4c

Browse files
Tim-Brooksgmjehovich
authored andcommitted
Trigger after shard listener before post recovery (elastic#134419)
Trigger this listener prior to transitioning to post recovery state so that the shard is not yet writeable.
1 parent 5d8dc11 commit 62fee4c

File tree

3 files changed

+71
-3
lines changed

3 files changed

+71
-3
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,13 @@ default void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings index
202202
listener.onResponse(null);
203203
}
204204

205+
/**
206+
* Called after the recover process is completed. The recovery state is DONE at this point. However, this is triggered prior to the
207+
* index shard state transitions to either POST_RECOVERY to STARTED
208+
*
209+
* @param indexShard the shard that was recovered
210+
* @param listener listener notified when this step completes
211+
*/
205212
default void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void> listener) {
206213
listener.onResponse(null);
207214
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1900,6 +1900,7 @@ public void postRecovery(String reason, ActionListener<Void> listener) throws In
19001900
SubscribableListener<Void> subscribableListener = new SubscribableListener<>();
19011901
postRecoveryComplete = subscribableListener;
19021902
final ActionListener<Void> finalListener = ActionListener.runBefore(listener, () -> subscribableListener.onResponse(null));
1903+
19031904
try {
19041905
// Some engine implementations try to acquire the engine reset write lock during refresh: in case something else is holding the
19051906
// engine read lock at the same time then the refresh is a no-op for those engines. Here we acquire the engine reset write lock
@@ -1922,9 +1923,21 @@ public void postRecovery(String reason, ActionListener<Void> listener) throws In
19221923
throw new IndexShardStartedException(shardId);
19231924
}
19241925
recoveryState.setStage(RecoveryState.Stage.DONE);
1925-
changeState(IndexShardState.POST_RECOVERY, reason);
19261926
}
1927-
indexEventListener.afterIndexShardRecovery(this, finalListener);
1927+
1928+
SubscribableListener.newForked(
1929+
(CheckedConsumer<ActionListener<Void>, Exception>) l -> indexEventListener.afterIndexShardRecovery(IndexShard.this, l)
1930+
).andThenAccept(v -> {
1931+
synchronized (mutex) {
1932+
if (state == IndexShardState.CLOSED) {
1933+
throw new IndexShardClosedException(shardId);
1934+
}
1935+
if (state == IndexShardState.STARTED) {
1936+
throw new IndexShardStartedException(shardId);
1937+
}
1938+
changeState(IndexShardState.POST_RECOVERY, reason);
1939+
}
1940+
}).addListener(finalListener);
19281941
} catch (Exception e) {
19291942
finalListener.onFailure(e);
19301943
}

server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.indices;
1010

1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.support.PlainActionFuture;
1213
import org.elasticsearch.cluster.metadata.IndexMetadata;
1314
import org.elasticsearch.cluster.node.DiscoveryNode;
1415
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -24,18 +25,22 @@
2425
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
2526
import org.elasticsearch.index.shard.IndexEventListener;
2627
import org.elasticsearch.index.shard.IndexShard;
28+
import org.elasticsearch.index.shard.IndexShardState;
2729
import org.elasticsearch.index.shard.IndexShardTestCase;
2830
import org.elasticsearch.index.shard.ShardId;
2931
import org.elasticsearch.indices.cluster.IndexRemovalReason;
3032
import org.elasticsearch.indices.recovery.RecoveryState;
3133
import org.elasticsearch.test.ESSingleNodeTestCase;
3234

3335
import java.util.Arrays;
36+
import java.util.concurrent.TimeUnit;
3437
import java.util.concurrent.atomic.AtomicInteger;
38+
import java.util.concurrent.locks.LockSupport;
3539

3640
import static java.util.Collections.emptySet;
3741
import static org.elasticsearch.indices.cluster.IndexRemovalReason.DELETED;
3842
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
43+
import static org.hamcrest.Matchers.equalTo;
3944

4045
public class IndicesLifecycleListenerSingleNodeTests extends ESSingleNodeTestCase {
4146

@@ -106,7 +111,6 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
106111
assertEquals(9, counter.get());
107112
counter.incrementAndGet();
108113
}
109-
110114
};
111115
indicesService.removeIndex(idx, DELETED, "simon says", EsExecutors.DIRECT_EXECUTOR_SERVICE, ActionListener.noop());
112116
try {
@@ -136,4 +140,48 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
136140
assertEquals(10, counter.get());
137141
}
138142

143+
public void testAfterRecoveryCallbackTriggeredWhileStillInRecoveryState() throws Throwable {
144+
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
145+
assertAcked(client().admin().indices().prepareCreate("test").setSettings(indexSettings(1, 0)));
146+
ensureGreen();
147+
Index idx = resolveIndex("test");
148+
IndexMetadata metadata = indicesService.indexService(idx).getMetadata();
149+
ShardRouting shardRouting = indicesService.indexService(idx).getShard(0).routingEntry();
150+
PlainActionFuture<Void> recoveryTriggered = new PlainActionFuture<>();
151+
IndexEventListener recoveryListener = new IndexEventListener() {
152+
153+
@Override
154+
public void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void> listener) {
155+
// Pause to ensure we do not transition to post recovery until after the recovery is complete
156+
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(30));
157+
assertThat(indexShard.recoveryState().getStage(), equalTo(RecoveryState.Stage.DONE));
158+
assertThat(indexShard.state(), equalTo(IndexShardState.RECOVERING));
159+
recoveryTriggered.onResponse(null);
160+
listener.onResponse(null);
161+
}
162+
};
163+
indicesService.removeIndex(idx, DELETED, "delete", EsExecutors.DIRECT_EXECUTOR_SERVICE, ActionListener.noop());
164+
try {
165+
IndexService index = indicesService.createIndex(metadata, Arrays.asList(recoveryListener), false);
166+
idx = index.index();
167+
ShardRouting newRouting = shardRouting;
168+
String nodeId = newRouting.currentNodeId();
169+
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "boom");
170+
newRouting = newRouting.moveToUnassigned(unassignedInfo)
171+
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
172+
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
173+
IndexShard shard = index.createShard(newRouting, IndexShardTestCase.NOOP_GCP_SYNCER, RetentionLeaseSyncer.EMPTY);
174+
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
175+
final DiscoveryNode localNode = DiscoveryNodeUtils.builder("foo").roles(emptySet()).build();
176+
shard.markAsRecovering("store", new RecoveryState(newRouting, localNode, null));
177+
IndexShardTestCase.recoverFromStore(shard);
178+
assertBusy(() -> assertThat(shard.state(), equalTo(IndexShardState.POST_RECOVERY)));
179+
newRouting = ShardRoutingHelper.moveToStarted(newRouting);
180+
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
181+
recoveryTriggered.actionGet();
182+
assertBusy(() -> assertThat(shard.state(), equalTo(IndexShardState.STARTED)));
183+
} finally {
184+
indicesService.removeIndex(idx, DELETED, "simon says", EsExecutors.DIRECT_EXECUTOR_SERVICE, ActionListener.noop());
185+
}
186+
}
139187
}

0 commit comments

Comments
 (0)