Skip to content

Commit e62c717

Browse files
prdoyle, nielsbauman, and elasticsearchmachine
authored and committed
Propagate file settings health info to the health node (elastic#127397)
* Initial testHealthIndicator that fails
* Refactor: FileSettingsHealthInfo record
* Propagate file settings health indicator to health node
* ensureStableCluster
* Try to induce a failure from returning node-local info
* Remove redundant node from client() call
* Use local node ID in UpdateHealthInfoCacheAction.Request
* Move logger to top
* Test node-local health on master and health nodes
* Fix calculate to use the given info
* mutateFileSettingsHealthInfo
* Test status from local current info
* FileSettingsHealthTracker
* Spruce up HealthInfoTests
* spotless
* randomNonNegativeLong
* Rename variable

Co-authored-by: Niels Bauman <[email protected]>

* Address Niels' comments
* Test one- and two-node clusters
* [CI] Auto commit changes from spotless
* Ensure there's a master node

Co-authored-by: Niels Bauman <[email protected]>

* setBootstrapMasterNodeIndex

---------

Co-authored-by: Niels Bauman <[email protected]>
Co-authored-by: elasticsearchmachine <[email protected]>
1 parent e28210f commit e62c717

File tree

20 files changed

+563
-187
lines changed

20 files changed

+563
-187
lines changed

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthIndicatorServiceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.health.node.DslErrorInfo;
2222
import org.elasticsearch.health.node.HealthInfo;
2323
import org.elasticsearch.health.node.ProjectIndexName;
24+
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
2425
import org.elasticsearch.test.ESTestCase;
2526
import org.junit.Before;
2627

@@ -167,6 +168,6 @@ public void testMultiProject() {
167168
}
168169

169170
private HealthInfo constructHealthInfo(DataStreamLifecycleHealthInfo dslHealthInfo) {
170-
return new HealthInfo(Map.of(), dslHealthInfo, Map.of());
171+
return new HealthInfo(Map.of(), dslHealthInfo, Map.of(), FileSettingsHealthInfo.INDETERMINATE);
171172
}
172173
}

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorServiceIT.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
2424
import org.elasticsearch.health.node.HealthInfo;
2525
import org.elasticsearch.indices.SystemIndices;
26+
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
2627
import org.elasticsearch.test.ESIntegTestCase;
2728
import org.hamcrest.Matcher;
2829

@@ -144,7 +145,16 @@ public void clusterChanged(ClusterChangedEvent event) {
144145
states.add(
145146
new RoutingNodesAndHealth(
146147
event.state().getRoutingNodes(),
147-
service.calculate(false, 1, new HealthInfo(Map.of(), DataStreamLifecycleHealthInfo.NO_DSL_ERRORS, Map.of()))
148+
service.calculate(
149+
false,
150+
1,
151+
new HealthInfo(
152+
Map.of(),
153+
DataStreamLifecycleHealthInfo.NO_DSL_ERRORS,
154+
Map.of(),
155+
FileSettingsHealthInfo.INDETERMINATE
156+
)
157+
)
148158
)
149159
);
150160
}

server/src/internalClusterTest/java/org/elasticsearch/reservedstate/service/FileSettingsServiceIT.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.core.Strings;
2727
import org.elasticsearch.core.Tuple;
28+
import org.elasticsearch.health.GetHealthAction;
29+
import org.elasticsearch.health.node.FetchHealthInfoCacheAction;
2830
import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
2931
import org.elasticsearch.test.ESIntegTestCase;
3032
import org.junit.Before;
@@ -37,7 +39,9 @@
3739
import java.util.concurrent.ExecutionException;
3840
import java.util.concurrent.TimeUnit;
3941
import java.util.concurrent.atomic.AtomicLong;
42+
import java.util.stream.Stream;
4043

44+
import static org.elasticsearch.health.HealthStatus.YELLOW;
4145
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
4246
import static org.elasticsearch.node.Node.INITIAL_STATE_TIMEOUT_SETTING;
4347
import static org.elasticsearch.test.NodeRoles.dataOnlyNode;
@@ -532,6 +536,82 @@ public void testSymlinkUpdateTriggerReload() throws Exception {
532536
}
533537
}
534538

