Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
* chore: add `toString` implementation for `StorageID`.
* chore: add a warning log when connecting to ES takes too long.
* Fix the query time range in the metadata API.
* OAP gRPC-Client support `Health Check`.
* [Break Change] `Health Check` make response 1 represents healthy, 0 represents unhealthy.

#### UI

Expand Down
4 changes: 2 additions & 2 deletions docs/en/setup/backend/backend-health-check.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ If the OAP server is healthy, the response should be
{
"data": {
"checkHealth": {
"score": 0,
"score": 1,
"details": ""
}
}
Expand All @@ -49,7 +49,7 @@ If some modules are unhealthy (e.g. storage H2 is down), then the result may loo
{
"data": {
"checkHealth": {
"score": 1,
"score": 0,
"details": "storage_h2,"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@Setter
@ToString
public class HealthStatus {
// score == 0 means healthy, otherwise it's unhealthy.
// score == 1 means healthy, otherwise it's unhealthy.
private int score;
private String details;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public HttpResponse healthcheck(ServiceRequestContext ctx, HttpRequest req) thro
final var status = healthQueryService.checkHealth();
log.info("Health status: {}", status);

if (status.getScore() == 0) {
if (status.getScore() == 1) {
return HttpResponse.of(HttpStatus.OK);
}
return HttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
Expand Down Expand Up @@ -100,16 +100,18 @@ public void onInitialized(final HealthCheckerConfig initialized) {
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
ses.scheduleAtFixedRate(() -> {
StringBuilder unhealthyModules = new StringBuilder();
score.set(Stream.ofAll(collector.collect())
.flatMap(metricFamily -> metricFamily.samples)
.filter(sample -> metricsCreator.isHealthCheckerMetrics(sample.name))
.peek(sample -> {
if (sample.value > 0.0) {
unhealthyModules.append(metricsCreator.extractModuleName(sample.name)).append(",");
}
})
.map(sample -> sample.value)
.collect(Collectors.summingDouble(Double::doubleValue)));
AtomicBoolean hasUnhealthyModule = new AtomicBoolean(false);
Stream.ofAll(collector.collect())
.flatMap(metricFamily -> metricFamily.samples)
.filter(sample -> metricsCreator.isHealthCheckerMetrics(sample.name))
.forEach(sample -> {
if (sample.value < 1) {
unhealthyModules.append(metricsCreator.extractModuleName(sample.name)).append(",");
hasUnhealthyModule.set(true);
}
});

score.set(hasUnhealthyModule.get() ? 0 : 1);
details.set(unhealthyModules.toString());
},
2, config.getCheckIntervalSeconds(), TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,23 @@

package org.apache.skywalking.oap.server.library.client.grpc;

import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContext;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker;
import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable;
import org.apache.skywalking.oap.server.library.util.HealthChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GRPCClient implements Client {
public class GRPCClient implements Client, HealthCheckable {

private static final Logger LOGGER = LoggerFactory.getLogger(GRPCClient.class);

Expand All @@ -41,6 +48,12 @@ public class GRPCClient implements Client {

private ManagedChannel channel;

private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();

private ScheduledExecutorService healthCheckExecutor;

private boolean enableHealthCheck = false;

public GRPCClient(String host, int port) {
this.host = host;
this.port = port;
Expand All @@ -55,9 +68,12 @@ public GRPCClient(String host, int port, final SslContext sslContext) {
public void connect() {
if (sslContext == null) {
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
return;
} else {
channel = NettyChannelBuilder.forAddress(host, port).sslContext(sslContext).build();
}
if (enableHealthCheck) {
checkHealth();
}
channel = NettyChannelBuilder.forAddress(host, port).sslContext(sslContext).build();
}

@Override
Expand All @@ -66,6 +82,12 @@ public void shutdown() {
channel.shutdownNow();
} catch (Throwable t) {
LOGGER.error(t.getMessage(), t);
} finally {
if (healthCheckExecutor != null) {
healthCheckExecutor.shutdownNow();
healthChecker.unHealth("gRPC channel is shutting down. Host: " + host + ", Port: " + port);
healthCheckExecutor = null;
}
}
}

Expand All @@ -77,4 +99,48 @@ public ManagedChannel getChannel() {
public String toString() {
return host + ":" + port;
}

/**
* Must register a HealthChecker before calling connect() if you want to enable health check.
* If the channel is shutdown by client side, the health check will not be performed.
* Note: If you register a `org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics` here
* and the metric name start with `org.apache.skywalking.oap.server.telemetry.api.MetricsCreator.HEALTH_METRIC_PREFIX`,
* this healthy status will be included in the whole OAP health evaluate.
* @param healthChecker HealthChecker to be registered.
*/
@Override
public void registerChecker(final HealthChecker healthChecker) {
this.healthChecker.register(healthChecker);
this.enableHealthCheck = true;
}

private void checkHealth() {
if (healthCheckExecutor == null) {
healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();
healthCheckExecutor.scheduleAtFixedRate(
() -> {
ConnectivityState currentState = channel.getState(true); // true means try to connect
handleStateChange(currentState);
}, 5, 10, TimeUnit.SECONDS
);
}
}

private void handleStateChange(ConnectivityState newState) {
switch (newState) {
case READY:
case IDLE:
this.healthChecker.health();
break;
case CONNECTING:
this.healthChecker.unHealth("gRPC connecting, waiting for ready. Host: " + host + ", Port: " + port);
break;
case TRANSIENT_FAILURE:
this.healthChecker.unHealth("gRPC connection failed, will retry. Host: " + host + ", Port: " + port);
break;
case SHUTDOWN:
this.healthChecker.unHealth("gRPC channel is shutting down. Host: " + host + ", Port: " + port);
break;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ public HealthCheckMetrics(GaugeMetrics metrics) {

@Override
public void health() {
metrics.setValue(0);
metrics.setValue(1);
}

@Override
public void unHealth(Throwable t) {
log.error("Health check fails", t);
metrics.setValue(1);
metrics.setValue(0);
}

@Override
public void unHealth(String reason) {
log.warn("Health check fails. reason: {}", reason);
metrics.setValue(1);
metrics.setValue(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ HistogramMetrics createHistogramMetric(String name, String tips, MetricsTag.Keys
default HealthCheckMetrics createHealthCheckerGauge(String name, MetricsTag.Keys tagKeys, MetricsTag.Values tagValues) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(name), "Require non-null or empty metric name");
return new HealthCheckMetrics(createGauge(Strings.lenientFormat("%s%s", HEALTH_METRIC_PREFIX, name),
Strings.lenientFormat("%s health check", name),
Strings.lenientFormat("%s health check. 1 health, 0 not health, -1 unknown", name),
tagKeys, tagValues));
}

Expand Down
Loading