40
40
import org .elasticsearch .cluster .routing .allocation .decider .ShardsLimitAllocationDecider ;
41
41
import org .elasticsearch .cluster .service .ClusterService ;
42
42
import org .elasticsearch .common .settings .ClusterSettings ;
43
+ import org .elasticsearch .common .settings .Setting ;
43
44
import org .elasticsearch .common .settings .Settings ;
44
45
import org .elasticsearch .common .util .set .Sets ;
45
46
import org .elasticsearch .core .Nullable ;
47
+ import org .elasticsearch .core .TimeValue ;
46
48
import org .elasticsearch .health .Diagnosis ;
47
49
import org .elasticsearch .health .HealthIndicatorDetails ;
48
50
import org .elasticsearch .health .HealthIndicatorImpact ;
56
58
import org .elasticsearch .snapshots .SearchableSnapshotsSettings ;
57
59
import org .elasticsearch .snapshots .SnapshotShardSizeInfo ;
58
60
61
+ import java .time .Instant ;
59
62
import java .util .ArrayList ;
60
63
import java .util .HashMap ;
61
64
import java .util .HashSet ;
@@ -108,11 +111,29 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
108
111
109
112
private static final String DATA_TIER_ALLOCATION_DECIDER_NAME = "data_tier" ;
110
113
114
+ /**
115
+ * Changes the behavior of isNewlyCreatedAndInitializingReplica so that the
116
+ * shard_availability health indicator returns YELLOW if a primary
117
+ * is STARTED, but a replica is still INITIALIZING and the replica has been
118
+ * unassigned for less than the value of this setting. This function is
119
+ * only used in serverless, so this setting has no effect in stateful.
120
+ */
121
+ public static final Setting <TimeValue > REPLICA_UNASSIGNED_BUFFER_TIME = Setting .timeSetting (
122
+ "health.shards_availability.replica_unassigned_buffer_time" ,
123
+ TimeValue .timeValueSeconds (3 ),
124
+ TimeValue .timeValueSeconds (0 ),
125
+ TimeValue .timeValueSeconds (20 ),
126
+ Setting .Property .NodeScope ,
127
+ Setting .Property .Dynamic
128
+ );
129
+
111
130
private final ClusterService clusterService ;
112
131
private final AllocationService allocationService ;
113
132
114
133
private final SystemIndices systemIndices ;
115
134
135
+ private volatile TimeValue replicaUnassignedBufferTime = TimeValue .timeValueSeconds (0 );
136
+
116
137
public ShardsAvailabilityHealthIndicatorService (
117
138
ClusterService clusterService ,
118
139
AllocationService allocationService ,
@@ -121,6 +142,11 @@ public ShardsAvailabilityHealthIndicatorService(
121
142
this .clusterService = clusterService ;
122
143
this .allocationService = allocationService ;
123
144
this .systemIndices = systemIndices ;
145
+ clusterService .getClusterSettings ().addSettingsUpdateConsumer (REPLICA_UNASSIGNED_BUFFER_TIME , this ::setReplicaUnassignedBufferTime );
146
+ }
147
+
148
+ private void setReplicaUnassignedBufferTime (TimeValue replicaUnassignedBufferTime ) {
149
+ this .replicaUnassignedBufferTime = replicaUnassignedBufferTime ;
124
150
}
125
151
126
152
@ Override
@@ -144,7 +170,7 @@ public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResources
144
170
var state = clusterService .state ();
145
171
var shutdown = state .getMetadata ().custom (NodesShutdownMetadata .TYPE , NodesShutdownMetadata .EMPTY );
146
172
var status = createNewStatus (state .getMetadata ());
147
- updateShardAllocationStatus (status , state , shutdown , verbose );
173
+ updateShardAllocationStatus (status , state , shutdown , verbose , replicaUnassignedBufferTime );
148
174
return createIndicator (
149
175
status .getStatus (),
150
176
status .getSymptom (),
@@ -158,14 +184,15 @@ static void updateShardAllocationStatus(
158
184
ShardAllocationStatus status ,
159
185
ClusterState state ,
160
186
NodesShutdownMetadata shutdown ,
161
- boolean verbose
187
+ boolean verbose ,
188
+ TimeValue replicaUnassignedBufferTime
162
189
) {
163
190
for (IndexRoutingTable indexShardRouting : state .routingTable ()) {
164
191
for (int i = 0 ; i < indexShardRouting .size (); i ++) {
165
192
IndexShardRoutingTable shardRouting = indexShardRouting .shard (i );
166
193
status .addPrimary (shardRouting .primaryShard (), state , shutdown , verbose );
167
194
for (ShardRouting replicaShard : shardRouting .replicaShards ()) {
168
- status .addReplica (replicaShard , state , shutdown , verbose );
195
+ status .addReplica (replicaShard , state , shutdown , verbose , replicaUnassignedBufferTime );
169
196
}
170
197
}
171
198
}
@@ -438,11 +465,18 @@ public class ShardAllocationCounts {
438
465
public SearchableSnapshotsState searchableSnapshotsState = new SearchableSnapshotsState ();
439
466
final Map <Diagnosis .Definition , Set <String >> diagnosisDefinitions = new HashMap <>();
440
467
441
- public void increment (ShardRouting routing , ClusterState state , NodesShutdownMetadata shutdowns , boolean verbose ) {
468
+ public void increment (
469
+ ShardRouting routing ,
470
+ ClusterState state ,
471
+ NodesShutdownMetadata shutdowns ,
472
+ boolean verbose ,
473
+ TimeValue replicaUnassignedBufferTime
474
+ ) {
442
475
boolean isNew = isUnassignedDueToNewInitialization (routing , state );
443
476
boolean isRestarting = isUnassignedDueToTimelyRestart (routing , shutdowns );
477
+ long replicaUnassignedCutoffTime = Instant .now ().toEpochMilli () - replicaUnassignedBufferTime .millis ();
444
478
boolean allUnavailable = areAllShardsOfThisTypeUnavailable (routing , state )
445
- && isNewlyCreatedAndInitializingReplica (routing , state ) == false ;
479
+ && isNewlyCreatedAndInitializingReplica (routing , state , replicaUnassignedCutoffTime ) == false ;
446
480
if (allUnavailable ) {
447
481
indicesWithAllShardsUnavailable .add (routing .getIndexName ());
448
482
}
@@ -520,18 +554,23 @@ boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterState sta
520
554
* (a newly created index having unassigned replicas for example), we don't want the cluster
521
555
* to turn "unhealthy" for the tiny amount of time before the shards are allocated.
522
556
*/
523
- static boolean isNewlyCreatedAndInitializingReplica (ShardRouting routing , ClusterState state ) {
557
+ static boolean isNewlyCreatedAndInitializingReplica (ShardRouting routing , ClusterState state , long replicaUnassignedCutoffTime ) {
524
558
if (routing .active ()) {
525
559
return false ;
526
560
}
527
561
if (routing .primary ()) {
528
562
return false ;
529
563
}
530
564
ShardRouting primary = state .routingTable ().shardRoutingTable (routing .shardId ()).primaryShard ();
531
- if (primary .active ()) {
532
- return false ;
565
+ if (primary .active () == false ) {
566
+ return ClusterShardHealth . getInactivePrimaryHealth ( primary ) == ClusterHealthStatus . YELLOW ;
533
567
}
534
- return ClusterShardHealth .getInactivePrimaryHealth (primary ) == ClusterHealthStatus .YELLOW ;
568
+
569
+ Optional <UnassignedInfo > ui = Optional .ofNullable (routing .unassignedInfo ());
570
+ return ui .filter (info -> info .failedAllocations () == 0 )
571
+ .filter (info -> info .lastAllocationStatus () != UnassignedInfo .AllocationStatus .DECIDERS_NO )
572
+ .filter (info -> info .unassignedTimeMillis () > replicaUnassignedCutoffTime )
573
+ .isPresent ();
535
574
}
536
575
537
576
private static boolean isUnassignedDueToTimelyRestart (ShardRouting routing , NodesShutdownMetadata shutdowns ) {
@@ -910,11 +949,17 @@ public ShardAllocationStatus(Metadata clusterMetadata) {
910
949
}
911
950
912
951
void addPrimary (ShardRouting routing , ClusterState state , NodesShutdownMetadata shutdowns , boolean verbose ) {
913
- primaries .increment (routing , state , shutdowns , verbose );
952
+ primaries .increment (routing , state , shutdowns , verbose , TimeValue . MINUS_ONE );
914
953
}
915
954
916
- void addReplica (ShardRouting routing , ClusterState state , NodesShutdownMetadata shutdowns , boolean verbose ) {
917
- replicas .increment (routing , state , shutdowns , verbose );
955
+ void addReplica (
956
+ ShardRouting routing ,
957
+ ClusterState state ,
958
+ NodesShutdownMetadata shutdowns ,
959
+ boolean verbose ,
960
+ TimeValue replicaUnassignedBufferTime
961
+ ) {
962
+ replicas .increment (routing , state , shutdowns , verbose , replicaUnassignedBufferTime );
918
963
}
919
964
920
965
void updateSearchableSnapshotsOfAvailableIndices () {
0 commit comments