Skip to content

Commit da81686

Browse files
authored
Add Health API Periodic Logging (#96772)
Logs the health status of the cluster and of each health indicator as observed by the Health API
1 parent 34268e1 commit da81686

File tree

6 files changed

+716
-0
lines changed

6 files changed

+716
-0
lines changed

docs/changelog/96772.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 96772
2+
summary: Health API Periodic Logging
3+
area: Health
4+
type: enhancement
5+
issues: []

docs/reference/settings/health-diagnostic-settings.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,6 @@ comprise its local health such as its disk usage.
4545
`health.ilm.max_retries_per_step`::
4646
(<<cluster-update-settings,Dynamic>>) The minimum amount of times an index has retried by an {ilm-init} step before it is considered stagnant. Defaults to `100`
4747

48+
`health.periodic_logger.poll_interval`::
49+
(<<cluster-update-settings,Dynamic>>, <<time-units, time unit value>>) How often {es} logs the health status of the cluster and of each health indicator as observed by the Health API.
50+
Defaults to `60s` (60 seconds).

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.elasticsearch.env.NodeEnvironment;
7373
import org.elasticsearch.gateway.GatewayService;
7474
import org.elasticsearch.gateway.PersistedClusterStateService;
75+
import org.elasticsearch.health.HealthPeriodicLogger;
7576
import org.elasticsearch.health.node.LocalHealthMonitor;
7677
import org.elasticsearch.health.node.action.TransportHealthNodeAction;
7778
import org.elasticsearch.health.node.selection.HealthNodeTaskExecutor;
@@ -567,6 +568,7 @@ public void apply(Settings value, Settings current, Settings previous) {
567568
TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterPortSettings.TCP_NO_DELAY : null,
568569
TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterPortSettings.TCP_REUSE_ADDRESS : null,
569570
TcpTransport.isUntrustedRemoteClusterEnabled() ? RemoteClusterPortSettings.TCP_SEND_BUFFER_SIZE : null,
571+
HealthPeriodicLogger.HEALTH_PERIODIC_LOGGER_POLL_INTERVAL_SETTING,
570572
DataStreamLifecycle.isEnabled() ? DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING : null,
571573
IndicesClusterStateService.SHARD_LOCK_RETRY_INTERVAL_SETTING,
572574
IndicesClusterStateService.SHARD_LOCK_RETRY_TIMEOUT_SETTING,
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.health;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.lucene.util.SetOnce;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.client.internal.Client;
16+
import org.elasticsearch.cluster.ClusterChangedEvent;
17+
import org.elasticsearch.cluster.ClusterStateListener;
18+
import org.elasticsearch.cluster.node.DiscoveryNode;
19+
import org.elasticsearch.cluster.service.ClusterService;
20+
import org.elasticsearch.common.component.Lifecycle;
21+
import org.elasticsearch.common.logging.ESLogMessage;
22+
import org.elasticsearch.common.scheduler.SchedulerEngine;
23+
import org.elasticsearch.common.scheduler.TimeValueSchedule;
24+
import org.elasticsearch.common.settings.Setting;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.util.concurrent.RunOnce;
27+
import org.elasticsearch.core.TimeValue;
28+
import org.elasticsearch.gateway.GatewayService;
29+
import org.elasticsearch.health.node.selection.HealthNode;
30+
31+
import java.io.Closeable;
32+
import java.time.Clock;
33+
import java.util.HashMap;
34+
import java.util.List;
35+
import java.util.Locale;
36+
import java.util.Map;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
39+
/**
40+
* This class periodically logs the results of the Health API to the standard Elasticsearch server log file.
41+
*/
42+
public class HealthPeriodicLogger implements ClusterStateListener, Closeable, SchedulerEngine.Listener {
43+
public static final String HEALTH_FIELD_PREFIX = "elasticsearch.health";
44+
45+
public static final String HEALTH_PERIODIC_LOGGER_POLL_INTERVAL = "health.periodic_logger.poll_interval";
46+
public static final Setting<TimeValue> HEALTH_PERIODIC_LOGGER_POLL_INTERVAL_SETTING = Setting.timeSetting(
47+
HEALTH_PERIODIC_LOGGER_POLL_INTERVAL,
48+
TimeValue.timeValueSeconds(60),
49+
TimeValue.timeValueSeconds(15),
50+
Setting.Property.Dynamic,
51+
Setting.Property.NodeScope
52+
);
53+
54+
/**
55+
* Name constant for the job HealthService schedules
56+
*/
57+
protected static final String HEALTH_PERIODIC_LOGGER_JOB_NAME = "health_periodic_logger";
58+
59+
private final Settings settings;
60+
61+
private final ClusterService clusterService;
62+
private final Client client;
63+
64+
private final HealthService healthService;
65+
private final Clock clock;
66+
67+
// default visibility for testing purposes
68+
volatile boolean isHealthNode = false;
69+
70+
private final AtomicBoolean currentlyRunning = new AtomicBoolean(false);
71+
72+
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
73+
private volatile TimeValue pollInterval;
74+
75+
private static final Logger logger = LogManager.getLogger(HealthPeriodicLogger.class);
76+
77+
/**
78+
* Creates a new HealthPeriodicLogger.
79+
* This creates a scheduled job using the SchedulerEngine framework and runs it on the current health node.
80+
*
81+
* @param settings the cluster settings, used to get the interval setting.
82+
* @param clusterService the cluster service, used to know when the health node changes.
83+
* @param client the client used to call the Health Service.
84+
* @param healthService the Health Service, where the actual Health API logic lives.
85+
*/
86+
public HealthPeriodicLogger(Settings settings, ClusterService clusterService, Client client, HealthService healthService) {
87+
this.settings = settings;
88+
this.clusterService = clusterService;
89+
this.client = client;
90+
this.healthService = healthService;
91+
this.clock = Clock.systemUTC();
92+
this.pollInterval = HEALTH_PERIODIC_LOGGER_POLL_INTERVAL_SETTING.get(settings);
93+
}
94+
95+
/**
96+
* Initializer method to avoid the publication of a self reference in the constructor.
97+
*/
98+
public void init() {
99+
clusterService.addListener(this);
100+
clusterService.getClusterSettings()
101+
.addSettingsUpdateConsumer(HEALTH_PERIODIC_LOGGER_POLL_INTERVAL_SETTING, this::updatePollInterval);
102+
}
103+
104+
@Override
105+
public void clusterChanged(ClusterChangedEvent event) {
106+
// wait for the cluster state to be recovered
107+
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
108+
return;
109+
}
110+
111+
DiscoveryNode healthNode = HealthNode.findHealthNode(event.state());
112+
if (healthNode == null) {
113+
this.isHealthNode = false;
114+
this.maybeCancelJob();
115+
return;
116+
}
117+
final boolean isCurrentlyHealthNode = healthNode.getId().equals(this.clusterService.localNode().getId());
118+
if (this.isHealthNode != isCurrentlyHealthNode) {
119+
this.isHealthNode = isCurrentlyHealthNode;
120+
if (this.isHealthNode) {
121+
// we weren't the health node, and now we are
122+
maybeScheduleJob();
123+
} else {
124+
// we were the health node, and now we aren't
125+
maybeCancelJob();
126+
}
127+
}
128+
}
129+
130+
@Override
131+
public void close() {
132+
SchedulerEngine engine = scheduler.get();
133+
if (engine != null) {
134+
engine.stop();
135+
}
136+
}
137+
138+
@Override
139+
public void triggered(SchedulerEngine.Event event) {
140+
if (event.getJobName().equals(HEALTH_PERIODIC_LOGGER_JOB_NAME)) {
141+
this.tryToLogHealth();
142+
}
143+
}
144+
145+
// default visibility for testing purposes
146+
void tryToLogHealth() {
147+
if (this.currentlyRunning.compareAndExchange(false, true) == false) {
148+
RunOnce release = new RunOnce(() -> currentlyRunning.set(false));
149+
try {
150+
ActionListener<List<HealthIndicatorResult>> listenerWithRelease = ActionListener.runAfter(resultsListener, release);
151+
this.healthService.getHealth(this.client, null, false, 0, listenerWithRelease);
152+
} catch (Exception e) {
153+
logger.warn(() -> "The health periodic logger encountered an error.", e);
154+
// In case of an exception before the listener was wired, we can release the flag here, and we feel safe
155+
// that it will not release it again because this can only be run once.
156+
release.run();
157+
}
158+
}
159+
}
160+
161+
// default visibility for testing purposes
162+
SchedulerEngine getScheduler() {
163+
return this.scheduler.get();
164+
}
165+
166+
/**
167+
* Create a Map of the results, which is then turned into JSON for logging.
168+
*
169+
* The structure looks like:
170+
* {"elasticsearch.health.overall.status": "green", "elasticsearch.health.[other indicators].status": "green"}
171+
* Only the indicator status values are included, along with the computed top-level status.
172+
*
173+
* @param indicatorResults the results of the Health API call that will be used as the output.
174+
*/
175+
// default visibility for testing purposes
176+
static Map<String, Object> convertToLoggedFields(List<HealthIndicatorResult> indicatorResults) {
177+
if (indicatorResults == null || indicatorResults.isEmpty()) {
178+
return Map.of();
179+
}
180+
181+
final Map<String, Object> result = new HashMap<>();
182+
183+
// overall status
184+
final HealthStatus status = HealthStatus.merge(indicatorResults.stream().map(HealthIndicatorResult::status));
185+
result.put(String.format(Locale.ROOT, "%s.overall.status", HEALTH_FIELD_PREFIX), status.xContentValue());
186+
187+
// top-level status for each indicator
188+
indicatorResults.forEach((indicatorResult) -> {
189+
result.put(
190+
String.format(Locale.ROOT, "%s.%s.status", HEALTH_FIELD_PREFIX, indicatorResult.name()),
191+
indicatorResult.status().xContentValue()
192+
);
193+
});
194+
195+
return result;
196+
}
197+
198+
/**
199+
* Handle the result of the Health Service getHealth call
200+
*/
201+
// default visibility for testing purposes
202+
final ActionListener<List<HealthIndicatorResult>> resultsListener = new ActionListener<List<HealthIndicatorResult>>() {
203+
@Override
204+
public void onResponse(List<HealthIndicatorResult> healthIndicatorResults) {
205+
try {
206+
Map<String, Object> resultsMap = convertToLoggedFields(healthIndicatorResults);
207+
208+
// if we have a valid response, log in JSON format
209+
if (resultsMap.isEmpty() == false) {
210+
ESLogMessage msg = new ESLogMessage().withFields(resultsMap);
211+
logger.info(msg);
212+
}
213+
} catch (Exception e) {
214+
logger.warn("Health Periodic Logger error:{}", e.toString());
215+
}
216+
}
217+
218+
@Override
219+
public void onFailure(Exception e) {
220+
logger.warn("Health Periodic Logger error:{}", e.toString());
221+
}
222+
};
223+
224+
/**
225+
* Create the SchedulerEngine.Job if this node is the health node
226+
*/
227+
private void maybeScheduleJob() {
228+
if (this.isHealthNode == false) {
229+
return;
230+
}
231+
232+
// don't schedule the job if the node is shutting down
233+
if (isClusterServiceStoppedOrClosed()) {
234+
logger.trace(
235+
"Skipping scheduling a health periodic logger job due to the cluster lifecycle state being: [{}] ",
236+
clusterService.lifecycleState()
237+
);
238+
return;
239+
}
240+
241+
if (scheduler.get() == null) {
242+
scheduler.set(new SchedulerEngine(settings, clock));
243+
scheduler.get().register(this);
244+
}
245+
246+
assert scheduler.get() != null : "scheduler should be available";
247+
final SchedulerEngine.Job scheduledJob = new SchedulerEngine.Job(
248+
HEALTH_PERIODIC_LOGGER_JOB_NAME,
249+
new TimeValueSchedule(pollInterval)
250+
);
251+
scheduler.get().add(scheduledJob);
252+
}
253+
254+
private void maybeCancelJob() {
255+
if (scheduler.get() != null) {
256+
scheduler.get().remove(HEALTH_PERIODIC_LOGGER_JOB_NAME);
257+
}
258+
}
259+
260+
private void updatePollInterval(TimeValue newInterval) {
261+
this.pollInterval = newInterval;
262+
maybeScheduleJob();
263+
}
264+
265+
private boolean isClusterServiceStoppedOrClosed() {
266+
final Lifecycle.State state = clusterService.lifecycleState();
267+
return state == Lifecycle.State.STOPPED || state == Lifecycle.State.CLOSED;
268+
}
269+
}

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import org.elasticsearch.gateway.MetaStateService;
109109
import org.elasticsearch.gateway.PersistedClusterStateService;
110110
import org.elasticsearch.health.HealthIndicatorService;
111+
import org.elasticsearch.health.HealthPeriodicLogger;
111112
import org.elasticsearch.health.HealthService;
112113
import org.elasticsearch.health.metadata.HealthMetadataService;
113114
import org.elasticsearch.health.node.DiskHealthIndicatorService;
@@ -1029,6 +1030,8 @@ protected Node(
10291030
threadPool,
10301031
systemIndices
10311032
);
1033+
HealthPeriodicLogger healthPeriodicLogger = createHealthPeriodicLogger(clusterService, settings, client, healthService);
1034+
healthPeriodicLogger.init();
10321035
HealthMetadataService healthMetadataService = HealthMetadataService.create(clusterService, settings);
10331036
LocalHealthMonitor localHealthMonitor = LocalHealthMonitor.create(settings, clusterService, nodeService, threadPool, client);
10341037
HealthInfoCache nodeHealthOverview = HealthInfoCache.create(clusterService);
@@ -1136,6 +1139,7 @@ protected Node(
11361139
b.bind(Tracer.class).toInstance(tracer);
11371140
b.bind(FileSettingsService.class).toInstance(fileSettingsService);
11381141
b.bind(WriteLoadForecaster.class).toInstance(writeLoadForecaster);
1142+
b.bind(HealthPeriodicLogger.class).toInstance(healthPeriodicLogger);
11391143
});
11401144

11411145
if (ReadinessService.enabled(environment)) {
@@ -1285,6 +1289,15 @@ private HealthService createHealthService(
12851289
);
12861290
}
12871291

1292+
private HealthPeriodicLogger createHealthPeriodicLogger(
1293+
ClusterService clusterService,
1294+
Settings settings,
1295+
NodeClient client,
1296+
HealthService healthService
1297+
) {
1298+
return new HealthPeriodicLogger(settings, clusterService, client, healthService);
1299+
}
1300+
12881301
private RecoveryPlannerService getRecoveryPlannerService(
12891302
ThreadPool threadPool,
12901303
ClusterService clusterService,
@@ -1659,6 +1672,7 @@ public synchronized void close() throws IOException {
16591672
toClose.add(injector.getInstance(ReadinessService.class));
16601673
}
16611674
toClose.add(injector.getInstance(FileSettingsService.class));
1675+
toClose.add(injector.getInstance(HealthPeriodicLogger.class));
16621676

16631677
for (LifecycleComponent plugin : pluginLifecycleComponents) {
16641678
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));

0 commit comments

Comments
 (0)