Skip to content

Add telemetry for the RUM injector #9267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d19d27e
Add RumInjectorHealthMetrics
sarahchen6 Jul 28, 2025
e01d1a9
Add telemetry collector and methods to RumInjector
sarahchen6 Jul 28, 2025
8c71ef6
Initialize health metrics and telemetry collector
sarahchen6 Jul 28, 2025
6c01e10
Get injectionsucceed count
sarahchen6 Jul 28, 2025
ceb3e11
Add comments
sarahchen6 Jul 28, 2025
92f0c54
Reorganize classes
sarahchen6 Jul 31, 2025
c5c860f
Connect rum injector, telemetry collector, and statsdclient
sarahchen6 Jul 31, 2025
3d4ac53
Add tests
sarahchen6 Aug 1, 2025
3fd498d
Get and test metrics for injection failures and skips
sarahchen6 Aug 1, 2025
14c338b
Add Content Security Policy and HTTP response size telemetry
sarahchen6 Aug 1, 2025
c5b5389
Add injection duration telemetry
sarahchen6 Aug 1, 2025
feb40be
Fix some things
sarahchen6 Aug 2, 2025
23e0455
Fix content-length retrieval and add test for injection timing
sarahchen6 Aug 6, 2025
07db174
Add injection initialization success telemetry
sarahchen6 Aug 7, 2025
a5352ff
Fix CoreTracer compilation with InstrumenterConfig
sarahchen6 Aug 8, 2025
2a1f676
Add tags to all metrics
sarahchen6 Aug 8, 2025
b32874c
Update InjectingPipeOutputStreamTest
sarahchen6 Aug 8, 2025
7a5cd68
Tweaks
sarahchen6 Aug 8, 2025
412a3f8
Address jacoco coverage and injectingpipeoutstream interface updates
sarahchen6 Aug 9, 2025
b3fcde4
Add content-length detection for InjectingPipeWriter and improve tests
sarahchen6 Aug 11, 2025
4b2f6b3
Address review comments
sarahchen6 Aug 11, 2025
3e462d4
Fix header retrieval
sarahchen6 Aug 12, 2025
b1ab871
Add lots of improvements from review comments
sarahchen6 Aug 12, 2025
93bd0a4
Fix constructors and address review comment
sarahchen6 Aug 13, 2025
5a24d7d
Merge branch 'master' into sarahchen6/implement-rum-injector-telemetry
sarahchen6 Aug 13, 2025
b20d6c4
Clarify bytes written and address review comment
sarahchen6 Aug 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public class InjectingPipeOutputStreamBenchmark {
public void withPipe() throws Exception {
try (final PrintWriter out =
new PrintWriter(
new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content, null))) {
new InjectingPipeOutputStream(
new ByteArrayOutputStream(), marker, content, null, null))) {
htmlContent.forEach(out::println);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.LongConsumer;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand All @@ -23,18 +24,22 @@ public class InjectingPipeOutputStream extends OutputStream {
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final OutputStream downstream;
private final LongConsumer onBytesWritten;
private long bytesWritten = 0;

/**
* @param downstream the delegate output stream
* @param marker the marker to find in the stream. Must at least be one byte.
* @param contentToInject the content to inject once before the marker if found.
* @param onContentInjected callback called when and if the content is injected.
* @param onBytesWritten callback called when stream is closed to report total bytes written.
*/
public InjectingPipeOutputStream(
final OutputStream downstream,
final byte[] marker,
final byte[] contentToInject,
final Runnable onContentInjected) {
final Runnable onContentInjected,
final LongConsumer onBytesWritten) {
this.downstream = downstream;
this.marker = marker;
this.lookbehind = new byte[marker.length];
Expand All @@ -46,11 +51,13 @@ public InjectingPipeOutputStream(
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.onBytesWritten = onBytesWritten;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

@Override
public void write(int b) throws IOException {
bytesWritten++;
if (!filter) {
if (wasDraining) {
// continue draining
Expand Down Expand Up @@ -85,6 +92,7 @@ public void write(int b) throws IOException {

@Override
public void write(byte[] array, int off, int len) throws IOException {
bytesWritten += len;
if (!filter) {
if (wasDraining) {
// needs drain
Expand Down Expand Up @@ -113,6 +121,7 @@ public void write(byte[] array, int off, int len) throws IOException {
// we don't have a full match. write everything in a bulk except the lookbehind buffer
// sequentially
for (int i = off; i < off + marker.length - 1; i++) {
bytesWritten--; // avoid double counting
write(array[i]);
}
drain();
Expand All @@ -123,12 +132,14 @@ public void write(byte[] array, int off, int len) throws IOException {
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
filter = wasFiltering;
for (int i = len - marker.length + 1; i < len; i++) {
bytesWritten--; // avoid double counting
write(array[i]);
}
}
} else {
// use slow path because the length to write is small and within the lookbehind buffer size
for (int i = off; i < off + len; i++) {
bytesWritten--; // avoid double counting
write(array[i]);
}
}
Expand Down Expand Up @@ -185,6 +196,11 @@ public void flush() throws IOException {
public void close() throws IOException {
try {
commit();
// report the size of the original HTTP response before injecting via callback
if (onBytesWritten != null) {
onBytesWritten.accept(bytesWritten);
}
bytesWritten = 0;
} finally {
downstream.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.Writer;
import java.util.function.LongConsumer;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand All @@ -23,18 +24,22 @@ public class InjectingPipeWriter extends Writer {
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final Writer downstream;
private final LongConsumer onBytesWritten;
private long bytesWritten = 0;

/**
* @param downstream the delegate writer
* @param marker the marker to find in the stream. Must at least be one char.
* @param contentToInject the content to inject once before the marker if found.
* @param onContentInjected callback called when and if the content is injected.
* @param onBytesWritten callback called when writer is closed to report total bytes written.
*/
public InjectingPipeWriter(
final Writer downstream,
final char[] marker,
final char[] contentToInject,
final Runnable onContentInjected) {
final Runnable onContentInjected,
final LongConsumer onBytesWritten) {
this.downstream = downstream;
this.marker = marker;
this.lookbehind = new char[marker.length];
Expand All @@ -46,11 +51,13 @@ public InjectingPipeWriter(
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.onBytesWritten = onBytesWritten;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

@Override
public void write(int c) throws IOException {
bytesWritten++;
if (!filter) {
if (wasDraining) {
// continue draining
Expand Down Expand Up @@ -85,6 +92,7 @@ public void write(int c) throws IOException {

@Override
public void write(char[] array, int off, int len) throws IOException {
bytesWritten += len;
if (!filter) {
if (wasDraining) {
// needs drain
Expand Down Expand Up @@ -113,6 +121,7 @@ public void write(char[] array, int off, int len) throws IOException {
// we don't have a full match. write everything in a bulk except the lookbehind buffer
// sequentially
for (int i = off; i < off + marker.length - 1; i++) {
bytesWritten--; // avoid double counting
write(array[i]);
}
drain();
Expand All @@ -124,12 +133,14 @@ public void write(char[] array, int off, int len) throws IOException {
filter = wasFiltering;

for (int i = len - marker.length + 1; i < len; i++) {
bytesWritten--; // avoid double counting
write(array[i]);
}
}
} else {
// use slow path because the length to write is small and within the lookbehind buffer size
for (int i = off; i < off + len; i++) {
bytesWritten--; // avoid double counting
write(array[i]);
}
}
Expand Down Expand Up @@ -188,6 +199,11 @@ public void close() throws IOException {
commit();
} finally {
downstream.close();
// report the size of the original HTTP response before injecting via callback
if (onBytesWritten != null) {
onBytesWritten.accept(bytesWritten);
}
bytesWritten = 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
def 'should filter a buffer and inject if found #found'() {
setup:
def downstream = new ByteArrayOutputStream()
def piped = new OutputStreamWriter(new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null),
def piped = new OutputStreamWriter(new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null, null),
"UTF-8")
when:
try (def closeme = piped) {
Expand All @@ -55,7 +55,7 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
setup:
def baos = new ByteArrayOutputStream()
def downstream = new GlitchedOutputStream(baos, glichesAt)
def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null)
def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null, null)
when:
try {
for (String line : body) {
Expand Down Expand Up @@ -87,4 +87,94 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
// expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content
["<foo/>"] | "<longerThanFoo>" | "<nothing>" | 3 | "<f"
}

def 'should count bytes correctly when writing byte arrays'() {
setup:
def downstream = new ByteArrayOutputStream()
def bytesWritten = []
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, onBytesWritten)

when:
piped.write("test content".getBytes("UTF-8"))
piped.close()

then:
bytesWritten.size() == 1
bytesWritten[0] == 12
downstream.toByteArray() == "test content".getBytes("UTF-8")
}

def 'should count bytes correctly when writing bytes individually'() {
setup:
def downstream = new ByteArrayOutputStream()
def bytesWritten = []
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, onBytesWritten)

when:
def bytes = "test".getBytes("UTF-8")
for (int i = 0; i < bytes.length; i++) {
piped.write((int) bytes[i])
}
piped.close()

then:
bytesWritten.size() == 1
bytesWritten[0] == 4
downstream.toByteArray() == "test".getBytes("UTF-8")
}

def 'should count bytes correctly with multiple writes'() {
setup:
def downstream = new ByteArrayOutputStream()
def bytesWritten = []
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, onBytesWritten)

when:
piped.write("test".getBytes("UTF-8"))
piped.write(" ".getBytes("UTF-8"))
piped.write("content".getBytes("UTF-8"))
piped.close()

then:
bytesWritten.size() == 1
bytesWritten[0] == 12
downstream.toByteArray() == "test content".getBytes("UTF-8")
}

def 'should be resilient to exceptions when onBytesWritten callback is null'() {
setup:
def downstream = new ByteArrayOutputStream()
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, null)

when:
piped.write("test content".getBytes("UTF-8"))
piped.close()

then:
noExceptionThrown()
downstream.toByteArray() == "test content".getBytes("UTF-8")
}

def 'should reset byte count after close'() {
setup:
def downstream = new ByteArrayOutputStream()
def bytesWritten = []
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, onBytesWritten)

when:
piped.write("test".getBytes("UTF-8"))
piped.close()

piped.write("content".getBytes("UTF-8"))
piped.close()

then:
bytesWritten.size() == 2
bytesWritten[0] == 4
bytesWritten[1] == 7
}
}
Loading