Skip to content

Commit b5588d1

Browse files
authored
fix(sampling): ensure the rate limiter operates on positive time intervals (#9416)
## Motivation Currently the RateLimiter samples the first span in trace using `Span.start_ns` and evaluating this timestamp against the last seen timestamp. [RateLimiter.is_allowed(...)](https://github.com/DataDog/dd-trace-py/blob/v2.9.0rc7/ddtrace/internal/rate_limiter.py#L60) works as expected if it receives monotonically increasing timestamps. However if this method receives a timestamp that is less than a previous value it will compute a [negative time window](https://github.com/DataDog/dd-trace-py/blob/v2.9.0rc7/ddtrace/internal/rate_limiter.py#L126) and then set an [incorrect rate_limit](https://github.com/DataDog/dd-trace-py/blob/v2.9.0rc7/ddtrace/internal/rate_limiter.py#L136). ddtrace v2.8.0 introduced support for lazy sampling. With this feature sample rates and rate limits are no longer applied on span start. This increased the frequency of this bug: 9707da1. ## Description This PR resolves this issue by: - Deprecating the timestamp argument in `RateLimiter.is_allowed`. The current time will always be used to compute span rate limits (instead of Span.start_ns). This will ensure rate limits are computed on ONLY increasing time intervals. - Ensuring a lock is acquired when computing rate limits and updating rate counts. Currently we only acquire a lock to compute `RateLimiter._replenish`. This is not sufficient. ## Reproduction - This bug can be reproduced by generating two spans with different start times but the same end time. The span with earliest start time should be finished last. Failing regression test: https://app.circleci.com/pipelines/github/DataDog/dd-trace-py/62701/workflows/915c8cc5-6968-4069-a379-84929b239df8/jobs/3906251 ## Checklist - [x] Change(s) are motivated and described in the PR description - [x] Testing strategy is described if automated tests are not included in the PR - [x] Risks are described (performance impact, potential for breakage, maintainability) - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed or label `changelog/no-changelog` is set - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)) - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [x] If this PR changes the public interface, I've notified `@DataDog/apm-tees`. ## Reviewer Checklist - [x] Title is accurate - [x] All changes are related to the pull request's stated goal - [x] Description motivates each change - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - [x] Testing strategy adequately addresses listed risks - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] Release note makes sense to a user of the library - [x] Author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
1 parent 844117e commit b5588d1

File tree

9 files changed

+150
-64
lines changed

9 files changed

+150
-64
lines changed

ddtrace/appsec/_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ def _waf_action(
374374
if waf_results.data or blocked:
375375
# We run the rate limiter only if there is an attack, its goal is to limit the number of collected asm
376376
# events
377-
allowed = self._rate_limiter.is_allowed(span.start_ns)
377+
allowed = self._rate_limiter.is_allowed()
378378
if not allowed:
379379
# TODO: add metric collection to keep an eye (when it's name is clarified)
380380
return waf_results

ddtrace/internal/core/_core.pyi

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,20 @@ class RateLimiter:
2929
:param time_window: The time window where the rate limit applies in nanoseconds. default value is 1 second.
3030
:type time_window: :obj:`float`
3131
"""
32-
def is_allowed(self, timestamp_ns: int) -> bool:
32+
def is_allowed(self, timestamp_ns: typing.Optional[int] = None) -> bool:
3333
"""
3434
Check whether the current request is allowed or not
3535
3636
This method will also reduce the number of available tokens by 1
3737
38+
:param int timestamp_ns: timestamp in nanoseconds for the current request. [deprecated]
39+
:returns: Whether the current request is allowed or not
40+
:rtype: :obj:`bool`
41+
"""
42+
def _is_allowed(self, timestamp_ns: int) -> bool:
43+
"""
44+
Internal method to check whether the current request is allowed or not
45+
3846
:param int timestamp_ns: timestamp in nanoseconds for the current request.
3947
:returns: Whether the current request is allowed or not
4048
:rtype: :obj:`bool`

ddtrace/internal/rate_limiter.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
import attr
1010

11+
from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning
12+
from ddtrace.vendor.debtcollector import deprecate
13+
1114
from ..internal import compat
1215
from ..internal.constants import DEFAULT_SAMPLING_RATE_LIMIT
1316
from .core import RateLimiter as _RateLimiter
@@ -18,6 +21,17 @@ class RateLimiter(_RateLimiter):
1821
def _has_been_configured(self):
1922
return self.rate_limit != DEFAULT_SAMPLING_RATE_LIMIT
2023

24+
def is_allowed(self, timestamp_ns: Optional[int] = None) -> bool:
25+
if timestamp_ns is not None:
26+
deprecate(
27+
"The `timestamp_ns` parameter is deprecated and will be removed in a future version."
28+
"Ratelimiter will use the current time.",
29+
category=DDTraceDeprecationWarning,
30+
)
31+
# rate limits are tested and mocked in pytest so we need to compute the timestamp here
32+
# (or move the unit tests to rust)
33+
return self._is_allowed(compat.monotonic_ns())
34+
2135

2236
class RateLimitExceeded(Exception):
2337
pass

ddtrace/internal/sampling.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ def __init__(
147147
def sample(self, span):
148148
# type: (Span) -> bool
149149
if self._sample(span):
150-
if self._limiter.is_allowed(span.start_ns):
150+
if self._limiter.is_allowed():
151151
self.apply_span_sampling_tags(span)
152152
return True
153153
return False
@@ -310,7 +310,7 @@ def _apply_rate_limit(span, sampled, limiter):
310310
# type: (Span, bool, RateLimiter) -> bool
311311
allowed = True
312312
if sampled:
313-
allowed = limiter.is_allowed(span.start_ns)
313+
allowed = limiter.is_allowed()
314314
if not allowed:
315315
_set_priority(span, USER_REJECT)
316316
if limiter._has_been_configured:
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
tracing: Ensures spans are rate limited at the expected rate (100 spans per second by default). Previously long running spans would set the rate limiter to set an invalid window and this could cause the next trace to be dropped.

src/core/rate_limiter.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ impl RateLimiter {
3131
}
3232
}
3333

34-
pub fn is_allowed(&mut self, timestamp_ns: f64) -> bool {
34+
pub fn _is_allowed(&mut self, timestamp_ns: f64) -> bool {
3535
let mut _lock = self._lock.lock().unwrap();
3636

3737
let allowed = (|| -> bool {
@@ -43,7 +43,11 @@ impl RateLimiter {
4343
}
4444

4545
if self.tokens < self.max_tokens {
46-
let elapsed: f64 = (timestamp_ns - self.last_update_ns) / self.time_window;
46+
let mut elapsed: f64 = (timestamp_ns - self.last_update_ns) / self.time_window;
47+
if elapsed < 0.0 {
48+
// Note - this should never happen, but if it does, we should reset the elapsed time to avoid negative tokens.
49+
elapsed = 0.0
50+
}
4751
self.tokens += elapsed * self.max_tokens;
4852
if self.tokens > self.max_tokens {
4953
self.tokens = self.max_tokens;
@@ -114,8 +118,8 @@ impl RateLimiterPy {
114118
}
115119
}
116120

117-
pub fn is_allowed(&mut self, py: Python<'_>, timestamp_ns: f64) -> bool {
118-
py.allow_threads(|| self.rate_limiter.is_allowed(timestamp_ns))
121+
pub fn _is_allowed(&mut self, py: Python<'_>, timestamp_ns: f64) -> bool {
122+
py.allow_threads(|| self.rate_limiter._is_allowed(timestamp_ns))
119123
}
120124

121125
#[getter]

tests/integration/test_sampling.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import mock
12
import pytest
23

34
from ddtrace import config
@@ -297,3 +298,48 @@ def test_extended_sampling_float_special_case_match_star(writer, tracer):
297298
tracer.configure(sampler=sampler, writer=writer)
298299
with tracer.trace(name="should_send") as span:
299300
span.set_tag("tag", 20.1)
301+
302+
303+
def test_rate_limiter_on_spans(tracer):
304+
"""
305+
Ensure that the rate limiter is applied to spans
306+
"""
307+
tracer.configure(sampler=DatadogSampler(rate_limit=10))
308+
spans = []
309+
# Generate 10 spans with the start and finish time in same second
310+
for x in range(10):
311+
start_time = x / 10
312+
span = tracer.trace(name=f"span {start_time}")
313+
span.start = start_time
314+
span.finish(1 - start_time)
315+
spans.append(span)
316+
# Generate 11th span in the same second
317+
dropped_span = tracer.trace(name=f"span {start_time}")
318+
dropped_span.start = 0.8
319+
dropped_span.finish(0.9)
320+
# Spans are sampled on flush
321+
tracer.flush()
322+
# Since the rate limiter is set to 10, first ten spans should be kept
323+
for span in spans:
324+
assert span.context.sampling_priority > 0
325+
# 11th span should be dropped
326+
assert dropped_span.context.sampling_priority < 0
327+
328+
329+
def test_rate_limiter_on_long_running_spans(tracer):
330+
"""
331+
Ensure that the rate limiter is applied on increasing time intervals
332+
"""
333+
tracer.configure(sampler=DatadogSampler(rate_limit=5))
334+
335+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=1617333414):
336+
span_m30 = tracer.trace(name="march 30")
337+
span_m30.start = 1622347257 # Mar 30 2021
338+
span_m30.finish(1617333414) # April 2 2021
339+
340+
span_m29 = tracer.trace(name="march 29")
341+
span_m29.start = 1616999414 # Mar 29 2021
342+
span_m29.finish(1617333414) # April 2 2021
343+
344+
assert span_m29.context.sampling_priority > 0
345+
assert span_m30.context.sampling_priority > 0

tests/tracer/test_rate_limiter.py

Lines changed: 65 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ def test_rate_limiter_rate_limit_0(time_window):
3333
now_ns = compat.monotonic_ns()
3434
for i in nanoseconds(10000, time_window):
3535
# Make sure the time is different for every check
36-
assert limiter.is_allowed(now_ns + i) is False
36+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=now_ns + i):
37+
assert limiter.is_allowed() is False
3738

3839

3940
@pytest.mark.parametrize("time_window", [1e3, 1e6, 1e9])
@@ -46,30 +47,32 @@ def test_rate_limiter_rate_limit_negative(time_window):
4647
now_ns = compat.monotonic_ns()
4748
for i in nanoseconds(10000, time_window):
4849
# Make sure the time is different for every check
49-
assert limiter.is_allowed(now_ns + i) is True
50+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=now_ns + i):
51+
assert limiter.is_allowed() is True
5052

5153

5254
@pytest.mark.parametrize("rate_limit", [1, 10, 50, 100, 500, 1000])
5355
@pytest.mark.parametrize("time_window", [1e3, 1e6, 1e9])
5456
def test_rate_limiter_is_allowed(rate_limit, time_window):
5557
limiter = RateLimiter(rate_limit=rate_limit, time_window=time_window)
5658

57-
def check_limit(time_ns):
59+
def check_limit():
5860
# Up to the allowed limit is allowed
5961
for _ in range(rate_limit):
60-
assert limiter.is_allowed(time_ns) is True
62+
assert limiter.is_allowed() is True
6163

6264
# Any over the limit is disallowed
6365
for _ in range(1000):
64-
assert limiter.is_allowed(time_ns) is False
66+
assert limiter.is_allowed() is False
6567

6668
# Start time
6769
now = compat.monotonic_ns()
6870

6971
# Check the limit for 5 time frames
7072
for i in nanoseconds(5, time_window):
7173
# Keep the same timeframe
72-
check_limit(now + i)
74+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=now + i):
75+
check_limit()
7376

7477

7578
@pytest.mark.parametrize("time_window", [1e3, 1e6, 1e9])
@@ -79,12 +82,14 @@ def test_rate_limiter_is_allowed_large_gap(time_window):
7982
# Start time
8083
now_ns = compat.monotonic_ns()
8184
# Keep the same timeframe
82-
for _ in range(100):
83-
assert limiter.is_allowed(now_ns) is True
85+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=now_ns):
86+
for _ in range(100):
87+
assert limiter.is_allowed() is True
8488

8589
# Large gap before next call to `is_allowed()`
86-
for _ in range(100):
87-
assert limiter.is_allowed(now_ns + (time_window * 100)) is True
90+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=now_ns + (time_window * 100)):
91+
for _ in range(100):
92+
assert limiter.is_allowed() is True
8893

8994

9095
@pytest.mark.parametrize("time_window", [1e3, 1e6, 1e9])
@@ -98,8 +103,8 @@ def test_rate_limiter_is_allowed_small_gaps(time_window):
98103
for i in nanoseconds(10000, time_window):
99104
# Keep the same timeframe
100105
time_ns = now_ns + (gap * i)
101-
102-
assert limiter.is_allowed(time_ns) is True
106+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=time_ns):
107+
assert limiter.is_allowed() is True
103108

104109

105110
@pytest.mark.parametrize("time_window", [1e3, 1e6, 1e9])
@@ -108,30 +113,31 @@ def test_rate_liimter_effective_rate_rates(time_window):
108113

109114
# Static rate limit window
110115
starting_window_ns = compat.monotonic_ns()
111-
for _ in range(100):
112-
assert limiter.is_allowed(starting_window_ns) is True
113-
assert limiter.effective_rate == 1.0
114-
assert limiter.current_window_ns == starting_window_ns
115-
116-
for i in range(1, 101):
117-
assert limiter.is_allowed(starting_window_ns) is False
118-
rate = 100 / (100 + i)
119-
assert limiter.effective_rate == rate
120-
assert limiter.current_window_ns == starting_window_ns
116+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=starting_window_ns):
117+
for _ in range(100):
118+
assert limiter.is_allowed() is True
119+
assert limiter.effective_rate == 1.0
120+
assert limiter.current_window_ns == starting_window_ns
121+
122+
for i in range(1, 101):
123+
assert limiter.is_allowed() is False
124+
rate = 100 / (100 + i)
125+
assert limiter.effective_rate == rate
126+
assert limiter.current_window_ns == starting_window_ns
121127