539+
public void testHealthIndicatorWithSingleNode() throws Exception {
540+
internalCluster().setBootstrapMasterNodeIndex(0);
541+
logger.info("--> start the node");
542+
String nodeName = internalCluster().startNode();
543+
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, nodeName);
544+
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
545+
546+
ensureStableCluster(1);
547+
548+
testHealthIndicatorOnError(nodeName, nodeName);
549+
}
550+
551+
public void testHealthIndicatorWithSeparateHealthNode() throws Exception {
552+
internalCluster().setBootstrapMasterNodeIndex(0);
553+
logger.info("--> start a data node to act as the health node");
554+
String healthNode = internalCluster().startNode(
555+
Settings.builder().put(dataOnlyNode()).put("discovery.initial_state_timeout", "1s")
556+
);
557+
558+
logger.info("--> start master node");
559+
final String masterNode = internalCluster().startMasterOnlyNode(
560+
Settings.builder().put(INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s").build()
561+
);
562+
FileSettingsService masterFileSettingsService = internalCluster().getInstance(FileSettingsService.class, masterNode);
563+
assertBusy(() -> assertTrue(masterFileSettingsService.watching()));
564+
565+
ensureStableCluster(2);
566+
567+
testHealthIndicatorOnError(masterNode, healthNode);
568+
}
569+
570+
/**
571+
* {@code masterNode} and {@code healthNode} can be the same node.
572+
*/
573+
private void testHealthIndicatorOnError(String masterNode, String healthNode) throws Exception {
574+
logger.info("--> ensure all is well before the error");
575+
assertBusy(() -> {
576+
FetchHealthInfoCacheAction.Response healthNodeResponse = client().execute(
577+
FetchHealthInfoCacheAction.INSTANCE,
578+
new FetchHealthInfoCacheAction.Request()
579+
).get();
580+
assertEquals(0, healthNodeResponse.getHealthInfo().fileSettingsHealthInfo().failureStreak());
581+
});
582+
583+
logger.info("--> induce an error and wait for it to be processed");
584+
var savedClusterState = setupClusterStateListenerForError(masterNode);
585+
writeJSONFile(masterNode, testErrorJSON, logger, versionCounter.incrementAndGet());
586+
boolean awaitSuccessful = savedClusterState.v1().await(20, TimeUnit.SECONDS);
587+
assertTrue(awaitSuccessful);
588+
589+
logger.info("--> ensure the health node also reports it");
590+
assertBusy(() -> {
591+
FetchHealthInfoCacheAction.Response healthNodeResponse = client().execute(
592+
FetchHealthInfoCacheAction.INSTANCE,
593+
new FetchHealthInfoCacheAction.Request()
594+
).get();
595+
assertEquals(
596+
"Cached info on health node should report one failure",
597+
1,
598+
healthNodeResponse.getHealthInfo().fileSettingsHealthInfo().failureStreak()
599+
);
600+
601+
for (var node : Stream.of(masterNode, healthNode).distinct().toList()) {
602+
GetHealthAction.Response getHealthResponse = client(node).execute(
603+
GetHealthAction.INSTANCE,
604+
new GetHealthAction.Request(false, 123)
605+
).get();
606+
assertEquals(
607+
"Health should be yellow on node " + node,
608+
YELLOW,
609+
getHealthResponse.findIndicator(FileSettingsService.FileSettingsHealthIndicatorService.NAME).status()
610+
);
611+
}
612+
});
613+
}
614+
535615
private void assertHasErrors(AtomicLong waitForMetadataVersion, String expectedError) {
536616
var errorMetadata = getErrorMetadata(waitForMetadataVersion);
537617
assertThat(errorMetadata, is(notNullValue()));

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ static TransportVersion def(int id) {
241241
public static final TransportVersion ML_INFERENCE_SAGEMAKER = def(9_069_0_00);
242242
public static final TransportVersion WRITE_LOAD_INCLUDES_BUFFER_WRITES = def(9_070_00_0);
243243
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION = def(9_071_0_00);
244+
public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_072_0_00);
244245

245246
/*
246247
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/health/node/HealthInfo.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,46 @@
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.io.stream.Writeable;
1616
import org.elasticsearch.core.Nullable;
17+
import org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo;
1718

1819
import java.io.IOException;
1920
import java.util.Map;
2021

22+
import static java.util.Objects.requireNonNull;
2123
import static org.elasticsearch.health.node.DataStreamLifecycleHealthInfo.NO_DSL_ERRORS;
24+
import static org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo.INDETERMINATE;
2225

2326
/**
2427
* This class wraps all the data returned by the health node.
25-
* @param diskInfoByNode A Map of node id to DiskHealthInfo for that node
26-
* @param dslHealthInfo The data stream lifecycle health information
28+
*
29+
* @param diskInfoByNode A Map of node id to DiskHealthInfo for that node
30+
* @param dslHealthInfo The data stream lifecycle health information
2731
* @param repositoriesInfoByNode A Map of node id to RepositoriesHealthInfo for that node
32+
* @param fileSettingsHealthInfo The file-based settings health information
2833
*/
2934
public record HealthInfo(
3035
Map<String, DiskHealthInfo> diskInfoByNode,
3136
@Nullable DataStreamLifecycleHealthInfo dslHealthInfo,
32-
Map<String, RepositoriesHealthInfo> repositoriesInfoByNode
37+
Map<String, RepositoriesHealthInfo> repositoriesInfoByNode,
38+
FileSettingsHealthInfo fileSettingsHealthInfo
3339
) implements Writeable {
3440

35-
public static final HealthInfo EMPTY_HEALTH_INFO = new HealthInfo(Map.of(), NO_DSL_ERRORS, Map.of());
41+
public static final HealthInfo EMPTY_HEALTH_INFO = new HealthInfo(Map.of(), NO_DSL_ERRORS, Map.of(), INDETERMINATE);
42+
43+
/**
 * Compact canonical constructor. Unlike {@code dslHealthInfo}, which is declared
 * {@code @Nullable}, {@code fileSettingsHealthInfo} is required to be non-null.
 *
 * @throws NullPointerException if {@code fileSettingsHealthInfo} is null
 */
public HealthInfo {
    // Name the component so a failure identifies which argument was null.
    requireNonNull(fileSettingsHealthInfo, "fileSettingsHealthInfo");
}
3646

3747
public HealthInfo(StreamInput input) throws IOException {
3848
this(
3949
input.readMap(DiskHealthInfo::new),
4050
input.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)
4151
? input.readOptionalWriteable(DataStreamLifecycleHealthInfo::new)
4252
: null,
43-
input.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? input.readMap(RepositoriesHealthInfo::new) : Map.of()
53+
input.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? input.readMap(RepositoriesHealthInfo::new) : Map.of(),
54+
input.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)
55+
? input.readOptionalWriteable(FileSettingsHealthInfo::new)
56+
: INDETERMINATE
4457
);
4558
}
4659

@@ -53,5 +66,8 @@ public void writeTo(StreamOutput output) throws IOException {
5366
if (output.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
5467
output.writeMap(repositoriesInfoByNode, StreamOutput::writeWriteable);
5568
}
69+
if (output.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)) {
70+
output.writeOptionalWriteable(fileSettingsHealthInfo);
71+
}
5672
}
5773
}

