Skip to content

Commit 9bade09

Browse files
authored
Add telemetry for the RUM injector (#9267)
* Add RumInjectorHealthMetrics * Add telemetry collector and methods to RumInjector * Initialize health metrics and telemetry collector * Get injectionsucceed count * Add comments * Reorganize classes * Connect rum injector, telemetry collector, and statsdclient * Add tests * Get and test metrics for injection failures and skips * Add Content Security Policy and HTTP response size telemetry * Add injection duration telemetry * Fix some things * Fix content-length retrieval and add test for injection timing * Add injection initialization success telemetry * Fix CoreTracer compilation with InstrumenterConfig * Add tags to all metrics * Update InjectingPipeOutputStreamTest * Tweaks * Address jacoco coverage and injectingpipeoutstream interface updates * Add content-length detection for InjectingPipeWriter and improve tests * Address review comments * Fix header retrieval * Add lots of improvements from review comments * Fix constructors and address review comment * Clarify bytes written and address review comment * Use dynamic servlet version retrieval * Change injection timing logic * Clean up * Use dynamic servlet version retrieval for tagging as well * Add a telemetry check to HttpServerTest * Clean up
1 parent 27cd7f4 commit 9bade09

File tree

24 files changed

+1966
-57
lines changed

24 files changed

+1966
-57
lines changed

dd-java-agent/agent-bootstrap/src/jmh/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class InjectingPipeOutputStreamBenchmark {
5151
public void withPipe() throws Exception {
5252
try (final PrintWriter out =
5353
new PrintWriter(
54-
new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content, null))) {
54+
new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content))) {
5555
htmlContent.forEach(out::println);
5656
}
5757
}

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.IOException;
44
import java.io.OutputStream;
5+
import java.util.function.LongConsumer;
56
import javax.annotation.concurrent.NotThreadSafe;
67

78
/**
@@ -23,18 +24,41 @@ public class InjectingPipeOutputStream extends OutputStream {
2324
private final Runnable onContentInjected;
2425
private final int bulkWriteThreshold;
2526
private final OutputStream downstream;
27+
private final LongConsumer onBytesWritten;
28+
private final LongConsumer onInjectionTime;
29+
private long bytesWritten = 0;
2630

2731
/**
32+
* This constructor is typically used for testing where we care about the logic and not the
33+
* telemetry.
34+
*
35+
* @param downstream the delegate output stream
36+
* @param marker the marker to find in the stream. Must at least be one byte.
37+
* @param contentToInject the content to inject once before the marker if found.
38+
*/
39+
public InjectingPipeOutputStream(
40+
final OutputStream downstream, final byte[] marker, final byte[] contentToInject) {
41+
this(downstream, marker, contentToInject, null, null, null);
42+
}
43+
44+
/**
45+
* This constructor contains the full set of parameters.
46+
*
2847
* @param downstream the delegate output stream
2948
* @param marker the marker to find in the stream. Must at least be one byte.
3049
* @param contentToInject the content to inject once before the marker if found.
3150
* @param onContentInjected callback called when and if the content is injected.
51+
* @param onBytesWritten callback called when stream is closed to report total bytes written.
52+
* @param onInjectionTime callback called with the time in milliseconds taken to write the
53+
* injection content.
3254
*/
3355
public InjectingPipeOutputStream(
3456
final OutputStream downstream,
3557
final byte[] marker,
3658
final byte[] contentToInject,
37-
final Runnable onContentInjected) {
59+
final Runnable onContentInjected,
60+
final LongConsumer onBytesWritten,
61+
final LongConsumer onInjectionTime) {
3862
this.downstream = downstream;
3963
this.marker = marker;
4064
this.lookbehind = new byte[marker.length];
@@ -46,6 +70,8 @@ public InjectingPipeOutputStream(
4670
this.filter = true;
4771
this.contentToInject = contentToInject;
4872
this.onContentInjected = onContentInjected;
73+
this.onBytesWritten = onBytesWritten;
74+
this.onInjectionTime = onInjectionTime;
4975
this.bulkWriteThreshold = marker.length * 2 - 2;
5076
}
5177

@@ -57,11 +83,13 @@ public void write(int b) throws IOException {
5783
drain();
5884
}
5985
downstream.write(b);
86+
bytesWritten++;
6087
return;
6188
}
6289