122128
prev_rate = 0.5
123129
window_ns = starting_window_ns + time_window
130+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=window_ns):
131+
for _ in range(100):
132+
assert limiter.is_allowed() is True
133+
assert limiter.effective_rate == 0.75
134+
assert limiter.current_window_ns == window_ns
124135

125-
for _ in range(100):
126-
assert limiter.is_allowed(window_ns) is True
127-
assert limiter.effective_rate == 0.75
128-
assert limiter.current_window_ns == window_ns
129-
130-
for i in range(1, 101):
131-
assert limiter.is_allowed(window_ns) is False
132-
rate = 100 / (100 + i)
133-
assert limiter.effective_rate == (rate + prev_rate) / 2
134-
assert limiter.current_window_ns == window_ns
136+
for i in range(1, 101):
137+
assert limiter.is_allowed() is False
138+
rate = 100 / (100 + i)
139+
assert limiter.effective_rate == (rate + prev_rate) / 2
140+
assert limiter.current_window_ns == window_ns
135141

136142

137143
@pytest.mark.parametrize("time_window", [1e3, 1e6, 1e9])
@@ -150,47 +156,51 @@ def test_rate_limiter_effective_rate_starting_rate(time_window):
150156
assert limiter.prev_window_rate is None
151157

152158
# Calling `.is_allowed()` updates the values
153-
assert limiter.is_allowed(now_ns) is True
154-
assert limiter.effective_rate == 1.0
155-
assert limiter.current_window_ns == now_ns
156-
assert limiter.prev_window_rate is None
159+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=now_ns):
160+
assert limiter.is_allowed() is True
161+
assert limiter.effective_rate == 1.0
162+
assert limiter.current_window_ns == now_ns
163+
assert limiter.prev_window_rate is None
157164

