Skip to content

Commit 4430cf5

Browse files
authored
OAP gRPC-Client support Health Check and [Break Change] Health Check make response 1 represents healthy, 0 represents unhealthy. (#13345)
1 parent 8139ffb commit 4430cf5

File tree

9 files changed

+93
-23
lines changed

9 files changed

+93
-23
lines changed

docs/en/changes/changes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
* chore: add `toString` implementation for `StorageID`.
3737
* chore: add a warning log when connecting to ES takes too long.
3838
* Fix the query time range in the metadata API.
39+
* OAP gRPC-Client supports `Health Check`.
40+
* [Breaking Change] `Health Check` response score is changed: 1 now represents healthy, and 0 represents unhealthy.
3941

4042
#### UI
4143

docs/en/setup/backend/backend-health-check.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ If the OAP server is healthy, the response should be
3636
{
3737
"data": {
3838
"checkHealth": {
39-
"score": 0,
39+
"score": 1,
4040
"details": ""
4141
}
4242
}
@@ -49,7 +49,7 @@ If some modules are unhealthy (e.g. storage H2 is down), then the result may loo
4949
{
5050
"data": {
5151
"checkHealth": {
52-
"score": 1,
52+
"score": 0,
5353
"details": "storage_h2,"
5454
}
5555
}

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/HealthStatus.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
@Setter
2727
@ToString
2828
public class HealthStatus {
29-
// score == 0 means healthy, otherwise it's unhealthy.
29+
// score == 1 means healthy, otherwise it's unhealthy.
3030
private int score;
3131
private String details;
3232
}

oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerHttpService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public HttpResponse healthcheck(ServiceRequestContext ctx, HttpRequest req) thro
3737
final var status = healthQueryService.checkHealth();
3838
log.info("Health status: {}", status);
3939

40-
if (status.getScore() == 0) {
40+
if (status.getScore() == 1) {
4141
return HttpResponse.of(HttpStatus.OK);
4242
}
4343
return HttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);

oap-server/server-health-checker/src/main/java/org/apache/skywalking/oap/server/health/checker/provider/HealthCheckerProvider.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import java.util.concurrent.Executors;
2626
import java.util.concurrent.ScheduledExecutorService;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
2829
import java.util.concurrent.atomic.AtomicReference;
29-
import java.util.stream.Collectors;
3030
import lombok.extern.slf4j.Slf4j;
3131
import org.apache.skywalking.oap.server.core.CoreModule;
3232
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
@@ -100,16 +100,18 @@ public void onInitialized(final HealthCheckerConfig initialized) {
100100
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
101101
ses.scheduleAtFixedRate(() -> {
102102
StringBuilder unhealthyModules = new StringBuilder();
103-
score.set(Stream.ofAll(collector.collect())
104-
.flatMap(metricFamily -> metricFamily.samples)
105-
.filter(sample -> metricsCreator.isHealthCheckerMetrics(sample.name))
106-
.peek(sample -> {
107-
if (sample.value > 0.0) {
108-
unhealthyModules.append(metricsCreator.extractModuleName(sample.name)).append(",");
109-
}
110-
})
111-
.map(sample -> sample.value)
112-
.collect(Collectors.summingDouble(Double::doubleValue)));
103+
AtomicBoolean hasUnhealthyModule = new AtomicBoolean(false);
104+
Stream.ofAll(collector.collect())
105+
.flatMap(metricFamily -> metricFamily.samples)
106+
.filter(sample -> metricsCreator.isHealthCheckerMetrics(sample.name))
107+
.forEach(sample -> {
108+
if (sample.value < 1) {
109+
unhealthyModules.append(metricsCreator.extractModuleName(sample.name)).append(",");
110+
hasUnhealthyModule.set(true);
111+
}
112+
});
113+
114+
score.set(hasUnhealthyModule.get() ? 0 : 1);
113115
details.set(unhealthyModules.toString());
114116
},
115117
2, config.getCheckIntervalSeconds(), TimeUnit.SECONDS);

oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/grpc/GRPCClient.java

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,23 @@
1818

1919
package org.apache.skywalking.oap.server.library.client.grpc;
2020

21+
import io.grpc.ConnectivityState;
2122
import io.grpc.ManagedChannel;
2223
import io.grpc.ManagedChannelBuilder;
2324
import io.grpc.netty.NettyChannelBuilder;
2425
import io.netty.handler.ssl.SslContext;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.TimeUnit;
2529
import lombok.Getter;
2630
import org.apache.skywalking.oap.server.library.client.Client;
31+
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
32+
import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
33+
import org.apache.skywalking.oap.server.library.util.HealthChecker;
2734
import org.slf4j.Logger;
2835
import org.slf4j.LoggerFactory;
2936

30-
public class GRPCClient implements Client {
37+
public class GRPCClient implements Client, HealthCheckable {
3138

3239
private static final Logger LOGGER = LoggerFactory.getLogger(GRPCClient.class);
3340

@@ -41,6 +48,12 @@ public class GRPCClient implements Client {
4148

4249
private ManagedChannel channel;
4350

51+
private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
52+
53+
private ScheduledExecutorService healthCheckExecutor;
54+
55+
private boolean enableHealthCheck = false;
56+
4457
public GRPCClient(String host, int port) {
4558
this.host = host;
4659
this.port = port;
@@ -55,9 +68,12 @@ public GRPCClient(String host, int port, final SslContext sslContext) {
5568
public void connect() {
5669
if (sslContext == null) {
5770
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
58-
return;
71+
} else {
72+
channel = NettyChannelBuilder.forAddress(host, port).sslContext(sslContext).build();
73+
}
74+
if (enableHealthCheck) {
75+
checkHealth();
5976
}
60-
channel = NettyChannelBuilder.forAddress(host, port).sslContext(sslContext).build();
6177
}
6278

6379
@Override
@@ -66,6 +82,12 @@ public void shutdown() {
6682
channel.shutdownNow();
6783
} catch (Throwable t) {
6884
LOGGER.error(t.getMessage(), t);
85+
} finally {
86+
if (healthCheckExecutor != null) {
87+
healthCheckExecutor.shutdownNow();
88+
healthChecker.unHealth("gRPC channel is shutting down. Host: " + host + ", Port: " + port);
89+
healthCheckExecutor = null;
90+
}
6991
}
7092
}
7193

@@ -77,4 +99,48 @@ public ManagedChannel getChannel() {
7799
public String toString() {
78100
return host + ":" + port;
79101
}
102+
103+
/**
104+
* Must register a HealthChecker before calling connect() if you want to enable health check.
105+
* If the channel is shut down by the client side, the health check will not be performed.
106+
* Note: If you register a `org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics` here
107+
* and the metric name starts with `org.apache.skywalking.oap.server.telemetry.api.MetricsCreator.HEALTH_METRIC_PREFIX`,
108+
* this health status will be included in the overall OAP health evaluation.
109+
* @param healthChecker HealthChecker to be registered.
110+
*/
111+
@Override
112+
public void registerChecker(final HealthChecker healthChecker) {
113+
this.healthChecker.register(healthChecker);
114+
this.enableHealthCheck = true;
115+
}
116+
117+
private void checkHealth() {
118+
if (healthCheckExecutor == null) {
119+
healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();
120+
healthCheckExecutor.scheduleAtFixedRate(
121+
() -> {
122+
ConnectivityState currentState = channel.getState(true); // true means try to connect
123+
handleStateChange(currentState);
124+
}, 5, 10, TimeUnit.SECONDS
125+
);
126+
}
127+
}
128+
129+
private void handleStateChange(ConnectivityState newState) {
130+
switch (newState) {
131+
case READY:
132+
case IDLE:
133+
this.healthChecker.health();
134+
break;
135+
case CONNECTING:
136+
this.healthChecker.unHealth("gRPC connecting, waiting for ready. Host: " + host + ", Port: " + port);
137+
break;
138+
case TRANSIENT_FAILURE:
139+
this.healthChecker.unHealth("gRPC connection failed, will retry. Host: " + host + ", Port: " + port);
140+
break;
141+
case SHUTDOWN:
142+
this.healthChecker.unHealth("gRPC channel is shutting down. Host: " + host + ", Port: " + port);
143+
break;
144+
}
145+
}
80146
}

oap-server/server-telemetry/telemetry-api/src/main/java/org/apache/skywalking/oap/server/telemetry/api/HealthCheckMetrics.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,18 @@ public HealthCheckMetrics(GaugeMetrics metrics) {
3636

3737
@Override
3838
public void health() {
39-
metrics.setValue(0);
39+
metrics.setValue(1);
4040
}
4141

4242
@Override
4343
public void unHealth(Throwable t) {
4444
log.error("Health check fails", t);
45-
metrics.setValue(1);
45+
metrics.setValue(0);
4646
}
4747

4848
@Override
4949
public void unHealth(String reason) {
5050
log.warn("Health check fails. reason: {}", reason);
51-
metrics.setValue(1);
51+
metrics.setValue(0);
5252
}
5353
}

oap-server/server-telemetry/telemetry-api/src/main/java/org/apache/skywalking/oap/server/telemetry/api/MetricsCreator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ HistogramMetrics createHistogramMetric(String name, String tips, MetricsTag.Keys
5353
default HealthCheckMetrics createHealthCheckerGauge(String name, MetricsTag.Keys tagKeys, MetricsTag.Values tagValues) {
5454
Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Require non-null or empty metric name");
5555
return new HealthCheckMetrics(createGauge(Strings.lenientFormat("%s%s", HEALTH_METRIC_PREFIX, name),
56-
Strings.lenientFormat("%s health check", name),
56+
Strings.lenientFormat("%s health check. 1 health, 0 not health, -1 unknown", name),
5757
tagKeys, tagValues));
5858
}
5959

0 commit comments

Comments
 (0)