Skip to content

Commit 647928e

Browse files
authored
Add rate limiter for Zipkin trace receiver (#13481)
1 parent 093e404 commit 647928e

File tree

10 files changed

+64
-13
lines changed

10 files changed

+64
-13
lines changed

docs/en/changes/changes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
* Fix otlp log handler reponse error and otlp span convert error.
9595
* Fix service_relation source layer in mq entry span analyse.
9696
* Fix metrics comparison in promql with bool modifier.
97+
* Add rate limiter for Zipkin trace receiver to limit maximum spans per second.
9798

9899
#### UI
99100

docs/en/setup/backend/configuration-vocabulary.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ It divided into several modules, each of which has its own settings. The followi
198198
| - | - | enabledOtelMetricsRules | Enabled metric rules for OTLP handler. | SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES | - |
199199
| receiver-zipkin | default | A receiver for Zipkin traces. | - | - | |
200200
| - | - | sampleRate | The sample rate precision is 1/10000, should be between 0 and 10000 | SW_ZIPKIN_SAMPLE_RATE | 10000 |
201+
| - | - | maxSpansPerSecond | The maximum spans to be collected per second. 0 means no limit. Spans exceeding this threshold will be dropped. | SW_ZIPKIN_MAX_SPANS_PER_SECOND | 0 |
201202
| - | - | searchableTracesTags | Defines a set of span tag keys which are searchable. Multiple values are separated by commas. The max length of key=value should be less than 256 or will be dropped. | SW_ZIPKIN_SEARCHABLE_TAG_KEYS | http.method |
202203
| - | - | enableHttpCollector | Enable Http Collector. | SW_ZIPKIN_HTTP_COLLECTOR_ENABLED | true |
203204
| - | - | restHost | Binding IP of RESTful services. | SW_RECEIVER_ZIPKIN_REST_HOST | 0.0.0.0 |

docs/en/setup/backend/zipkin-trace.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ receiver-zipkin:
1616
searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
1717
# The sample rate precision is 1/10000, should be between 0 and 10000
1818
sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000}
19+
# The maximum spans to be collected per second. 0 means no limit. Spans exceeding this threshold will be dropped.
20+
maxSpansPerSecond: ${SW_ZIPKIN_MAX_SPANS_PER_SECOND:0}
1921
## The below configs are for OAP collect zipkin trace from HTTP
2022
enableHttpCollector: ${SW_ZIPKIN_HTTP_COLLECTOR_ENABLED:true}
2123
restHost: ${SW_RECEIVER_ZIPKIN_REST_HOST:0.0.0.0}
@@ -71,4 +73,4 @@ Lens UI is Zipkin native UI. SkyWalking webapp has bundled it in the binary dist
7173
`{webapp IP}:{webapp port}/zipkin` is exposed and accessible for the browser.
7274
Meanwhile, `Iframe` UI component could be used to host Zipkin Lens UI on the SkyWalking booster UI dashboard.(link=/zipkin)
7375

74-
Zipkin Lens UI source codes could be found [here](https://github.com/openzipkin/zipkin/tree/master/zipkin-lens).
76+
Zipkin Lens UI source codes could be found [here](https://github.com/openzipkin/zipkin/tree/master/zipkin-lens).

oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryTraceHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
4343
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
4444
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
45+
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
4546
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
4647
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
4748
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
@@ -78,6 +79,13 @@ public class OpenTelemetryTraceHandler
7879
MetricsTag.EMPTY_KEY,
7980
MetricsTag.EMPTY_VALUE
8081
);
82+
@Getter(lazy = true)
83+
private final CounterMetrics droppedSpans = getMetricsCreator().createCounter(
84+
"otel_spans_dropped",
85+
"The count of spans that were dropped due to rate limit",
86+
MetricsTag.EMPTY_KEY,
87+
MetricsTag.EMPTY_VALUE
88+
);
8189

8290
@Override
8391
public void init(ModuleManager manager, OtelMetricReceiverConfig config) {
@@ -127,7 +135,10 @@ public void export(ExportTraceServiceRequest request, StreamObserver<ExportTrace
127135
log.warn("convert span error, discarding the span: {}", e.getMessage());
128136
}
129137
});
130-
getForwardService().send(result);
138+
final var processedSpans = getForwardService().send(result);
139+
if (processedSpans.size() < result.size()) {
140+
getDroppedSpans().inc(result.size() - processedSpans.size());
141+
}
131142
}
132143

