Skip to content

Commit b776cf6

Browse files
Do not treat replica as unassigned if new and below time threshold. (#112066)
Changes the way we calculate if all replicas are unassigned when primary is recently created. This change will only be used in serverless, not in stateful. When a primary is new, if the primary is active, but the replica is unassigned for less than a buffer time period, do not treat is as unassigned. Control time period through health.shards_availability.replica_unassigned_buffer_time setting.
1 parent 7b44430 commit b776cf6

File tree

5 files changed

+271
-80
lines changed

5 files changed

+271
-80
lines changed

docs/changelog/112066.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 112066
2+
summary: Do not treat replica as unassigned if primary recently created and unassigned
3+
time is below a threshold
4+
area: Health
5+
type: enhancement
6+
issues: []

server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@
4040
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
4141
import org.elasticsearch.cluster.service.ClusterService;
4242
import org.elasticsearch.common.settings.ClusterSettings;
43+
import org.elasticsearch.common.settings.Setting;
4344
import org.elasticsearch.common.settings.Settings;
4445
import org.elasticsearch.common.util.set.Sets;
4546
import org.elasticsearch.core.Nullable;
47+
import org.elasticsearch.core.TimeValue;
4648
import org.elasticsearch.health.Diagnosis;
4749
import org.elasticsearch.health.HealthIndicatorDetails;
4850
import org.elasticsearch.health.HealthIndicatorImpact;
@@ -56,6 +58,7 @@
5658
import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
5759
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
5860

61+
import java.time.Instant;
5962
import java.util.ArrayList;
6063
import java.util.HashMap;
6164
import java.util.HashSet;
@@ -108,11 +111,29 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
108111

109112
private static final String DATA_TIER_ALLOCATION_DECIDER_NAME = "data_tier";
110113

114+
/**
115+
* Changes the behavior of isNewlyCreatedAndInitializingReplica so that the
116+
* shard_availability health indicator returns YELLOW if a primary
117+
* is STARTED, but a replica is still INITIALIZING and the replica has been
118+
* unassigned for less than the value of this setting. This function is
119+
* only used in serverless, so this setting has no effect in stateless.
120+
*/
121+
public static final Setting<TimeValue> REPLICA_UNASSIGNED_BUFFER_TIME = Setting.timeSetting(
122+
"health.shards_availability.replica_unassigned_buffer_time",
123+
TimeValue.timeValueSeconds(3),
124+
TimeValue.timeValueSeconds(0),
125+
TimeValue.timeValueSeconds(20),
126+
Setting.Property.NodeScope,
127+
Setting.Property.Dynamic
128+
);
129+
111130
private final ClusterService clusterService;
112131
private final AllocationService allocationService;
113132

114133
private final SystemIndices systemIndices;
115134

135+
private volatile TimeValue replicaUnassignedBufferTime = TimeValue.timeValueSeconds(0);
136+
116137
public ShardsAvailabilityHealthIndicatorService(
117138
ClusterService clusterService,
118139
AllocationService allocationService,
@@ -121,6 +142,11 @@ public ShardsAvailabilityHealthIndicatorService(
121142
this.clusterService = clusterService;
122143
this.allocationService = allocationService;
123144
this.systemIndices = systemIndices;
145+
clusterService.getClusterSettings().addSettingsUpdateConsumer(REPLICA_UNASSIGNED_BUFFER_TIME, this::setReplicaUnassignedBufferTime);
146+
}
147+
148+
private void setReplicaUnassignedBufferTime(TimeValue replicaUnassignedBufferTime) {
149+
this.replicaUnassignedBufferTime = replicaUnassignedBufferTime;
124150
}
125151

126152
@Override
@@ -144,7 +170,7 @@ public HealthIndicatorResult calculate(boolean verbose, int maxAffectedResources
144170
var state = clusterService.state();
145171
var shutdown = state.getMetadata().custom(NodesShutdownMetadata.TYPE, NodesShutdownMetadata.EMPTY);
146172
var status = createNewStatus(state.getMetadata());
147-
updateShardAllocationStatus(status, state, shutdown, verbose);
173+
updateShardAllocationStatus(status, state, shutdown, verbose, replicaUnassignedBufferTime);
148174
return createIndicator(
149175
status.getStatus(),
150176
status.getSymptom(),
@@ -158,14 +184,15 @@ static void updateShardAllocationStatus(
158184
ShardAllocationStatus status,
159185
ClusterState state,
160186
NodesShutdownMetadata shutdown,
161-
boolean verbose
187+
boolean verbose,
188+
TimeValue replicaUnassignedBufferTime
162189
) {
163190
for (IndexRoutingTable indexShardRouting : state.routingTable()) {
164191
for (int i = 0; i < indexShardRouting.size(); i++) {
165192
IndexShardRoutingTable shardRouting = indexShardRouting.shard(i);
166193
status.addPrimary(shardRouting.primaryShard(), state, shutdown, verbose);
167194
for (ShardRouting replicaShard : shardRouting.replicaShards()) {
168-
status.addReplica(replicaShard, state, shutdown, verbose);
195+
status.addReplica(replicaShard, state, shutdown, verbose, replicaUnassignedBufferTime);
169196
}
170197
}
171198
}
@@ -438,11 +465,18 @@ public class ShardAllocationCounts {
438465
public SearchableSnapshotsState searchableSnapshotsState = new SearchableSnapshotsState();
439466
final Map<Diagnosis.Definition, Set<String>> diagnosisDefinitions = new HashMap<>();
440467

441-
public void increment(ShardRouting routing, ClusterState state, NodesShutdownMetadata shutdowns, boolean verbose) {
468+
public void increment(
469+
ShardRouting routing,
470+
ClusterState state,
471+
NodesShutdownMetadata shutdowns,
472+
boolean verbose,
473+
TimeValue replicaUnassignedBufferTime
474+
) {
442475
boolean isNew = isUnassignedDueToNewInitialization(routing, state);
443476
boolean isRestarting = isUnassignedDueToTimelyRestart(routing, shutdowns);
477+
long replicaUnassignedCutoffTime = Instant.now().toEpochMilli() - replicaUnassignedBufferTime.millis();
444478
boolean allUnavailable = areAllShardsOfThisTypeUnavailable(routing, state)
445-
&& isNewlyCreatedAndInitializingReplica(routing, state) == false;
479+
&& isNewlyCreatedAndInitializingReplica(routing, state, replicaUnassignedCutoffTime) == false;
446480
if (allUnavailable) {
447481
indicesWithAllShardsUnavailable.add(routing.getIndexName());
448482
}
@@ -520,18 +554,23 @@ boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterState sta
520554
* (a newly created index having unassigned replicas for example), we don't want the cluster
521555
* to turn "unhealthy" for the tiny amount of time before the shards are allocated.
522556
*/
523-
static boolean isNewlyCreatedAndInitializingReplica(ShardRouting routing, ClusterState state) {
557+
static boolean isNewlyCreatedAndInitializingReplica(ShardRouting routing, ClusterState state, long replicaUnassignedCutoffTime) {
524558
if (routing.active()) {
525559
return false;
526560
}
527561
if (routing.primary()) {
528562
return false;
529563
}
530564
ShardRouting primary = state.routingTable().shardRoutingTable(routing.shardId()).primaryShard();
531-
if (primary.active()) {
532-
return false;
565+
if (primary.active() == false) {
566+
return ClusterShardHealth.getInactivePrimaryHealth(primary) == ClusterHealthStatus.YELLOW;
533567
}
534-
return ClusterShardHealth.getInactivePrimaryHealth(primary) == ClusterHealthStatus.YELLOW;
568+
569+
Optional<UnassignedInfo> ui = Optional.ofNullable(routing.unassignedInfo());
570+
return ui.filter(info -> info.failedAllocations() == 0)
571+
.filter(info -> info.lastAllocationStatus() != UnassignedInfo.AllocationStatus.DECIDERS_NO)
572+
.filter(info -> info.unassignedTimeMillis() > replicaUnassignedCutoffTime)
573+
.isPresent();
535574
}
536575

537576
private static boolean isUnassignedDueToTimelyRestart(ShardRouting routing, NodesShutdownMetadata shutdowns) {
@@ -910,11 +949,17 @@ public ShardAllocationStatus(Metadata clusterMetadata) {
910949
}
911950

912951
void addPrimary(ShardRouting routing, ClusterState state, NodesShutdownMetadata shutdowns, boolean verbose) {
913-
primaries.increment(routing, state, shutdowns, verbose);
952+
primaries.increment(routing, state, shutdowns, verbose, TimeValue.MINUS_ONE);
914953
}
915954

916-
void addReplica(ShardRouting routing, ClusterState state, NodesShutdownMetadata shutdowns, boolean verbose) {
917-
replicas.increment(routing, state, shutdowns, verbose);
955+
void addReplica(
956+
ShardRouting routing,
957+
ClusterState state,
958+
NodesShutdownMetadata shutdowns,
959+
boolean verbose,
960+
TimeValue replicaUnassignedBufferTime
961+
) {
962+
replicas.increment(routing, state, shutdowns, verbose, replicaUnassignedBufferTime);
918963
}
919964

920965
void updateSearchableSnapshotsOfAvailableIndices() {

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
5656
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
5757
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
58+
import org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService;
5859
import org.elasticsearch.cluster.service.ClusterApplierService;
5960
import org.elasticsearch.cluster.service.ClusterService;
6061
import org.elasticsearch.cluster.service.MasterService;
@@ -598,6 +599,7 @@ public void apply(Settings value, Settings current, Settings previous) {
598599
MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING,
599600
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
600601
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
601-
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING
602+
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
603+
ShardsAvailabilityHealthIndicatorService.REPLICA_UNASSIGNED_BUFFER_TIME
602604
);
603605
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsAvailabilityActionGuideTests.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService;
1212
import org.elasticsearch.cluster.service.ClusterService;
13+
import org.elasticsearch.common.settings.ClusterSettings;
1314
import org.elasticsearch.indices.SystemIndices;
1415
import org.elasticsearch.test.ESTestCase;
1516

@@ -33,14 +34,17 @@
3334
import static org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService.TIER_CAPACITY_ACTION_GUIDE;
3435
import static org.hamcrest.Matchers.is;
3536
import static org.mockito.Mockito.mock;
37+
import static org.mockito.Mockito.when;
3638

3739
public class ShardsAvailabilityActionGuideTests extends ESTestCase {
3840

39-
private final ShardsAvailabilityHealthIndicatorService service = new ShardsAvailabilityHealthIndicatorService(
40-
mock(ClusterService.class),
41-
mock(AllocationService.class),
42-
mock(SystemIndices.class)
43-
);
41+
private final ShardsAvailabilityHealthIndicatorService service;
42+
43+
public ShardsAvailabilityActionGuideTests() {
44+
ClusterService clusterService = mock(ClusterService.class);
45+
when(clusterService.getClusterSettings()).thenReturn(ClusterSettings.createBuiltInClusterSettings());
46+
service = new ShardsAvailabilityHealthIndicatorService(clusterService, mock(AllocationService.class), mock(SystemIndices.class));
47+
}
4448

4549
public void testRestoreFromSnapshotAction() {
4650
assertThat(ACTION_RESTORE_FROM_SNAPSHOT.helpURL(), is(RESTORE_FROM_SNAPSHOT_ACTION_GUIDE));

0 commit comments

Comments
 (0)