Skip to content

Commit 104a441

Browse files
apply rate limit to queue events (#7823)
* apply rate limit to queue events
* test for PerRecordingRateLimiter
* graal native image
* remove debugging code
* no queue time on graal native image - requires too much build time initialisation to support rate limiting
1 parent c1afabf commit 104a441

File tree

7 files changed

+106
-16
lines changed

7 files changed

+106
-16
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/java/concurrent/QueueTimerHelper.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,50 @@
11
package datadog.trace.bootstrap.instrumentation.java.concurrent;
22

3+
import datadog.trace.api.Platform;
4+
import datadog.trace.api.config.ProfilingConfig;
35
import datadog.trace.api.profiling.QueueTiming;
46
import datadog.trace.api.profiling.Timer;
7+
import datadog.trace.api.sampling.PerRecordingRateLimiter;
58
import datadog.trace.bootstrap.ContextStore;
9+
import datadog.trace.bootstrap.config.provider.ConfigProvider;
610
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
711
import datadog.trace.bootstrap.instrumentation.jfr.InstrumentationBasedProfiling;
12+
import java.time.Duration;
13+
import java.time.temporal.ChronoUnit;
814

915
public class QueueTimerHelper {
1016

17+
private static final class RateLimiterHolder {
18+
// indirection to prevent needing to instantiate the class and its transitive dependencies
19+
// in graal native image
20+
private static final PerRecordingRateLimiter RATE_LIMITER =
21+
new PerRecordingRateLimiter(
22+
Duration.of(500, ChronoUnit.MILLIS),
23+
10_000, // hard limit on queue events
24+
Duration.ofSeconds(
25+
ConfigProvider.getInstance()
26+
.getInteger(
27+
ProfilingConfig.PROFILING_UPLOAD_PERIOD,
28+
ProfilingConfig.PROFILING_UPLOAD_PERIOD_DEFAULT)));
29+
}
30+
1131
public static <T> void startQueuingTimer(
1232
ContextStore<T, State> taskContextStore, Class<?> schedulerClass, T task) {
1333
State state = taskContextStore.get(task);
1434
startQueuingTimer(state, schedulerClass, task);
1535
}
1636

1737
public static void startQueuingTimer(State state, Class<?> schedulerClass, Object task) {
38+
if (Platform.isNativeImageBuilder()) {
39+
// explicitly not supported for Graal native image
40+
return;
41+
}
1842
// avoid calling this before JFR is initialised because it will lead to reading the wrong
1943
// TSC frequency before JFR has set it up properly
20-
if (task != null && state != null && InstrumentationBasedProfiling.isJFRReady()) {
44+
if (task != null
45+
&& state != null
46+
&& InstrumentationBasedProfiling.isJFRReady()
47+
&& RateLimiterHolder.RATE_LIMITER.permit()) {
2148
QueueTiming timing =
2249
(QueueTiming) AgentTracer.get().getProfilingContext().start(Timer.TimerType.QUEUEING);
2350
timing.setTask(task);

dd-java-agent/agent-bootstrap/src/main/java11/datadog/trace/bootstrap/instrumentation/jfr/WindowSampler.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import datadog.trace.api.sampling.AdaptiveSampler;
44
import java.time.Duration;
5-
import java.time.temporal.ChronoUnit;
65
import jdk.jfr.Event;
76
import jdk.jfr.EventType;
87

@@ -24,18 +23,4 @@ public void start() {
2423
public boolean sample() {
2524
return sampleType.isEnabled() && sampler.sample();
2625
}
27-
28-
protected static int samplingWindowsPerRecording(
29-
long uploadPeriodSeconds, Duration samplingWindow) {
30-
/*
31-
* Java8 doesn't have dividedBy#Duration so we have to implement poor man's version.
32-
* None of these durations should be big enough to warrant dealing with bigints.
33-
* We also do not care about nanoseconds here.
34-
*/
35-
return (int)
36-
Math.min(
37-
Duration.of(uploadPeriodSeconds, ChronoUnit.SECONDS).toMillis()
38-
/ samplingWindow.toMillis(),
39-
Integer.MAX_VALUE);
40-
}
4126
}

dd-java-agent/agent-bootstrap/src/main/java11/datadog/trace/bootstrap/instrumentation/jfr/backpressure/BackpressureSampler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.trace.bootstrap.instrumentation.jfr.backpressure;
22

3+
import static datadog.trace.api.sampling.PerRecordingRateLimiter.samplingWindowsPerRecording;
4+
35
import datadog.trace.api.Config;
46
import datadog.trace.bootstrap.instrumentation.jfr.WindowSampler;
57
import java.time.Duration;

dd-java-agent/agent-bootstrap/src/main/java11/datadog/trace/bootstrap/instrumentation/jfr/directallocation/DirectAllocationSampler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.trace.bootstrap.instrumentation.jfr.directallocation;
22

3+
import static datadog.trace.api.sampling.PerRecordingRateLimiter.samplingWindowsPerRecording;
4+
35
import datadog.trace.api.Config;
46
import datadog.trace.bootstrap.instrumentation.jfr.WindowSampler;
57
import java.time.Duration;

dd-java-agent/agent-bootstrap/src/main/java11/datadog/trace/bootstrap/instrumentation/jfr/exceptions/ExceptionSampler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.trace.bootstrap.instrumentation.jfr.exceptions;
22

3+
import static datadog.trace.api.sampling.PerRecordingRateLimiter.samplingWindowsPerRecording;
4+
35
import datadog.trace.api.Config;
46
import datadog.trace.bootstrap.instrumentation.jfr.WindowSampler;
57
import java.time.Duration;
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package datadog.trace.api.sampling;
2+
3+
import java.time.Duration;
4+
import java.time.temporal.ChronoUnit;
5+
6+
public class PerRecordingRateLimiter {
7+
8+
private final AdaptiveSampler sampler;
9+
10+
public PerRecordingRateLimiter(Duration windowDuration, int limit, Duration recordingLength) {
11+
this(windowDuration, limit, recordingLength, 16);
12+
}
13+
14+
public PerRecordingRateLimiter(
15+
Duration windowDuration, int limit, Duration recordingLength, int budgetLookback) {
16+
int lookback = samplingWindowsPerRecording(recordingLength.getSeconds(), windowDuration);
17+
int samplesPerWindow =
18+
limit / samplingWindowsPerRecording(recordingLength.getSeconds(), windowDuration);
19+
sampler = new AdaptiveSampler(windowDuration, samplesPerWindow, lookback, budgetLookback, true);
20+
}
21+
22+
public boolean permit() {
23+
return sampler.sample();
24+
}
25+
26+
public static int samplingWindowsPerRecording(long uploadPeriodSeconds, Duration samplingWindow) {
27+
/*
28+
* Java8 doesn't have dividedBy#Duration so we have to implement poor man's version.
29+
* None of these durations should be big enough to warrant dealing with bigints.
30+
* We also do not care about nanoseconds here.
31+
*/
32+
return (int)
33+
Math.min(
34+
Duration.of(uploadPeriodSeconds, ChronoUnit.SECONDS).toMillis()
35+
/ samplingWindow.toMillis(),
36+
Integer.MAX_VALUE);
37+
}
38+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package datadog.trace.api.sampling;
2+
3+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
6+
import java.time.Duration;
7+
import java.util.Arrays;
8+
import org.junit.jupiter.api.Test;
9+
10+
public class PerRecordingRateLimiterTest {
11+
12+
@Test
13+
public void testLimitApplied() {
14+
Duration window = Duration.ofMillis(100);
15+
int limit = 20;
16+
Duration recordingDurationSeconds = Duration.ofSeconds(1);
17+
PerRecordingRateLimiter rateLimiter =
18+
new PerRecordingRateLimiter(window, limit, recordingDurationSeconds, 5);
19+
// no rate limiting is applied during the first window
20+
int[] slots = new int[(int) (recordingDurationSeconds.toMillis() / window.toMillis())];
21+
long start = System.nanoTime();
22+
while (true) {
23+
int slot = (int) (NANOSECONDS.toMillis(System.nanoTime() - start) / window.toMillis());
24+
if (slot >= slots.length) {
25+
break;
26+
}
27+
if (rateLimiter.permit()) {
28+
slots[slot]++;
29+
}
30+
}
31+
assertTrue(Arrays.stream(slots).max().orElse(limit + 1) <= limit);
32+
assertTrue(Arrays.stream(slots).sum() <= 2 * limit);
33+
}
34+
}

0 commit comments

Comments
 (0)