158165
# Gap of 0.9999 seconds, same window
159166
time_ns = now_ns + (0.9999 * time_window)
160-
assert limiter.is_allowed(time_ns) is False
161-
# DEV: We have rate_limit=1 set
162-
assert limiter.effective_rate == 0.5
163-
assert limiter.current_window_ns == now_ns
164-
assert limiter.prev_window_rate is None
167+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=time_ns):
168+
assert limiter.is_allowed() is False
169+
# DEV: We have rate_limit=1 set
170+
assert limiter.effective_rate == 0.5
171+
assert limiter.current_window_ns == now_ns
172+
assert limiter.prev_window_rate is None
165173

166174
# Gap of 1.0 seconds, new window
167175
time_ns = now_ns + time_window
168-
assert limiter.is_allowed(time_ns) is True
169-
assert limiter.effective_rate == 0.75
170-
assert limiter.current_window_ns == (now_ns + time_window)
171-
assert limiter.prev_window_rate == 0.5
176+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=time_ns):
177+
assert limiter.is_allowed() is True
178+
assert limiter.effective_rate == 0.75
179+
assert limiter.current_window_ns == (now_ns + time_window)
180+
assert limiter.prev_window_rate == 0.5
172181

173182
# Gap of 1.9999 seconds, same window
174183
time_ns = now_ns + (1.9999 * time_window)
175-
assert limiter.is_allowed(time_ns) is False
176-
assert limiter.effective_rate == 0.5
177-
assert limiter.current_window_ns == (now_ns + time_window) # Same as old window
178-
assert limiter.prev_window_rate == 0.5
184+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=time_ns):
185+
assert limiter.is_allowed() is False
186+
assert limiter.effective_rate == 0.5
187+
assert limiter.current_window_ns == (now_ns + time_window) # Same as old window
188+
assert limiter.prev_window_rate == 0.5
179189

