Skip to content

Commit ed2007e

Browse files
authored
Move reusable code into SnapshotShardContextHelper (#144477)
Move shard validation, resharding checks, abort listener setup, and commit cleanup logic from `LocalPrimarySnapshotShardContextFactory` into static methods on `SnapshotShardContextHelper` so they can be reused by other implementations. Relates ES-14099 Note: This is the first step towards adding full support for stateless snapshot. See #142527 for the more complete changes. Raising this PR first for refactoring changes to keep the changeset in control and hopefully easier to review.
1 parent 737ca4e commit ed2007e

File tree

4 files changed

+194
-171
lines changed

4 files changed

+194
-171
lines changed

server/src/main/java/org/elasticsearch/repositories/LocalPrimarySnapshotShardContextFactory.java

Lines changed: 25 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,20 @@
1111

1212
import org.elasticsearch.action.ActionListener;
1313
import org.elasticsearch.action.support.SubscribableListener;
14-
import org.elasticsearch.cluster.SnapshotsInProgress;
1514
import org.elasticsearch.cluster.service.ClusterService;
16-
import org.elasticsearch.common.Strings;
17-
import org.elasticsearch.index.IndexReshardService;
1815
import org.elasticsearch.index.IndexVersion;
1916
import org.elasticsearch.index.shard.IndexShard;
20-
import org.elasticsearch.index.shard.IndexShardState;
2117
import org.elasticsearch.index.shard.ShardId;
22-
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
2318
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
2419
import org.elasticsearch.indices.IndicesService;
2520
import org.elasticsearch.logging.LogManager;
2621
import org.elasticsearch.logging.Logger;
2722
import org.elasticsearch.snapshots.Snapshot;
28-
import org.elasticsearch.threadpool.ThreadPool;
2923

3024
import java.io.IOException;
3125

26+
import static org.elasticsearch.repositories.SnapshotShardContextHelper.acquireSnapshotIndexCommit;
27+
import static org.elasticsearch.repositories.SnapshotShardContextHelper.closeSnapshotIndexCommit;
3228
import static org.elasticsearch.snapshots.SnapshotShardsService.getShardStateId;
3329

3430
/**
@@ -57,157 +53,34 @@ public SubscribableListener<SnapshotShardContext> asyncCreate(
5753
long snapshotStartTime,
5854
ActionListener<ShardSnapshotResult> listener
5955
) throws IOException {
60-
6156
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
62-
if (indexShard.routingEntry().primary() == false) {
63-
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
64-
}
65-
if (indexShard.routingEntry().relocating()) {
66-
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
67-
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
68-
}
69-
70-
final IndexShardState indexShardState = indexShard.state();
71-
if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
72-
// shard has just been created, or still recovering
73-
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
74-
}
75-
76-
SnapshotIndexCommit snapshotIndexCommit = null;
57+
final var snapshotIndexCommit = acquireSnapshotIndexCommit(
58+
clusterService,
59+
indexShard,
60+
snapshot,
61+
supportsRelocationDuringSnapshot(),
62+
snapshotStatus
63+
);
7764
try {
78-
snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush");
79-
snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot());
80-
81-
// The check below is needed to handle shard snapshots during resharding.
82-
// Resharding changes the number of shards in the index and moves data between shards.
83-
// These processes may cause shard snapshots to be inconsistent with each other (e.g. caught in between data movements)
84-
// or to be out of sync with index metadata (e.g. a newly added shard is not present in the snapshot).
85-
// We want to detect if a resharding operation has happened after this snapshot was started
86-
// and if so we'll fail the shard snapshot to avoid such inconsistency.
87-
// We perform this check here on the data node and not on the master node
88-
// to correctly propagate this failure to SnapshotsService using existing listener
89-
// in case resharding starts in the middle of the snapshot.
90-
// Marking shard as failed directly in the cluster state would bypass parts of SnapshotsService logic.
91-
92-
// We obtain a new `SnapshotsInProgress.Entry` here in order to not capture the original in the Runnable.
93-
// The information that we are interested in (the shards map keys) doesn't change so this is fine.
94-
SnapshotsInProgress.Entry snapshotEntry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
95-
// The snapshot is deleted, there is no reason to proceed.
96-
if (snapshotEntry == null) {
97-
throw new IndexShardSnapshotFailedException(shardId, "snapshot is deleted");
98-
}
99-
100-
int maximumShardIdForIndexInTheSnapshot = calculateMaximumShardIdForIndexInTheSnapshot(shardId, snapshotEntry);
101-
if (IndexReshardService.isShardSnapshotImpactedByResharding(
102-
indexShard.indexSettings().getIndexMetadata(),
103-
maximumShardIdForIndexInTheSnapshot
104-
)) {
105-
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot a shard during resharding");
106-
}
107-
108-
snapshotStatus.updateStatusDescription("commit reference acquired, proceeding with snapshot");
10965
final var shardStateId = getShardStateId(indexShard, snapshotIndexCommit.indexCommit()); // not aborted so indexCommit() ok
110-
snapshotStatus.addAbortListener(makeAbortListener(indexShard.shardId(), snapshot, snapshotIndexCommit));
111-
snapshotStatus.ensureNotAborted();
112-
113-
final var snapshotShardContextListener = doAsyncCreate(
114-
shardId,
115-
snapshot,
116-
indexId,
117-
snapshotStatus,
118-
repositoryMetaVersion,
119-
snapshotStartTime,
120-
listener,
121-
indexShard,
122-
snapshotIndexCommit,
123-
shardStateId
66+
return SubscribableListener.newSucceeded(
67+
new LocalPrimarySnapshotShardContext(
68+
indexShard.store(),
69+
indexShard.mapperService(),
70+
snapshot.getSnapshotId(),
71+
indexId,
72+
snapshotIndexCommit,
73+
shardStateId,
74+
snapshotStatus,
75+
repositoryMetaVersion,
76+
snapshotStartTime,
77+
listener
78+
)
12479
);
125-
snapshotIndexCommit = null;
126-
return snapshotShardContextListener;
127-
} finally {
128-
if (snapshotIndexCommit != null) {
129-
snapshotIndexCommit.closingBefore(new ActionListener<Void>() {
130-
@Override
131-
public void onResponse(Void unused) {}
132-
133-
@Override
134-
public void onFailure(Exception e) {
135-
// we're already failing exceptionally, and prefer to propagate the original exception instead of this one
136-
logger.warn(Strings.format("exception closing commit for [%s] in [%s]", shardId, snapshot), e);
137-
}
138-
}).onResponse(null);
139-
}
80+
} catch (Exception e) {
81+
closeSnapshotIndexCommit(snapshotIndexCommit, shardId, snapshot);
82+
throw e;
14083
}
14184
}
14285

143-
protected SubscribableListener<SnapshotShardContext> doAsyncCreate(
144-
ShardId shardId,
145-
Snapshot snapshot,
146-
IndexId indexId,
147-
IndexShardSnapshotStatus snapshotStatus,
148-
IndexVersion repositoryMetaVersion,
149-
long snapshotStartTime,
150-
ActionListener<ShardSnapshotResult> listener,
151-
IndexShard indexShard,
152-
SnapshotIndexCommit snapshotIndexCommit,
153-
String shardStateId
154-
) {
155-
return SubscribableListener.newSucceeded(
156-
new LocalPrimarySnapshotShardContext(
157-
indexShard.store(),
158-
indexShard.mapperService(),
159-
snapshot.getSnapshotId(),
160-
indexId,
161-
snapshotIndexCommit,
162-
shardStateId,
163-
snapshotStatus,
164-
repositoryMetaVersion,
165-
snapshotStartTime,
166-
listener
167-
)
168-
);
169-
}
170-
171-
private static int calculateMaximumShardIdForIndexInTheSnapshot(ShardId shardIdStartingASnapshot, SnapshotsInProgress.Entry entry) {
172-
int maximum = shardIdStartingASnapshot.id();
173-
int i = maximum + 1;
174-
175-
while (entry.shards().containsKey(new ShardId(shardIdStartingASnapshot.getIndex(), i))) {
176-
maximum = i;
177-
i += 1;
178-
}
179-
180-
return maximum;
181-
}
182-
183-
static ActionListener<IndexShardSnapshotStatus.AbortStatus> makeAbortListener(
184-
ShardId shardId,
185-
Snapshot snapshot,
186-
SnapshotIndexCommit snapshotIndexCommit
187-
) {
188-
return new ActionListener<>() {
189-
@Override
190-
public void onResponse(IndexShardSnapshotStatus.AbortStatus abortStatus) {
191-
if (abortStatus == IndexShardSnapshotStatus.AbortStatus.ABORTED) {
192-
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.GENERIC, ThreadPool.Names.SNAPSHOT);
193-
snapshotIndexCommit.onAbort();
194-
}
195-
}
196-
197-
@Override
198-
public void onFailure(Exception e) {
199-
logger.error(() -> Strings.format("unexpected failure in %s", description()), e);
200-
assert false : e;
201-
}
202-
203-
@Override
204-
public String toString() {
205-
return description();
206-
}
207-
208-
private String description() {
209-
return Strings.format("abort listener for [%s] in [%s]", shardId, snapshot);
210-
}
211-
};
212-
}
21386
}

server/src/main/java/org/elasticsearch/repositories/SnapshotShardContextFactory.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,13 @@ SubscribableListener<SnapshotShardContext> asyncCreate(
4848
) throws IOException;
4949

5050
/**
51-
* Indicates whether this factory wants to ignore shard close events while a shard snapshot is running.
52-
* Defaults to false. It should return false unless the shard snapshot allows the shard to relocate.
51+
* Indicates whether the factory supports relocating a shard while its snapshot is in progress. When {@code true},
52+
* lifecycle of the local shard is not tied to its shard snapshot. For example, when the shard closes, it does
53+
* not automatically abort the snapshot {@link org.elasticsearch.snapshots.SnapshotShardsService#beforeIndexShardClosed}.
54+
* Note this value indicates whether the feature is supported, but whether relocation will actually happen still depends
55+
* on other factors {@link org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider}
5356
*/
54-
default boolean ignoreShardCloseEvent() {
57+
default boolean supportsRelocationDuringSnapshot() {
5558
return false;
5659
}
5760
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.repositories;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.cluster.SnapshotsInProgress;
14+
import org.elasticsearch.cluster.service.ClusterService;
15+
import org.elasticsearch.common.Strings;
16+
import org.elasticsearch.core.Nullable;
17+
import org.elasticsearch.index.IndexReshardService;
18+
import org.elasticsearch.index.shard.IndexShard;
19+
import org.elasticsearch.index.shard.IndexShardState;
20+
import org.elasticsearch.index.shard.ShardId;
21+
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
22+
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
23+
import org.elasticsearch.logging.LogManager;
24+
import org.elasticsearch.logging.Logger;
25+
import org.elasticsearch.snapshots.Snapshot;
26+
import org.elasticsearch.threadpool.ThreadPool;
27+
28+
public class SnapshotShardContextHelper {
29+
public static final Logger logger = LogManager.getLogger(SnapshotShardContextHelper.class);
30+
31+
private SnapshotShardContextHelper() {}
32+
33+
/**
34+
* Acquire an index commit for the shard snapshot, validating that the shard is a started primary and no resharding is in progress.
35+
* A {@code null} {@code snapshotStatus} means the snapshot is running on a remote node, abort handling and status updates skipped
36+
* on this node as they are handled on the remote node.
37+
*/
38+
public static SnapshotIndexCommit acquireSnapshotIndexCommit(
39+
ClusterService clusterService,
40+
IndexShard indexShard,
41+
Snapshot snapshot,
42+
boolean supportsRelocationDuringSnapshot,
43+
@Nullable IndexShardSnapshotStatus snapshotStatus
44+
) {
45+
final var shardId = indexShard.shardId();
46+
if (indexShard.routingEntry().primary() == false) {
47+
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
48+
}
49+
// TODO: usage for supportsRelocationDuringSnapshot will be added in a future PR, see also ES-14099
50+
assert supportsRelocationDuringSnapshot == false;
51+
if (indexShard.routingEntry().relocating()) {
52+
// do not snapshot when in the process of relocation of primaries so we won't get conflicts
53+
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating");
54+
}
55+
56+
final IndexShardState indexShardState = indexShard.state();
57+
if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) {
58+
// shard has just been created, or still recovering
59+
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
60+
}
61+
62+
if (snapshotStatus != null) {
63+
snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush");
64+
}
65+
final var snapshotIndexCommit = new SnapshotIndexCommit(indexShard.acquireIndexCommitForSnapshot());
66+
try {
67+
68+
// The check below is needed to handle shard snapshots during resharding.
69+
// Resharding changes the number of shards in the index and moves data between shards.
70+
// These processes may cause shard snapshots to be inconsistent with each other (e.g. caught in between data movements)
71+
// or to be out of sync with index metadata (e.g. a newly added shard is not present in the snapshot).
72+
// We want to detect if a resharding operation has happened after this snapshot was started
73+
// and if so we'll fail the shard snapshot to avoid such inconsistency.
74+
// We perform this check here on the data node and not on the master node
75+
// to correctly propagate this failure to SnapshotsService using existing listener
76+
// in case resharding starts in the middle of the snapshot.
77+
// Marking shard as failed directly in the cluster state would bypass parts of SnapshotsService logic.
78+
79+
// We obtain a new `SnapshotsInProgress.Entry` here in order to not capture the original in the Runnable.
80+
// The information that we are interested in (the shards map keys) doesn't change so this is fine.
81+
SnapshotsInProgress.Entry snapshotEntry = SnapshotsInProgress.get(clusterService.state()).snapshot(snapshot);
82+
// The snapshot is deleted, there is no reason to proceed.
83+
if (snapshotEntry == null) {
84+
throw new IndexShardSnapshotFailedException(shardId, "snapshot is deleted");
85+
}
86+
87+
int maximumShardIdForIndexInTheSnapshot = calculateMaximumShardIdForIndexInTheSnapshot(shardId, snapshotEntry);
88+
if (IndexReshardService.isShardSnapshotImpactedByResharding(
89+
indexShard.indexSettings().getIndexMetadata(),
90+
maximumShardIdForIndexInTheSnapshot
91+
)) {
92+
throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot a shard during resharding");
93+
}
94+
95+
if (snapshotStatus != null) {
96+
snapshotStatus.updateStatusDescription("commit reference acquired, proceeding with snapshot");
97+
snapshotStatus.addAbortListener(makeAbortListener(indexShard.shardId(), snapshot, snapshotIndexCommit));
98+
snapshotStatus.ensureNotAborted();
99+
}
100+
return snapshotIndexCommit;
101+
} catch (Exception e) {
102+
closeSnapshotIndexCommit(snapshotIndexCommit, shardId, snapshot);
103+
throw e;
104+
}
105+
}
106+
107+
public static void closeSnapshotIndexCommit(SnapshotIndexCommit snapshotIndexCommit, ShardId shardId, Snapshot snapshot) {
108+
snapshotIndexCommit.closingBefore(new ActionListener<Void>() {
109+
@Override
110+
public void onResponse(Void unused) {}
111+
112+
@Override
113+
public void onFailure(Exception e) {
114+
// we're already failing exceptionally, and prefer to propagate the original exception instead of this one
115+
logger.warn(Strings.format("exception closing commit for [%s] in [%s]", shardId, snapshot), e);
116+
}
117+
}).onResponse(null);
118+
}
119+
120+
private static int calculateMaximumShardIdForIndexInTheSnapshot(ShardId shardIdStartingASnapshot, SnapshotsInProgress.Entry entry) {
121+
int maximum = shardIdStartingASnapshot.id();
122+
int i = maximum + 1;
123+
124+
while (entry.shards().containsKey(new ShardId(shardIdStartingASnapshot.getIndex(), i))) {
125+
maximum = i;
126+
i += 1;
127+
}
128+
129+
return maximum;
130+
}
131+
132+
private static ActionListener<IndexShardSnapshotStatus.AbortStatus> makeAbortListener(
133+
ShardId shardId,
134+
Snapshot snapshot,
135+
SnapshotIndexCommit snapshotIndexCommit
136+
) {
137+
return new ActionListener<>() {
138+
@Override
139+
public void onResponse(IndexShardSnapshotStatus.AbortStatus abortStatus) {
140+
if (abortStatus == IndexShardSnapshotStatus.AbortStatus.ABORTED) {
141+
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.GENERIC, ThreadPool.Names.SNAPSHOT);
142+
snapshotIndexCommit.onAbort();
143+
}
144+
}
145+
146+
@Override
147+
public void onFailure(Exception e) {
148+
logger.error(() -> Strings.format("unexpected failure in %s", description()), e);
149+
assert false : e;
150+
}
151+
152+
@Override
153+
public String toString() {
154+
return description();
155+
}
156+
157+
private String description() {
158+
return Strings.format("abort listener for [%s] in [%s]", shardId, snapshot);
159+
}
160+
};
161+
}
162+
}

0 commit comments

Comments
 (0)