@@ -193,6 +193,99 @@ public static void validate(final String repositoryName, final String snapshotNa
193193 }
194194 }
195195
196+ /**
197+ * Assert that there are no snapshots that have a shard that is waiting to be assigned even though the cluster state would allow for it
198+ * to be assigned
199+ */
200+ public static boolean assertNoDanglingSnapshots (ClusterState state ) {
201+ final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress .get (state );
202+ final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress .get (state );
203+ final Set <ProjectRepo > reposWithRunningDelete = snapshotDeletionsInProgress .getEntries ()
204+ .stream ()
205+ .filter (entry -> entry .state () == SnapshotDeletionsInProgress .State .STARTED )
206+ .map (entry -> new ProjectRepo (entry .projectId (), entry .repository ()))
207+ .collect (Collectors .toSet ());
208+ for (List <SnapshotsInProgress .Entry > repoEntry : snapshotsInProgress .entriesByRepo ()) {
209+ final SnapshotsInProgress .Entry entry = repoEntry .get (0 );
210+ for (SnapshotsInProgress .ShardSnapshotStatus value : entry .shardSnapshotStatusByRepoShardId ().values ()) {
211+ if (value .equals (SnapshotsInProgress .ShardSnapshotStatus .UNASSIGNED_QUEUED )) {
212+ assert reposWithRunningDelete .contains (new ProjectRepo (entry .projectId (), entry .repository ()))
213+ : "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete" ;
214+ } else if (value .isActive ()) {
215+ assert reposWithRunningDelete .contains (new ProjectRepo (entry .projectId (), entry .repository ())) == false
216+ : "Found shard snapshot actively executing in ["
217+ + entry
218+ + "] when it should be blocked by a running delete ["
219+ + Strings .toString (snapshotDeletionsInProgress )
220+ + "]" ;
221+ }
222+ }
223+ }
224+ return true ;
225+ }
226+
227+ /**
228+ * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository.
229+ *
230+ * @param repositoryMetaVersion version to check
231+ * @return true if version supports {@link ShardGenerations}
232+ */
233+ public static boolean useShardGenerations (IndexVersion repositoryMetaVersion ) {
234+ return repositoryMetaVersion .onOrAfter (SHARD_GEN_IN_REPO_DATA_VERSION );
235+ }
236+
237+ /**
238+ * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository.
239+ *
240+ * @param repositoryMetaVersion version to check
241+ * @return true if version supports {@link ShardGenerations}
242+ */
243+ public static boolean useIndexGenerations (IndexVersion repositoryMetaVersion ) {
244+ return repositoryMetaVersion .onOrAfter (INDEX_GEN_IN_REPO_DATA_VERSION );
245+ }
246+
247+ /**
248+ * Checks whether the metadata version supports writing the cluster- and repository-uuid to the repository.
249+ *
250+ * @param repositoryMetaVersion version to check
251+ * @return true if version supports writing cluster- and repository-uuid to the repository
252+ */
253+ public static boolean includesUUIDs (IndexVersion repositoryMetaVersion ) {
254+ return repositoryMetaVersion .onOrAfter (UUIDS_IN_REPO_DATA_VERSION );
255+ }
256+
257+ public static boolean includeFileInfoWriterUUID (IndexVersion repositoryMetaVersion ) {
258+ return repositoryMetaVersion .onOrAfter (FILE_INFO_WRITER_UUIDS_IN_SHARD_DATA_VERSION );
259+ }
260+
261+ public static boolean supportsNodeRemovalTracking (ClusterState clusterState ) {
262+ return clusterState .getMinTransportVersion ().onOrAfter (TransportVersions .V_8_13_0 );
263+ }
264+
265+ /**
266+ * Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository.
267+ *
268+ * @param entry snapshot entry
269+ * @return true if entry is currently writing to the repository
270+ */
271+ public static boolean isWritingToRepository (SnapshotsInProgress .Entry entry ) {
272+ if (entry .state ().completed ()) {
273+ // Entry is writing to the repo because it's finalizing on master
274+ return true ;
275+ }
276+ for (SnapshotsInProgress .ShardSnapshotStatus value : entry .shardSnapshotStatusByRepoShardId ().values ()) {
277+ if (value .isActive ()) {
278+ // Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard
279+ return true ;
280+ }
281+ }
282+ return false ;
283+ }
284+
285+ public static boolean isQueued (@ Nullable SnapshotsInProgress .ShardSnapshotStatus status ) {
286+ return status != null && status .state () == SnapshotsInProgress .ShardState .QUEUED ;
287+ }
288+
196289 public static FinalizeSnapshotContext .UpdatedShardGenerations buildGenerations (SnapshotsInProgress .Entry snapshot , Metadata metadata ) {
197290 ShardGenerations .Builder builder = ShardGenerations .builder ();
198291 ShardGenerations .Builder deletedBuilder = null ;
@@ -295,37 +388,6 @@ public static List<SnapshotsInProgress.Entry> currentSnapshots(
295388 return unmodifiableList (builder );
296389 }
297390
298- /**
299- * Assert that there are no snapshots that have a shard that is waiting to be assigned even though the cluster state would allow for it
300- * to be assigned
301- */
302- public static boolean assertNoDanglingSnapshots (ClusterState state ) {
303- final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress .get (state );
304- final SnapshotDeletionsInProgress snapshotDeletionsInProgress = SnapshotDeletionsInProgress .get (state );
305- final Set <ProjectRepo > reposWithRunningDelete = snapshotDeletionsInProgress .getEntries ()
306- .stream ()
307- .filter (entry -> entry .state () == SnapshotDeletionsInProgress .State .STARTED )
308- .map (entry -> new ProjectRepo (entry .projectId (), entry .repository ()))
309- .collect (Collectors .toSet ());
310- for (List <SnapshotsInProgress .Entry > repoEntry : snapshotsInProgress .entriesByRepo ()) {
311- final SnapshotsInProgress .Entry entry = repoEntry .get (0 );
312- for (SnapshotsInProgress .ShardSnapshotStatus value : entry .shardSnapshotStatusByRepoShardId ().values ()) {
313- if (value .equals (SnapshotsInProgress .ShardSnapshotStatus .UNASSIGNED_QUEUED )) {
314- assert reposWithRunningDelete .contains (new ProjectRepo (entry .projectId (), entry .repository ()))
315- : "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete" ;
316- } else if (value .isActive ()) {
317- assert reposWithRunningDelete .contains (new ProjectRepo (entry .projectId (), entry .repository ())) == false
318- : "Found shard snapshot actively executing in ["
319- + entry
320- + "] when it should be blocked by a running delete ["
321- + Strings .toString (snapshotDeletionsInProgress )
322- + "]" ;
323- }
324- }
325- }
326- return true ;
327- }
328-
329391 /**
330392 * Walks through the snapshot entries' shard snapshots and creates applies updates from looking at removed nodes or indexes and known
331393 * failed shard snapshots on the same shard IDs.
@@ -801,26 +863,6 @@ public static SnapshotDeletionsInProgress deletionsWithoutSnapshots(
801863 return changed ? SnapshotDeletionsInProgress .of (updatedEntries ) : null ;
802864 }
803865
804- /**
805- * Checks if the given {@link SnapshotsInProgress.Entry} is currently writing to the repository.
806- *
807- * @param entry snapshot entry
808- * @return true if entry is currently writing to the repository
809- */
810- public static boolean isWritingToRepository (SnapshotsInProgress .Entry entry ) {
811- if (entry .state ().completed ()) {
812- // Entry is writing to the repo because it's finalizing on master
813- return true ;
814- }
815- for (SnapshotsInProgress .ShardSnapshotStatus value : entry .shardSnapshotStatusByRepoShardId ().values ()) {
816- if (value .isActive ()) {
817- // Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard
818- return true ;
819- }
820- }
821- return false ;
822- }
823-
824866 /**
825867 * Determines the minimum {@link IndexVersion} that the snapshot repository must be compatible with
826868 * from the current nodes in the cluster and the contents of the repository.
@@ -860,40 +902,6 @@ public static IndexVersion minCompatibleVersion(
860902 return minCompatVersion ;
861903 }
862904
863- /**
864- * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository.
865- *
866- * @param repositoryMetaVersion version to check
867- * @return true if version supports {@link ShardGenerations}
868- */
869- public static boolean useShardGenerations (IndexVersion repositoryMetaVersion ) {
870- return repositoryMetaVersion .onOrAfter (SHARD_GEN_IN_REPO_DATA_VERSION );
871- }
872-
873- /**
874- * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository.
875- *
876- * @param repositoryMetaVersion version to check
877- * @return true if version supports {@link ShardGenerations}
878- */
879- public static boolean useIndexGenerations (IndexVersion repositoryMetaVersion ) {
880- return repositoryMetaVersion .onOrAfter (INDEX_GEN_IN_REPO_DATA_VERSION );
881- }
882-
883- /**
884- * Checks whether the metadata version supports writing the cluster- and repository-uuid to the repository.
885- *
886- * @param repositoryMetaVersion version to check
887- * @return true if version supports writing cluster- and repository-uuid to the repository
888- */
889- public static boolean includesUUIDs (IndexVersion repositoryMetaVersion ) {
890- return repositoryMetaVersion .onOrAfter (UUIDS_IN_REPO_DATA_VERSION );
891- }
892-
893- public static boolean includeFileInfoWriterUUID (IndexVersion repositoryMetaVersion ) {
894- return repositoryMetaVersion .onOrAfter (FILE_INFO_WRITER_UUIDS_IN_SHARD_DATA_VERSION );
895- }
896-
897905 /**
898906 * Shortcut to build new {@link ClusterState} from the current state and updated values of {@link SnapshotsInProgress} and
899907 * {@link SnapshotDeletionsInProgress}.
@@ -1117,10 +1125,6 @@ public static Map<String, DataStreamAlias> filterDataStreamAliases(
11171125 .collect (Collectors .toMap (DataStreamAlias ::getName , Function .identity ()));
11181126 }
11191127
1120- public static boolean isQueued (@ Nullable SnapshotsInProgress .ShardSnapshotStatus status ) {
1121- return status != null && status .state () == SnapshotsInProgress .ShardState .QUEUED ;
1122- }
1123-
11241128 public static void logSnapshotFailure (String operation , Snapshot snapshot , Exception e ) {
11251129 final var logLevel = snapshotFailureLogLevel (e );
11261130 if (logLevel == Level .INFO && logger .isDebugEnabled () == false ) {
@@ -1165,8 +1169,4 @@ public static Level snapshotFailureLogLevel(Exception e) {
11651169 }
11661170 return Level .WARN ;
11671171 }
1168-
1169- public static boolean supportsNodeRemovalTracking (ClusterState clusterState ) {
1170- return clusterState .getMinTransportVersion ().onOrAfter (TransportVersions .V_8_13_0 );
1171- }
11721172}
0 commit comments