server/src/main/java/org/elasticsearch/health/node/HealthInfoCache.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
import org.elasticsearch.cluster.service.ClusterService;
1818
import org.elasticsearch.core.Nullable;
1919
import org.elasticsearch.health.node.selection.HealthNode;
20+
import org.elasticsearch.reservedstate.service.FileSettingsService;
2021

2122
import java.util.Map;
2223
import java.util.concurrent.ConcurrentHashMap;
2324

25+
import static org.elasticsearch.reservedstate.service.FileSettingsService.FileSettingsHealthInfo.INDETERMINATE;
26+
2427
/**
2528
* Keeps track of several health statuses per node that can be used in health.
2629
*/
@@ -31,6 +34,7 @@ public class HealthInfoCache implements ClusterStateListener {
3134
@Nullable
3235
private volatile DataStreamLifecycleHealthInfo dslHealthInfo = null;
3336
private volatile ConcurrentHashMap<String, RepositoriesHealthInfo> repositoriesInfoByNode = new ConcurrentHashMap<>();
37+
private volatile FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo = INDETERMINATE;
3438

3539
private HealthInfoCache() {}
3640

@@ -44,7 +48,8 @@ public void updateNodeHealth(
4448
String nodeId,
4549
@Nullable DiskHealthInfo diskHealthInfo,
4650
@Nullable DataStreamLifecycleHealthInfo latestDslHealthInfo,
47-
@Nullable RepositoriesHealthInfo repositoriesHealthInfo
51+
@Nullable RepositoriesHealthInfo repositoriesHealthInfo,
52+
@Nullable FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo
4853
) {
4954
if (diskHealthInfo != null) {
5055
diskInfoByNode.put(nodeId, diskHealthInfo);
@@ -55,6 +60,9 @@ public void updateNodeHealth(
5560
if (repositoriesHealthInfo != null) {
5661
repositoriesInfoByNode.put(nodeId, repositoriesHealthInfo);
5762
}
63+
if (fileSettingsHealthInfo != null) {
64+
this.fileSettingsHealthInfo = fileSettingsHealthInfo;
65+
}
5866
}
5967

6068
@Override
@@ -77,6 +85,7 @@ public void clusterChanged(ClusterChangedEvent event) {
7785
diskInfoByNode = new ConcurrentHashMap<>();
7886
dslHealthInfo = null;
7987
repositoriesInfoByNode = new ConcurrentHashMap<>();
88+
fileSettingsHealthInfo = INDETERMINATE;
8089
}
8190
}
8291

@@ -86,6 +95,6 @@ public void clusterChanged(ClusterChangedEvent event) {
8695
*/
8796
public HealthInfo getHealthInfo() {
8897
// A shallow copy is enough because the inner data is immutable.
89-
return new HealthInfo(Map.copyOf(diskInfoByNode), dslHealthInfo, Map.copyOf(repositoriesInfoByNode));
98+
return new HealthInfo(Map.copyOf(diskInfoByNode), dslHealthInfo, Map.copyOf(repositoriesInfoByNode), fileSettingsHealthInfo);
9099
}
91100
}

server/src/main/java/org/elasticsearch/health/node/UpdateHealthInfoCacheAction.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import org.elasticsearch.health.node.action.HealthNodeRequest;
2424
import org.elasticsearch.health.node.action.TransportHealthNodeAction;
2525
import org.elasticsearch.injection.guice.Inject;
26+
import org.elasticsearch.logging.LogManager;
27+
import org.elasticsearch.logging.Logger;
28+
import org.elasticsearch.reservedstate.service.FileSettingsService;
2629
import org.elasticsearch.tasks.Task;
2730
import org.elasticsearch.threadpool.ThreadPool;
2831
import org.elasticsearch.transport.TransportService;
@@ -37,6 +40,7 @@
3740
* regarding this node.
3841
*/
3942
public class UpdateHealthInfoCacheAction extends ActionType<AcknowledgedResponse> {
43+
private static final Logger logger = LogManager.getLogger(UpdateHealthInfoCacheAction.class);
4044

4145
public static class Request extends HealthNodeRequest {
4246
private final String nodeId;
@@ -46,24 +50,37 @@ public static class Request extends HealthNodeRequest {
4650
private final DataStreamLifecycleHealthInfo dslHealthInfo;
4751
@Nullable
4852
private final RepositoriesHealthInfo repositoriesHealthInfo;
53+
@Nullable
54+
private final FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo;
4955

5056
public Request(
5157
String nodeId,
5258
DiskHealthInfo diskHealthInfo,
5359
DataStreamLifecycleHealthInfo dslHealthInfo,
54-
RepositoriesHealthInfo repositoriesHealthInfo
60+
RepositoriesHealthInfo repositoriesHealthInfo,
61+
@Nullable FileSettingsService.FileSettingsHealthInfo fileSettingsHealthInfo
5562
) {
5663
this.nodeId = nodeId;
5764
this.diskHealthInfo = diskHealthInfo;
5865
this.dslHealthInfo = dslHealthInfo;
5966
this.repositoriesHealthInfo = repositoriesHealthInfo;
67+
this.fileSettingsHealthInfo = fileSettingsHealthInfo;
6068
}
6169

6270
/**
 * Creates a request that carries only data stream lifecycle health info for the
 * given node; every other health info field is left {@code null}.
 */
public Request(String nodeId, DataStreamLifecycleHealthInfo dslHealthInfo) {
    // Argument order: nodeId, diskHealthInfo, dslHealthInfo, repositoriesHealthInfo, fileSettingsHealthInfo.
    this(nodeId, null, dslHealthInfo, null, null);
}
77+
78+
/**
 * Creates a request that carries only file settings health info for the given
 * node; every other health info field is left {@code null}.
 */
public Request(String nodeId, FileSettingsService.FileSettingsHealthInfo info) {
    // Argument order: nodeId, diskHealthInfo, dslHealthInfo, repositoriesHealthInfo, fileSettingsHealthInfo.
    this(nodeId, null, null, null, info);
}
6885

6986
public Request(StreamInput in) throws IOException {
@@ -75,6 +92,9 @@ public Request(StreamInput in) throws IOException {
7592
this.repositoriesHealthInfo = in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)
7693
? in.readOptionalWriteable(RepositoriesHealthInfo::new)
7794
: null;
95+
this.fileSettingsHealthInfo = in.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)
96+
? in.readOptionalWriteable(FileSettingsService.FileSettingsHealthInfo::new)
97+
: null;
7898
} else {
7999
// BWC for pre-8.12 the disk health info was mandatory. Evolving this request has proven tricky however we've made use of
80100
// waiting for all nodes to be on the {@link TransportVersions.HEALTH_INFO_ENRICHED_WITH_DSL_STATUS} transport version
@@ -83,6 +103,7 @@ public Request(StreamInput in) throws IOException {
83103
this.diskHealthInfo = new DiskHealthInfo(in);
84104
this.dslHealthInfo = null;
85105
this.repositoriesHealthInfo = null;
106+
this.fileSettingsHealthInfo = null;
86107
}
87108
}
88109

@@ -102,6 +123,11 @@ public RepositoriesHealthInfo getRepositoriesHealthInfo() {
102123
return repositoriesHealthInfo;
103124
}
104125

126+
@Nullable
127+
public FileSettingsService.FileSettingsHealthInfo getFileSettingsHealthInfo() {
128+
return fileSettingsHealthInfo;
129+
}
130+
105131
@Override
106132
public ActionRequestValidationException validate() {
107133
return null;
@@ -117,6 +143,9 @@ public void writeTo(StreamOutput out) throws IOException {
117143
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
118144
out.writeOptionalWriteable(repositoriesHealthInfo);
119145
}
146+
if (out.getTransportVersion().onOrAfter(TransportVersions.FILE_SETTINGS_HEALTH_INFO)) {
147+
out.writeOptionalWriteable(fileSettingsHealthInfo);
148+
}
120149
} else {
121150
// BWC for pre-8.12 the disk health info was mandatory. Evolving this request has proven tricky however we've made use of
122151
// waiting for all nodes to be on the {@link TransportVersions.V_8_12_0} transport version
@@ -185,7 +214,7 @@ public Builder dslHealthInfo(DataStreamLifecycleHealthInfo dslHealthInfo) {
185214
}
186215

187216
public Request build() {
188-
return new Request(nodeId, diskHealthInfo, dslHealthInfo, repositoriesHealthInfo);
217+
return new Request(nodeId, diskHealthInfo, dslHealthInfo, repositoriesHealthInfo, null);
189218
}
190219
}
191220
}
@@ -228,13 +257,21 @@ protected void healthOperation(
228257
ClusterState clusterState,
229258
ActionListener<AcknowledgedResponse> listener
230259
) {
260+
logger.debug(
261+
"Updating health info cache on node [{}][{}] from node [{}]",
262+
clusterService.getNodeName(),
263+
clusterService.localNode().getId(),
264+
request.getNodeId()
265+
);
231266
nodeHealthOverview.updateNodeHealth(
232267
request.getNodeId(),
233268
request.getDiskHealthInfo(),
234269
request.getDslHealthInfo(),
235-
request.getRepositoriesHealthInfo()
270+
request.getRepositoriesHealthInfo(),
271+
request.getFileSettingsHealthInfo()
236272
);
237273
listener.onResponse(AcknowledgedResponse.of(true));
238274
}
239275
}
276+
240277
}

0 commit comments

Comments
 (0)