Skip to content

Commit 39e4882

Browse files
committed
Add circuit-breaking for gRPC server and relative docs.
1 parent 23c4e8f commit 39e4882

File tree

12 files changed

+112
-7
lines changed

12 files changed

+112
-7
lines changed

docs/en/setup/backend/backend-telemetry.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
# Telemetry for backend
22
The OAP backend cluster itself is a distributed streaming process system. To assist the Ops team, we provide the telemetry for the OAP backend itself, also known as self-observability (so11y)
33

4-
By default, the telemetry is disabled by setting `selector` to `none`, like this:
4+
By default, the telemetry is disabled by setting `selector` to `prometheus`, like this, which activated the Prometheus telemetry.
55

66
```yaml
77
telemetry:
8-
selector: ${SW_TELEMETRY:none}
8+
selector: ${SW_TELEMETRY:prometheus}
99
none:
1010
prometheus:
1111
host: ${SW_TELEMETRY_PROMETHEUS_HOST:0.0.0.0}
@@ -15,7 +15,7 @@ telemetry:
1515
sslCertChainPath: ${SW_TELEMETRY_PROMETHEUS_SSL_CERT_CHAIN_PATH:""}
1616
```
1717
18-
You may also set `Prometheus` to enable them. For more information, refer to the details below.
18+
Besides the self observability, this supports [OAP circuit breaking](circuit-breaker.md) functionality.
1919
2020
## Self Observability
2121
SkyWalking supports exposing telemetry data representing OAP running status through Prometheus endpoint.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Circuit Breaking
2+
3+
Circuit breaking is a mechanism used to detect failures and encapsulate the logic of preventing OAP node crashing. It is
4+
a key component of SkyWalking's resilience strategy. This approach protects the system from overload and ensures
5+
stability.
6+
7+
Currently, there are two available strategies for circuit breaking: heap memory usage and direct memory pool size.
8+
9+
```yaml
10+
# The int value of the max heap memory usage percent. The default value is 85%.
11+
maxHeapMemoryUsagePercent: ${SW_CORE_MAX_HEAP_MEMORY_USAGE_PERCENT:85}
12+
# The long value of the max direct memory usage. The default max value is -1, representing no limit.
13+
maxDirectMemoryUsage: ${SW_CORE_MAX_DIRECT_MEMORY_USAGE:-1}
14+
```
15+
16+
SkyWalking's circuit breaker mechanism actively rejects new telemetry data from gRPC.
17+
We keep HTTP services running to serve the UI and API, and Prometheus telemetry data is still available.
18+
19+
Note, this feature relies on the `prometheus` provider in [Telemetry for backend](backend-telemetry.md) to monitor OAP
20+
server status.