180190
# Large gap of 100 seconds, new window
181191
time_ns = now_ns + (100.0 * time_window)
182-
assert limiter.is_allowed(time_ns) is True
183-
assert limiter.effective_rate == 0.75
184-
assert limiter.current_window_ns == (now_ns + (100.0 * time_window))
185-
assert limiter.prev_window_rate == 0.5
192+
with mock.patch("ddtrace.internal.rate_limiter.compat.monotonic_ns", return_value=time_ns):
193+
assert limiter.is_allowed() is True
194+
assert limiter.effective_rate == 0.75
195+
assert limiter.current_window_ns == (now_ns + (100.0 * time_window))
196+
assert limiter.prev_window_rate == 0.5
186197

187198

188199
def test_rate_limiter_3():
189200
limiter = RateLimiter(rate_limit=2)
190201

191-
now_ns = compat.monotonic_ns()
192202
for i in range(3):
193-
decision = limiter.is_allowed(now_ns)
203+
decision = limiter.is_allowed()
194204
# the first two should be allowed, the third should not
195205
if i < 2:
196206
assert decision is True

tests/tracer/test_single_span_sampling_rules.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ def test_max_per_sec_with_is_allowed_check():
336336
tracer = DummyTracer(rule)
337337
while True:
338338
span = traced_function(rule, tracer)
339-
if not rule._limiter.is_allowed(span.start_ns):
339+
if not rule._limiter.is_allowed():
340340
break
341341
assert_sampling_decision_tags(span, limit=2)
342342

0 commit comments

Comments
 (0)