133144
responseObserver.onNext(ExportTraceServiceResponse.getDefaultInstance());

oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/SpanForwardService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525

2626
public interface SpanForwardService extends Service {
2727
/**
28-
* Forward and process zipkin span
28+
* Forward and process zipkin span.
29+
*
30+
* @return the spans that are successfully processed.
2931
*/
30-
void send(List<Span> spanList);
32+
List<Span> send(List<Span> spanList);
3133
}

oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/ZipkinReceiverConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class ZipkinReceiverConfig extends ModuleConfig {
3535
private int restAcceptQueueSize = 0;
3636
private String searchableTracesTags = DEFAULT_SEARCHABLE_TAG_KEYS;
3737
private int sampleRate = 10000;
38+
private int maxSpansPerSecond = 0;
3839

3940
private static final String DEFAULT_SEARCHABLE_TAG_KEYS = String.join(
4041
Const.COMMA,

oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/ZipkinSpanHTTPHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
public class ZipkinSpanHTTPHandler {
4444
private final HistogramMetrics histogram;
4545
private final CounterMetrics errorCounter;
46+
private final CounterMetrics droppedCounter;
4647
private final SpanForward spanForward;
4748

4849
public ZipkinSpanHTTPHandler(SpanForward forward, ModuleManager manager) {
@@ -58,6 +59,10 @@ public ZipkinSpanHTTPHandler(SpanForward forward, ModuleManager manager) {
5859
"trace_analysis_error_count", "The error number of trace analysis",
5960
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-http")
6061
);
62+
droppedCounter = metricsCreator.createCounter(
63+
"spans_dropped_count", "The count of spans that were dropped due to rate limit",
64+
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-http")
65+
);
6166
}
6267

6368
@Post("/api/v2/spans")
@@ -100,7 +105,10 @@ HttpResponse doCollectSpans(final SpanBytesDecoder decoder,
100105
final HttpResponse response = HttpResponse.from(req.aggregate().thenApply(request -> {
101106
try (final HttpData httpData = request.content()) {
102107
final List<Span> spanList = decoder.decodeList(httpData.byteBuf().nioBuffer());
103-
spanForward.send(spanList);
108+
final var processedSpans = spanForward.send(spanList);
109+
if (processedSpans.size() < spanList.size()) {
110+
droppedCounter.inc(spanList.size() - processedSpans.size());
111+
}
104112
return HttpResponse.of(HttpStatus.ACCEPTED);
105113
}
106114
}));

oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/trace/SpanForward.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
package org.apache.skywalking.oap.server.receiver.zipkin.trace;
2020

21+
import com.google.common.util.concurrent.RateLimiter;
2122
import com.google.gson.JsonObject;
2223
import java.util.ArrayList;
2324
import java.util.Arrays;
25+
import java.util.Collections;
2426
import java.util.List;
2527

2628
import java.util.Map;
@@ -55,20 +57,25 @@ public class SpanForward implements SpanForwardService {
5557
private final long samplerBoundary;
5658
private NamingControl namingControl;
5759
private SourceReceiver receiver;
60+
private RateLimiter rateLimiter;
5861

5962
public SpanForward(final ZipkinReceiverConfig config, final ModuleManager manager) {
6063
this.config = config;
6164
this.moduleManager = manager;
6265
this.searchTagKeys = Arrays.asList(config.getSearchableTracesTags().split(Const.COMMA));
6366
float sampleRate = (float) config.getSampleRate() / 10000;
6467
samplerBoundary = (long) (Long.MAX_VALUE * sampleRate);
68+
if (config.getMaxSpansPerSecond() > 0) {
69+
this.rateLimiter = RateLimiter.create(config.getMaxSpansPerSecond());
70+
}
6571
}
6672

67-
public void send(List<Span> spanList) {
73+
public List<Span> send(List<Span> spanList) {
6874
if (CollectionUtils.isEmpty(spanList)) {
69-
return;
75+
return Collections.emptyList();
7076
}
71-
getSampledTraces(spanList).forEach(span -> {
77+
final var sampledTraces = getSampledTraces(spanList);
78+
sampledTraces.forEach(span -> {
7279
ZipkinSpan zipkinSpan = new ZipkinSpan();
7380
String serviceName = span.localServiceName();
7481
if (StringUtil.isEmpty(serviceName)) {
@@ -153,6 +160,7 @@ public void send(List<Span> spanList) {
153160
toServiceRelation(zipkinSpan, minuteTimeBucket);
154161
}
155162
});
163+
return sampledTraces;
156164
}
157165

158166
private void addAutocompleteTags(final long minuteTimeBucket, final String key, final String value) {
@@ -188,8 +196,8 @@ private void toServiceRelation(ZipkinSpan zipkinSpan, final long minuteTimeBucke
188196
}
189197

190198
private List<Span> getSampledTraces(List<Span> input) {
191-
//100% sampleRate
192-
if (config.getSampleRate() == 10000) {
199+
// 100% sampleRate and no rateLimiter, return all spans
200+
if (config.getSampleRate() == 10000 && rateLimiter == null) {
193201
return input;
194202
}
195203
List<Span> sampledTraces = new ArrayList<>(input.size());
@@ -198,10 +206,23 @@ private List<Span> getSampledTraces(List<Span> input) {
198206
sampledTraces.add(span);
199207
continue;
200208
}
201-
long traceId = HexCodec.lowerHexToUnsignedLong(span.traceId());
202-
traceId = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId);
203-
if (traceId <= samplerBoundary) {
209+
210+
// Apply maximum spans per minute sampling first
211+
if (rateLimiter != null && !rateLimiter.tryAcquire()) {
212+
log.debug("Span dropped due to maximum spans per minute limit: {}", span.id());
213+
continue;
214+
}
215+
216+
// Apply percentage-based sampling
217+
if (config.getSampleRate() == 10000) {
218+
// 100% sample rate - include all spans that passed the maximum spans check
204219
sampledTraces.add(span);
220+
} else {
221+
long traceId = HexCodec.lowerHexToUnsignedLong(span.traceId());
222+
traceId = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId);
223+
if (traceId <= samplerBoundary) {
224+
sampledTraces.add(span);
225+
}
205226
}
206227
}
207228
return sampledTraces;

oap-server/server-starter/src/main/resources/application.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,8 @@ receiver-zipkin:
378378
searchableTracesTags: ${SW_ZIPKIN_SEARCHABLE_TAG_KEYS:http.method}
379379
# The sample rate precision is 1/10000, should be between 0 and 10000
380380
sampleRate: ${SW_ZIPKIN_SAMPLE_RATE:10000}
381+
# The maximum spans to be collected per second. 0 means no limit. Spans exceeding this threshold will be dropped.
382+
maxSpansPerSecond: ${SW_ZIPKIN_MAX_SPANS_PER_SECOND:0}
381383
## The below configs are for OAP collect zipkin trace from HTTP
382384
enableHttpCollector: ${SW_ZIPKIN_HTTP_COLLECTOR_ENABLED:true}
383385
restHost: ${SW_RECEIVER_ZIPKIN_REST_HOST:0.0.0.0}

oap-server/server-starter/src/main/resources/otel-rules/oap.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ metricsRules:
118118
exp: otel_logs_latency_count.sum(['service', 'host_name']).increase('PT1M')
119119
- name: otel_spans_received
120120
exp: otel_spans_latency_count.sum(['service', 'host_name']).increase('PT1M')
121+
- name: otel_spans_dropped
122+
exp: otel_spans_dropped.sum(['service', 'host_name']).increase('PT1M')
121123
- name: otel_metrics_latency_percentile
122124
exp: otel_metrics_latency.sum(['le', 'service', 'host_name']).increase('PT1M').histogram().histogram_percentile([50,70,90,99])
123125
- name: otel_logs_latency_percentile

0 commit comments

Comments
 (0)