docs/menu.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ catalog:
196196
path: "/en/setup/backend/backend-telemetry"
197197
- name: "OAP Health Check"
198198
path: "/en/setup/backend/backend-health-check"
199+
- name: "Circuit Breaking"
200+
path: "/en/setup/backend/circuit-breaking"
199201
- name: "BanyanDB Exclusive Setup"
200202
catalog:
201203
- name: "Progressive TTL"

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
103103
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
104104
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
105+
import org.apache.skywalking.oap.server.core.watermark.WatermarkGRPCInterceptor;
105106
import org.apache.skywalking.oap.server.core.watermark.WatermarkWatcher;
106107
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
107108
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
@@ -392,6 +393,7 @@ TTLStatusQuery.class, new TTLStatusQuery(
392393
public void start() throws ModuleStartException {
393394
grpcServer.addHandler(new RemoteServiceHandler(getManager()));
394395
grpcServer.addHandler(new HealthCheckServiceHandler());
396+
grpcServer.addInterceptor(WatermarkGRPCInterceptor.INSTANCE);
395397

396398
endpointNameGrouping.startHttpUriRecognitionSvr(
397399
getManager()

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,6 @@ public void addHandler(ServerServiceDefinition definition) {
4343

4444
@Override
4545
public void addFilter(ServerInterceptor interceptor) {
46-
server.addHandler(interceptor);
46+
server.addInterceptor(interceptor);
4747
}
4848
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.oap.server.core.watermark;
20+
21+
import io.grpc.ForwardingServerCallListener;
22+
import io.grpc.Metadata;
23+
import io.grpc.ServerCall;
24+
import io.grpc.ServerCallHandler;
25+
import io.grpc.ServerInterceptor;
26+
import io.grpc.Status;
27+
28+
/**
29+
* gRPCWatermarkInterceptor is a gRPC interceptor that checks if the watermark is exceeded before processing the request.
30+
*/
31+
public class WatermarkGRPCInterceptor extends WatermarkListener implements ServerInterceptor {
32+
public static WatermarkGRPCInterceptor INSTANCE;
33+
34+
private WatermarkGRPCInterceptor() {
35+
super("gRPC-Watermark-Interceptor");
36+
}
37+
38+
public static WatermarkGRPCInterceptor create() {
39+
INSTANCE = new WatermarkGRPCInterceptor();
40+
return INSTANCE;
41+
}
42+
43+
@Override
44+
public <REQ, RESP> ServerCall.Listener<REQ> interceptCall(final ServerCall<REQ, RESP> call,
45+
final Metadata headers,
46+
final ServerCallHandler<REQ, RESP> next) {
47+
if (isWatermarkExceeded()) {
48+
call.close(Status.RESOURCE_EXHAUSTED.withDescription("Watermark exceeded"), new Metadata());
49+
return new ServerCall.Listener<REQ>() {
50+
};
51+
}
52+
53+
ServerCall.Listener<REQ> delegate = next.startCall(call, headers);
54+
55+
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<REQ>(delegate) {
56+
@Override
57+
public void onMessage(final REQ message) {
58+
if (isWatermarkExceeded()) {
59+
call.close(Status.RESOURCE_EXHAUSTED.withDescription("Watermark exceeded"), new Metadata());
60+
return;
61+
}
62+
63+
super.onMessage(message);
64+
}
65+
};
66+
}
67+
}

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,14 @@ public class WatermarkWatcher implements Service {
5454

5555
public void start(MetricsCollector so11yCollector) {
5656
this.so11yCollector = so11yCollector;
57-
Executors.newSingleThreadScheduledExecutor()
58-
.scheduleWithFixedDelay(this::watch, 0, 10, TimeUnit.SECONDS);
5957
lock = new ReentrantLock();
6058
listeners = new ArrayList<>();
59+
60+
// Create internal listeners to watch the watermark
61+
this.addListener(WatermarkGRPCInterceptor.create());
62+
63+
Executors.newSingleThreadScheduledExecutor()
64+
.scheduleWithFixedDelay(this::watch, 0, 10, TimeUnit.SECONDS);
6165
}
6266

6367
private void watch() {

oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public void addHandler(ServerServiceDefinition definition) {
139139
nettyServerBuilder.addService(definition);
140140
}
141141

142-
public void addHandler(ServerInterceptor serverInterceptor) {
142+
public void addInterceptor(ServerInterceptor serverInterceptor) {
143143
log.info("Bind interceptor {} into gRPC server {}:{}", serverInterceptor.getClass().getSimpleName(), host, port);
144144
nettyServerBuilder.intercept(serverInterceptor);
145145
}

oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
2626
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
2727
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
28+
import org.apache.skywalking.oap.server.core.watermark.WatermarkGRPCInterceptor;
2829
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
2930
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
3031
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -138,6 +139,7 @@ public void start() throws ServiceNotProvidedException, ModuleStartException {
138139
service.addHandler(handler);
139140
service.addHandler(new AccessLogServiceGRPCHandlerV3(handler));
140141
service.addHandler(new SatelliteAccessLogServiceGRPCHandlerV3(handler));
142+
service.addFilter(WatermarkGRPCInterceptor.INSTANCE);
141143
}
142144

143145
@Override

oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/EBPFReceiverProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
2525
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
2626
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
27+
import org.apache.skywalking.oap.server.core.watermark.WatermarkGRPCInterceptor;
2728
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
2829
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
2930
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -122,6 +123,7 @@ public void start() throws ServiceNotProvidedException, ModuleStartException {
122123
service.addHandler(new EBPFProfilingServiceHandler(getManager()));
123124
service.addHandler(new ContinuousProfilingServiceHandler(getManager(), this.config));
124125
service.addHandler(new AccessLogServiceHandler(getManager()));
126+
service.addFilter(WatermarkGRPCInterceptor.INSTANCE);
125127
}
126128

127129
@Override

0 commit comments

Comments
 (0)