1717import org .elasticsearch .common .Strings ;
1818import org .elasticsearch .common .io .stream .StreamInput ;
1919import org .elasticsearch .common .io .stream .StreamOutput ;
20+ import org .elasticsearch .common .settings .Settings ;
2021import org .elasticsearch .core .Nullable ;
22+ import org .elasticsearch .core .TimeValue ;
2123import org .elasticsearch .license .LicenseUtils ;
2224import org .elasticsearch .license .XPackLicenseState ;
2325import org .elasticsearch .xcontent .ConstructingObjectParser ;
26+ import org .elasticsearch .xcontent .ObjectParser ;
2427import org .elasticsearch .xcontent .ParseField ;
2528import org .elasticsearch .xcontent .XContentBuilder ;
2629import org .elasticsearch .xcontent .XContentParser ;
3336import java .util .List ;
3437import java .util .Objects ;
3538
39+ import static org .elasticsearch .TransportVersions .ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR ;
3640import static org .elasticsearch .snapshots .SearchableSnapshotsSettings .SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY ;
3741import static org .elasticsearch .snapshots .SearchableSnapshotsSettings .SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY ;
3842import static org .elasticsearch .snapshots .SearchableSnapshotsSettings .SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY ;
@@ -51,6 +55,7 @@ public class SearchableSnapshotAction implements LifecycleAction {
5155 public static final ParseField SNAPSHOT_REPOSITORY = new ParseField ("snapshot_repository" );
5256 public static final ParseField FORCE_MERGE_INDEX = new ParseField ("force_merge_index" );
5357 public static final ParseField TOTAL_SHARDS_PER_NODE = new ParseField ("total_shards_per_node" );
58+ public static final ParseField REPLICATE_FOR = new ParseField ("replicate_for" );
5459 public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep .NAME + "-on-datastream-check" ;
5560 public static final String CONDITIONAL_SKIP_ACTION_STEP = BranchingStep .NAME + "-check-prerequisites" ;
5661 public static final String CONDITIONAL_SKIP_GENERATE_AND_CLEAN = BranchingStep .NAME + "-check-existing-snapshot" ;
@@ -60,13 +65,19 @@ public class SearchableSnapshotAction implements LifecycleAction {
6065
6166 private static final ConstructingObjectParser <SearchableSnapshotAction , Void > PARSER = new ConstructingObjectParser <>(
6267 NAME ,
63- a -> new SearchableSnapshotAction ((String ) a [0 ], a [1 ] == null || (boolean ) a [1 ], (Integer ) a [2 ])
68+ a -> new SearchableSnapshotAction ((String ) a [0 ], a [1 ] == null || (boolean ) a [1 ], (Integer ) a [2 ], ( TimeValue ) a [ 3 ] )
6469 );
6570
6671 static {
6772 PARSER .declareString (ConstructingObjectParser .constructorArg (), SNAPSHOT_REPOSITORY );
6873 PARSER .declareBoolean (ConstructingObjectParser .optionalConstructorArg (), FORCE_MERGE_INDEX );
6974 PARSER .declareInt (ConstructingObjectParser .optionalConstructorArg (), TOTAL_SHARDS_PER_NODE );
75+ PARSER .declareField (
76+ ConstructingObjectParser .optionalConstructorArg (),
77+ p -> TimeValue .parseTimeValue (p .textOrNull (), REPLICATE_FOR .getPreferredName ()),
78+ REPLICATE_FOR ,
79+ ObjectParser .ValueType .STRING
80+ );
7081 }
7182
7283 public static SearchableSnapshotAction parse (XContentParser parser ) {
@@ -77,8 +88,15 @@ public static SearchableSnapshotAction parse(XContentParser parser) {
7788 private final boolean forceMergeIndex ;
7889 @ Nullable
7990 private final Integer totalShardsPerNode ;
80-
81- public SearchableSnapshotAction (String snapshotRepository , boolean forceMergeIndex , @ Nullable Integer totalShardsPerNode ) {
91+ @ Nullable
92+ private final TimeValue replicateFor ;
93+
94+ public SearchableSnapshotAction (
95+ String snapshotRepository ,
96+ boolean forceMergeIndex ,
97+ @ Nullable Integer totalShardsPerNode ,
98+ @ Nullable TimeValue replicateFor
99+ ) {
82100 if (Strings .hasText (snapshotRepository ) == false ) {
83101 throw new IllegalArgumentException ("the snapshot repository must be specified" );
84102 }
@@ -89,20 +107,30 @@ public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeInd
89107 throw new IllegalArgumentException ("[" + TOTAL_SHARDS_PER_NODE .getPreferredName () + "] must be >= 1" );
90108 }
91109 this .totalShardsPerNode = totalShardsPerNode ;
110+
111+ if (replicateFor != null && replicateFor .millis () <= 0 ) {
112+ throw new IllegalArgumentException (
113+ "[" + REPLICATE_FOR .getPreferredName () + "] must be positive [" + replicateFor .getStringRep () + "]"
114+ );
115+ }
116+ this .replicateFor = replicateFor ;
92117 }
93118
94119 public SearchableSnapshotAction (String snapshotRepository , boolean forceMergeIndex ) {
95- this (snapshotRepository , forceMergeIndex , null );
120+ this (snapshotRepository , forceMergeIndex , null , null );
96121 }
97122
98123 public SearchableSnapshotAction (String snapshotRepository ) {
99- this (snapshotRepository , true , null );
124+ this (snapshotRepository , true , null , null );
100125 }
101126
102127 public SearchableSnapshotAction (StreamInput in ) throws IOException {
103128 this .snapshotRepository = in .readString ();
104129 this .forceMergeIndex = in .readBoolean ();
105130 this .totalShardsPerNode = in .getTransportVersion ().onOrAfter (TransportVersions .V_8_16_0 ) ? in .readOptionalInt () : null ;
131+ this .replicateFor = in .getTransportVersion ().onOrAfter (ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR )
132+ ? in .readOptionalTimeValue ()
133+ : null ;
106134 }
107135
108136 boolean isForceMergeIndex () {
@@ -118,6 +146,11 @@ public Integer getTotalShardsPerNode() {
118146 return totalShardsPerNode ;
119147 }
120148
149+ @ Nullable
150+ public TimeValue getReplicateFor () {
151+ return replicateFor ;
152+ }
153+
121154 @ Override
122155 public List <Step > toSteps (Client client , String phase , StepKey nextStepKey ) {
123156 assert false ;
@@ -145,6 +178,8 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
145178 StepKey swapAliasesKey = new StepKey (phase , NAME , SwapAliasesAndDeleteSourceIndexStep .NAME );
146179 StepKey replaceDataStreamIndexKey = new StepKey (phase , NAME , ReplaceDataStreamBackingIndexStep .NAME );
147180 StepKey deleteIndexKey = new StepKey (phase , NAME , DeleteStep .NAME );
181+ StepKey replicateForKey = new StepKey (phase , NAME , WaitUntilReplicateForTimePassesStep .NAME );
182+ StepKey dropReplicasKey = new StepKey (phase , NAME , UpdateSettingsStep .NAME );
148183
149184 // Before going through all these steps, first check if we need to do them at all. For example, the index could already be
150185 // a searchable snapshot of the same type and repository, in which case we don't need to do anything. If that is detected,
@@ -319,19 +354,20 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
319354 getRestoredIndexPrefix (mountSnapshotKey ),
320355 storageType ,
321356 totalShardsPerNode ,
322- 0
357+ replicateFor != null ? 1 : 0 // if the 'replicate_for' option is set, then have a replica, otherwise don't
323358 );
324359 WaitForIndexColorStep waitForGreenIndexHealthStep = new WaitForIndexColorStep (
325360 waitForGreenRestoredIndexKey ,
326361 copyMetadataKey ,
327362 ClusterHealthStatus .GREEN ,
328363 getRestoredIndexPrefix (waitForGreenRestoredIndexKey )
329364 );
365+ StepKey keyForReplicateForOrContinue = replicateFor != null ? replicateForKey : nextStepKey ;
330366 CopyExecutionStateStep copyMetadataStep = new CopyExecutionStateStep (
331367 copyMetadataKey ,
332368 copyLifecyclePolicySettingKey ,
333369 (index , executionState ) -> getRestoredIndexPrefix (copyMetadataKey ) + index ,
334- nextStepKey
370+ keyForReplicateForOrContinue
335371 );
336372 CopySettingsStep copySettingsStep = new CopySettingsStep (
337373 copyLifecyclePolicySettingKey ,
@@ -364,6 +400,16 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
364400 getRestoredIndexPrefix (swapAliasesKey )
365401 );
366402
403+ // note that the replicateForStep and dropReplicasStep will only be used if replicateFor != null, see the construction of
404+ // the list of steps below
405+ Step replicateForStep = new WaitUntilReplicateForTimePassesStep (replicateForKey , dropReplicasKey , replicateFor );
406+ UpdateSettingsStep dropReplicasStep = new UpdateSettingsStep (
407+ dropReplicasKey ,
408+ nextStepKey ,
409+ client ,
410+ Settings .builder ().put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 ).build ()
411+ );
412+
367413 List <Step > steps = new ArrayList <>();
368414 steps .add (conditionalSkipActionStep );
369415 steps .add (checkNoWriteIndexStep );
@@ -382,6 +428,10 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey, XPac
382428 steps .add (waitForGreenIndexHealthStep );
383429 steps .add (copyMetadataStep );
384430 steps .add (copySettingsStep );
431+ if (replicateFor != null ) {
432+ steps .add (replicateForStep );
433+ steps .add (dropReplicasStep );
434+ }
385435 steps .add (isDataStreamBranchingStep );
386436 steps .add (replaceDataStreamBackingIndex );
387437 steps .add (deleteSourceIndexStep );
@@ -426,6 +476,9 @@ public void writeTo(StreamOutput out) throws IOException {
426476 if (out .getTransportVersion ().onOrAfter (TransportVersions .V_8_16_0 )) {
427477 out .writeOptionalInt (totalShardsPerNode );
428478 }
479+ if (out .getTransportVersion ().onOrAfter (ILM_ADD_SEARCHABLE_SNAPSHOT_ADD_REPLICATE_FOR )) {
480+ out .writeOptionalTimeValue (replicateFor );
481+ }
429482 }
430483
431484 @ Override
@@ -436,6 +489,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
436489 if (totalShardsPerNode != null ) {
437490 builder .field (TOTAL_SHARDS_PER_NODE .getPreferredName (), totalShardsPerNode );
438491 }
492+ if (replicateFor != null ) {
493+ builder .field (REPLICATE_FOR .getPreferredName (), replicateFor );
494+ }
439495 builder .endObject ();
440496 return builder ;
441497 }
@@ -451,12 +507,13 @@ public boolean equals(Object o) {
451507 SearchableSnapshotAction that = (SearchableSnapshotAction ) o ;
452508 return Objects .equals (snapshotRepository , that .snapshotRepository )
453509 && Objects .equals (forceMergeIndex , that .forceMergeIndex )
454- && Objects .equals (totalShardsPerNode , that .totalShardsPerNode );
510+ && Objects .equals (totalShardsPerNode , that .totalShardsPerNode )
511+ && Objects .equals (replicateFor , that .replicateFor );
455512 }
456513
457514 @ Override
458515 public int hashCode () {
459- return Objects .hash (snapshotRepository , forceMergeIndex , totalShardsPerNode );
516+ return Objects .hash (snapshotRepository , forceMergeIndex , totalShardsPerNode , replicateFor );
460517 }
461518
462519 @ Nullable
0 commit comments