6390
if (count == lookbehind.length) {
6491
downstream.write(lookbehind[pos]);
92+
bytesWritten++;
6593
} else {
6694
count++;
6795
}
@@ -72,7 +100,12 @@ public void write(int b) throws IOException {
72100
if (marker[matchingPos++] == b) {
73101
if (matchingPos == marker.length) {
74102
filter = false;
103+
long injectionStart = System.nanoTime();
75104
downstream.write(contentToInject);
105+
long injectionEnd = System.nanoTime();
106+
if (onInjectionTime != null) {
107+
onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L);
108+
}
76109
if (onContentInjected != null) {
77110
onContentInjected.run();
78111
}
@@ -91,6 +124,7 @@ public void write(byte[] array, int off, int len) throws IOException {
91124
drain();
92125
}
93126
downstream.write(array, off, len);
127+
bytesWritten += len;
94128
return;
95129
}
96130

@@ -103,12 +137,21 @@ public void write(byte[] array, int off, int len) throws IOException {
103137
// we have a full match. just write everything
104138
filter = false;
105139
drain();
106-
downstream.write(array, off, idx);
140+
int bytesToWrite = idx;
141+
downstream.write(array, off, bytesToWrite);
142+
bytesWritten += bytesToWrite;
143+
long injectionStart = System.nanoTime();
107144
downstream.write(contentToInject);
145+
long injectionEnd = System.nanoTime();
146+
if (onInjectionTime != null) {
147+
onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L);
148+
}
108149
if (onContentInjected != null) {
109150
onContentInjected.run();
110151
}
111-
downstream.write(array, off + idx, len - idx);
152+
bytesToWrite = len - idx;
153+
downstream.write(array, off + idx, bytesToWrite);
154+
bytesWritten += bytesToWrite;
112155
} else {
113156
// we don't have a full match. write everything in a bulk except the lookbehind buffer
114157
// sequentially
@@ -120,7 +163,9 @@ public void write(byte[] array, int off, int len) throws IOException {
120163

121164
// will be reset if no errors after the following write
122165
filter = false;
123-
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
166+
int bytesToWrite = len - bulkWriteThreshold;
167+
downstream.write(array, off + marker.length - 1, bytesToWrite);
168+
bytesWritten += bytesToWrite;
124169
filter = wasFiltering;
125170
for (int i = len - marker.length + 1; i < len; i++) {
126171
write(array[i]);
@@ -163,6 +208,7 @@ private void drain() throws IOException {
163208
int cnt = count;
164209
for (int i = 0; i < cnt; i++) {
165210
downstream.write(lookbehind[(start + i) % lookbehind.length]);
211+
bytesWritten++;
166212
count--;
167213
}
168214
filter = wasFiltering;
@@ -185,6 +231,11 @@ public void flush() throws IOException {
185231
public void close() throws IOException {
186232
try {
187233
commit();
234+
// report the size of the original HTTP response before injecting via callback
235+
if (onBytesWritten != null) {
236+
onBytesWritten.accept(bytesWritten);
237+
}
238+
bytesWritten = 0;
188239
} finally {
189240
downstream.close();
190241
}

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.IOException;
44
import java.io.Writer;
5+
import java.util.function.LongConsumer;
56
import javax.annotation.concurrent.NotThreadSafe;
67

78
/**
@@ -23,18 +24,41 @@ public class InjectingPipeWriter extends Writer {
2324
private final Runnable onContentInjected;
2425
private final int bulkWriteThreshold;
2526
private final Writer downstream;
27+
private final LongConsumer onBytesWritten;
28+
private final LongConsumer onInjectionTime;
29+
private long bytesWritten = 0;
2630

2731
/**
32+
* This constructor is typically used for testing where we care about the logic and not the
33+
* telemetry.
34+
*
35+
* @param downstream the delegate writer
36+
* @param marker the marker to find in the stream. Must at least be one char.
37+
* @param contentToInject the content to inject once before the marker if found.
38+
*/
39+
public InjectingPipeWriter(
40+
final Writer downstream, final char[] marker, final char[] contentToInject) {
41+
this(downstream, marker, contentToInject, null, null, null);
42+
}
43+
44+
/**
45+
* This constructor contains the full set of parameters.
46+
*
2847
* @param downstream the delegate writer
2948
* @param marker the marker to find in the stream. Must at least be one char.
3049
* @param contentToInject the content to inject once before the marker if found.
3150
* @param onContentInjected callback called when and if the content is injected.
51+
* @param onBytesWritten callback called when writer is closed to report total bytes written.
52+
* @param onInjectionTime callback called with the time in milliseconds taken to write the
53+
* injection content.
3254
*/
3355
public InjectingPipeWriter(
3456
final Writer downstream,
3557
final char[] marker,
3658
final char[] contentToInject,
37-
final Runnable onContentInjected) {
59+
final Runnable onContentInjected,
60+
final LongConsumer onBytesWritten,
61+
final LongConsumer onInjectionTime) {
3862
this.downstream = downstream;
3963
this.marker = marker;
4064
this.lookbehind = new char[marker.length];
@@ -46,6 +70,8 @@ public InjectingPipeWriter(
4670
this.filter = true;
4771
this.contentToInject = contentToInject;
4872
this.onContentInjected = onContentInjected;
73+
this.onBytesWritten = onBytesWritten;
74+
this.onInjectionTime = onInjectionTime;
4975
this.bulkWriteThreshold = marker.length * 2 - 2;
5076
}
5177

@@ -57,11 +83,13 @@ public void write(int c) throws IOException {
5783
drain();
5884
}
5985
downstream.write(c);
86+
bytesWritten++;
6087
return;
6188
}
6289

6390
if (count == lookbehind.length) {
6491
downstream.write(lookbehind[pos]);
92+
bytesWritten++;
6593
} else {
6694
count++;
6795
}
@@ -72,7 +100,12 @@ public void write(int c) throws IOException {
72100
if (marker[matchingPos++] == c) {
73101
if (matchingPos == marker.length) {
74102
filter = false;
103+
long injectionStart = System.nanoTime();
75104
downstream.write(contentToInject);
105+
long injectionEnd = System.nanoTime();
106+
if (onInjectionTime != null) {
107+
onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L);
108+
}
76109
if (onContentInjected != null) {
77110
onContentInjected.run();
78111
}
@@ -91,6 +124,7 @@ public void write(char[] array, int off, int len) throws IOException {
91124
drain();
92125
}
93126
downstream.write(array, off, len);
127+
bytesWritten += len;
94128
return;
95129
}
96130

@@ -103,12 +137,21 @@ public void write(char[] array, int off, int len) throws IOException {
103137
// we have a full match. just write everything
104138
filter = false;
105139
drain();
106-
downstream.write(array, off, idx);
140+
int bytesToWrite = idx;
141+
downstream.write(array, off, bytesToWrite);
142+
bytesWritten += bytesToWrite;
143+
long injectionStart = System.nanoTime();
107144
downstream.write(contentToInject);
145+
long injectionEnd = System.nanoTime();
146+
if (onInjectionTime != null) {
147+
onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L);
148+
}
108149
if (onContentInjected != null) {
109150
onContentInjected.run();
110151
}
111-
downstream.write(array, off + idx, len - idx);
152+
bytesToWrite = len - idx;
153+
downstream.write(array, off + idx, bytesToWrite);
154+
bytesWritten += bytesToWrite;
112155
} else {
113156
// we don't have a full match. write everything in a bulk except the lookbehind buffer
114157
// sequentially
@@ -120,7 +163,9 @@ public void write(char[] array, int off, int len) throws IOException {
120163

121164
// will be reset if no errors after the following write
122165
filter = false;
123-
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
166+
int bytesToWrite = len - bulkWriteThreshold;
167+
downstream.write(array, off + marker.length - 1, bytesToWrite);
168+
bytesWritten += bytesToWrite;
124169
filter = wasFiltering;
125170

126171
for (int i = len - marker.length + 1; i < len; i++) {
@@ -164,6 +209,7 @@ private void drain() throws IOException {
164209
int cnt = count;
165210
for (int i = 0; i < cnt; i++) {
166211
downstream.write(lookbehind[(start + i) % lookbehind.length]);
212+
bytesWritten++;
167213
count--;
168214
}
169215
filter = wasFiltering;
@@ -188,6 +234,11 @@ public void close() throws IOException {
188234
commit();
189235
} finally {
190236
downstream.close();
237+
// report the size of the original HTTP response before injecting via callback
238+
if (onBytesWritten != null) {
239+
onBytesWritten.accept(bytesWritten);
240+
}
241+
bytesWritten = 0;
191242
}
192243
}
193244

0 commit comments

Comments
 (0)