Skip to content

Commit dc92b28

Browse files
committed
Add RumInjectorHealthMetrics
1 parent e82c8b5 commit dc92b28

File tree

3 files changed

+367
-0
lines changed

3 files changed

+367
-0
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package datadog.trace.core.monitor;
2+
3+
import static java.util.concurrent.TimeUnit.SECONDS;
4+
5+
import datadog.trace.api.StatsDClient;
6+
import datadog.trace.api.rum.RumTelemetryCollector;
7+
import datadog.trace.util.AgentTaskScheduler;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
import org.jctools.counters.CountersFactory;
11+
import org.jctools.counters.FixedSizeStripedLongCounter;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
// Default implementation of RumInjectorHealthMetrics that reports metrics to StatsDClient
16+
public class DefaultRumInjectorHealthMetrics extends RumInjectorHealthMetrics
17+
implements RumTelemetryCollector {
18+
private static final Logger log = LoggerFactory.getLogger(DefaultRumInjectorHealthMetrics.class);
19+
20+
private static final String[] NO_TAGS = new String[0];
21+
22+
private final AtomicBoolean started = new AtomicBoolean(false);
23+
private volatile AgentTaskScheduler.Scheduled<DefaultRumInjectorHealthMetrics> cancellation;
24+
25+
private final FixedSizeStripedLongCounter injectionSucceed =
26+
CountersFactory.createFixedSizeStripedCounter(8);
27+
private final FixedSizeStripedLongCounter injectionFailed =
28+
CountersFactory.createFixedSizeStripedCounter(8);
29+
private final FixedSizeStripedLongCounter injectionSkipped =
30+
CountersFactory.createFixedSizeStripedCounter(8);
31+
32+
private final StatsDClient statsd;
33+
private final long interval;
34+
private final TimeUnit units;
35+
36+
@Override
37+
public void start() {
38+
if (started.compareAndSet(false, true)) {
39+
cancellation =
40+
AgentTaskScheduler.INSTANCE.scheduleAtFixedRate(
41+
new Flush(), this, interval, interval, units);
42+
}
43+
}
44+
45+
public DefaultRumInjectorHealthMetrics(final StatsDClient statsd) {
46+
this(statsd, 30, SECONDS);
47+
}
48+
49+
public DefaultRumInjectorHealthMetrics(final StatsDClient statsd, long interval, TimeUnit units) {
50+
this.statsd = statsd;
51+
this.interval = interval;
52+
this.units = units;
53+
}
54+
55+
@Override
56+
public void onInjectionSucceed() {
57+
injectionSucceed.inc();
58+
}
59+
60+
@Override
61+
public void onInjectionFailed() {
62+
injectionFailed.inc();
63+
}
64+
65+
@Override
66+
public void onInjectionSkipped() {
67+
injectionSkipped.inc();
68+
}
69+
70+
@Override
71+
public void close() {
72+
if (null != cancellation) {
73+
cancellation.cancel();
74+
}
75+
}
76+
77+
@Override
78+
public String summary() {
79+
return "injectionSucceed="
80+
+ injectionSucceed.get()
81+
+ "\ninjectionFailed="
82+
+ injectionFailed.get()
83+
+ "\ninjectionSkipped="
84+
+ injectionSkipped.get();
85+
}
86+
87+
private static class Flush implements AgentTaskScheduler.Task<DefaultRumInjectorHealthMetrics> {
88+
89+
private final long[] previousCounts = new long[3]; // one per counter
90+
private int countIndex;
91+
92+
@Override
93+
public void run(DefaultRumInjectorHealthMetrics target) {
94+
countIndex = -1;
95+
try {
96+
reportIfChanged(target.statsd, "rum.injection.succeed", target.injectionSucceed, NO_TAGS);
97+
reportIfChanged(target.statsd, "rum.injection.failed", target.injectionFailed, NO_TAGS);
98+
reportIfChanged(target.statsd, "rum.injection.skipped", target.injectionSkipped, NO_TAGS);
99+
} catch (ArrayIndexOutOfBoundsException e) {
100+
log.warn(
101+
"previousCounts array needs resizing to at least {}, was {}",
102+
countIndex + 1,
103+
previousCounts.length);
104+
}
105+
}
106+
107+
private void reportIfChanged(
108+
StatsDClient statsDClient,
109+
String aspect,
110+
FixedSizeStripedLongCounter counter,
111+
String[] tags) {
112+
long count = counter.get();
113+
long delta = count - previousCounts[++countIndex];
114+
if (delta > 0) {
115+
statsDClient.count(aspect, delta, tags);
116+
previousCounts[countIndex] = count;
117+
}
118+
}
119+
}
120+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package datadog.trace.core.monitor;
2+
3+
/**
 * Abstract health metrics for the RUM injector.
 *
 * <p>This class defines the interface for monitoring RUM injection operations. Every method has an
 * empty default body, so a subclass overrides only the callbacks it needs and {@link #NO_OP} can be
 * used where no reporting is wanted.
 */
public abstract class RumInjectorHealthMetrics implements AutoCloseable {
  /** Shared instance whose methods all do nothing; {@code final} so it cannot be reassigned. */
  public static final RumInjectorHealthMetrics NO_OP = new RumInjectorHealthMetrics() {};

  /** Starts reporting; does nothing in this base class. */
  public void start() {}

  /** Called when an injection succeeded; does nothing in this base class. */
  public void onInjectionSucceed() {}

  /** Called when an injection failed; does nothing in this base class. */
  public void onInjectionFailed() {}

  /** Called when an injection was skipped; does nothing in this base class. */
  public void onInjectionSkipped() {}

  /** @return Human-readable summary of the current health metrics. */
  public String summary() {
    return "";
  }

  /** Stops reporting; does nothing in this base class and declares no checked exception. */
  @Override
  public void close() {}
}
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
package datadog.trace.core.monitor
2+
3+
import datadog.trace.api.StatsDClient
4+
import spock.lang.Specification
5+
import spock.lang.Subject
6+
7+
import java.util.concurrent.CountDownLatch
8+
import java.util.concurrent.TimeUnit
9+
10+
/**
 * Spock specification for DefaultRumInjectorHealthMetrics.
 *
 * The flush-based features run the metrics with a 10 ms interval against a latched StatsD client
 * and wait on the latch before verifying the mock interactions.
 */
class RumInjectorHealthMetricsTest extends Specification {
  def statsD = Mock(StatsDClient)

  @Subject
  def healthMetrics = new DefaultRumInjectorHealthMetrics(statsD)

  def "test onInjectionSucceed"() {
    setup:
    def flushed = new CountDownLatch(1)
    def metrics = startedMetrics(flushed)

    when:
    metrics.onInjectionSucceed()
    flushed.await(5, TimeUnit.SECONDS)

    then:
    1 * statsD.count('rum.injection.succeed', 1, _)
    0 * _

    cleanup:
    metrics.close()
  }

  def "test onInjectionFailed"() {
    setup:
    def flushed = new CountDownLatch(1)
    def metrics = startedMetrics(flushed)

    when:
    metrics.onInjectionFailed()
    flushed.await(5, TimeUnit.SECONDS)

    then:
    1 * statsD.count('rum.injection.failed', 1, _)
    0 * _

    cleanup:
    metrics.close()
  }

  def "test onInjectionSkipped"() {
    setup:
    def flushed = new CountDownLatch(1)
    def metrics = startedMetrics(flushed)

    when:
    metrics.onInjectionSkipped()
    flushed.await(5, TimeUnit.SECONDS)

    then:
    1 * statsD.count('rum.injection.skipped', 1, _)
    0 * _

    cleanup:
    metrics.close()
  }

  def "test multiple events"() {
    setup:
    def flushed = new CountDownLatch(3) // expecting 3 metric types
    def metrics = startedMetrics(flushed)

    when:
    metrics.onInjectionSucceed()
    metrics.onInjectionFailed()
    metrics.onInjectionSkipped()
    flushed.await(5, TimeUnit.SECONDS)

    then:
    1 * statsD.count('rum.injection.succeed', 1, _)
    1 * statsD.count('rum.injection.failed', 1, _)
    1 * statsD.count('rum.injection.skipped', 1, _)
    0 * _

    cleanup:
    metrics.close()
  }

  def "test summary"() {
    when:
    healthMetrics.onInjectionSucceed()
    healthMetrics.onInjectionFailed()
    healthMetrics.onInjectionSkipped()
    def summary = healthMetrics.summary()

    then:
    summary.contains("injectionSucceed=1")
    summary.contains("injectionFailed=1")
    summary.contains("injectionSkipped=1")
    0 * _
  }

  // Builds metrics that flush every 10 ms into the mocked client through a latch, and starts them.
  private DefaultRumInjectorHealthMetrics startedMetrics(CountDownLatch flushed) {
    def metrics = new DefaultRumInjectorHealthMetrics(new Latched(statsD, flushed), 10, TimeUnit.MILLISECONDS)
    metrics.start()
    return metrics
  }

  // Taken from HealthMetricsTest: forwards every call to the wrapped client and counts the latch
  // down afterwards, even when the wrapped call throws.
  private static class Latched implements StatsDClient {
    final StatsDClient wrapped
    final CountDownLatch remaining

    Latched(StatsDClient wrapped, CountDownLatch remaining) {
      this.wrapped = wrapped
      this.remaining = remaining
    }

    @Override
    void incrementCounter(String metricName, String... tags) {
      try {
        wrapped.incrementCounter(metricName, tags)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void count(String metricName, long delta, String... tags) {
      try {
        wrapped.count(metricName, delta, tags)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void gauge(String metricName, long value, String... tags) {
      try {
        wrapped.gauge(metricName, value, tags)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void gauge(String metricName, double value, String... tags) {
      try {
        wrapped.gauge(metricName, value, tags)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void histogram(String metricName, long value, String... tags) {
      try {
        wrapped.histogram(metricName, value, tags)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void histogram(String metricName, double value, String... tags) {
      try {
        wrapped.histogram(metricName, value, tags)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void distribution(String metricName, long value, String... tags) {
      try {
        wrapped.distribution(metricName, value, tags)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void distribution(String metricName, double value, String... tags) {
      try {
        wrapped.distribution(metricName, value, tags)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void serviceCheck(String serviceCheckName, String status, String message, String... tags) {
      try {
        wrapped.serviceCheck(serviceCheckName, status, message, tags)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void error(Exception error) {
      try {
        wrapped.error(error)
      } finally {
        remaining.countDown()
      }
    }

    @Override
    int getErrorCount() {
      try {
        return wrapped.getErrorCount()
      } finally {
        remaining.countDown()
      }
    }

    @Override
    void close() {
      try {
        wrapped.close()
      } finally {
        remaining.countDown()
      }
    }
  }
}

0 commit comments

Comments
 (0)