Skip to content

Add telemetry for the RUM injector #9267

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d19d27e
Add RumInjectorHealthMetrics
sarahchen6 Jul 28, 2025
e01d1a9
Add telemetry collector and methods to RumInjector
sarahchen6 Jul 28, 2025
8c71ef6
Initialize health metrics and telemetry collector
sarahchen6 Jul 28, 2025
6c01e10
Get injectionsucceed count
sarahchen6 Jul 28, 2025
ceb3e11
Add comments
sarahchen6 Jul 28, 2025
92f0c54
Reorganize classes
sarahchen6 Jul 31, 2025
c5c860f
Connect rum injector, telemetry collector, and statsdclient
sarahchen6 Jul 31, 2025
3d4ac53
Add tests
sarahchen6 Aug 1, 2025
3fd498d
Get and test metrics for injection failures and skips
sarahchen6 Aug 1, 2025
14c338b
Add Content Security Policy and HTTP response size telemetry
sarahchen6 Aug 1, 2025
c5b5389
Add injection duration telemetry
sarahchen6 Aug 1, 2025
feb40be
Fix some things
sarahchen6 Aug 2, 2025
23e0455
Fix content-length retrieval and add test for injection timing
sarahchen6 Aug 6, 2025
07db174
Add injection initialization success telemetry
sarahchen6 Aug 7, 2025
a5352ff
Fix CoreTracer compilation with InstrumenterConfig
sarahchen6 Aug 8, 2025
2a1f676
Add tags to all metrics
sarahchen6 Aug 8, 2025
b32874c
Update InjectingPipeOutputStreamTest
sarahchen6 Aug 8, 2025
7a5cd68
Tweaks
sarahchen6 Aug 8, 2025
412a3f8
Address jacoco coverage and injectingpipeoutstream interface updates
sarahchen6 Aug 9, 2025
b3fcde4
Add content-length detection for InjectingPipeWriter and improve tests
sarahchen6 Aug 11, 2025
4b2f6b3
Address review comments
sarahchen6 Aug 11, 2025
3e462d4
Fix header retrieval
sarahchen6 Aug 12, 2025
b1ab871
Add lots of improvements from review comments
sarahchen6 Aug 12, 2025
93bd0a4
Fix constructors and address review comment
sarahchen6 Aug 13, 2025
5a24d7d
Merge branch 'master' into sarahchen6/implement-rum-injector-telemetry
sarahchen6 Aug 13, 2025
b20d6c4
Clarify bytes written and address review comment
sarahchen6 Aug 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.LongConsumer;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand All @@ -23,6 +24,8 @@ public class InjectingPipeOutputStream extends OutputStream {
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final OutputStream downstream;
private final LongConsumer onBytesWritten;
private long bytesWritten = 0;

/**
* @param downstream the delegate output stream
Expand All @@ -35,6 +38,22 @@ public InjectingPipeOutputStream(
final byte[] marker,
final byte[] contentToInject,
final Runnable onContentInjected) {
this(downstream, marker, contentToInject, onContentInjected, null);
}

/**
* @param downstream the delegate output stream
* @param marker the marker to find in the stream. Must at least be one byte.
* @param contentToInject the content to inject once before the marker if found.
* @param onContentInjected callback called when and if the content is injected.
* @param onBytesWritten callback called when stream is closed to report total bytes written.
*/
public InjectingPipeOutputStream(
final OutputStream downstream,
final byte[] marker,
final byte[] contentToInject,
final Runnable onContentInjected,
final LongConsumer onBytesWritten) {
this.downstream = downstream;
this.marker = marker;
this.lookbehind = new byte[marker.length];
Expand All @@ -46,6 +65,7 @@ public InjectingPipeOutputStream(
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.onBytesWritten = onBytesWritten;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

Expand All @@ -57,11 +77,13 @@ public void write(int b) throws IOException {
drain();
}
downstream.write(b);
bytesWritten++;
return;
}

if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
bytesWritten++;
} else {
count++;
}
Expand Down Expand Up @@ -91,6 +113,7 @@ public void write(byte[] array, int off, int len) throws IOException {
drain();
}
downstream.write(array, off, len);
bytesWritten += len;
return;
}

Expand All @@ -104,11 +127,13 @@ public void write(byte[] array, int off, int len) throws IOException {
filter = false;
drain();
downstream.write(array, off, idx);
bytesWritten += idx;
downstream.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
}
downstream.write(array, off + idx, len - idx);
bytesWritten += (len - idx);
} else {
// we don't have a full match. write everything in a bulk except the lookbehind buffer
// sequentially
Expand All @@ -121,6 +146,7 @@ public void write(byte[] array, int off, int len) throws IOException {
// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
bytesWritten += (len - bulkWriteThreshold);
filter = wasFiltering;
for (int i = len - marker.length + 1; i < len; i++) {
write(array[i]);
Expand Down Expand Up @@ -163,6 +189,7 @@ private void drain() throws IOException {
int cnt = count;
for (int i = 0; i < cnt; i++) {
downstream.write(lookbehind[(start + i) % lookbehind.length]);
bytesWritten++;
count--;
}
filter = wasFiltering;
Expand All @@ -185,6 +212,11 @@ public void flush() throws IOException {
public void close() throws IOException {
try {
commit();
// report the size of the original HTTP response before injecting via callback
if (onBytesWritten != null) {
onBytesWritten.accept(bytesWritten);
}
bytesWritten = 0;
} finally {
downstream.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.Writer;
import java.util.function.LongConsumer;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand All @@ -23,6 +24,8 @@ public class InjectingPipeWriter extends Writer {
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final Writer downstream;
private final LongConsumer onBytesWritten;
private long bytesWritten = 0;

/**
* @param downstream the delegate writer
Expand All @@ -35,6 +38,22 @@ public InjectingPipeWriter(
final char[] marker,
final char[] contentToInject,
final Runnable onContentInjected) {
this(downstream, marker, contentToInject, onContentInjected, null);
}

/**
* @param downstream the delegate writer
* @param marker the marker to find in the stream. Must at least be one char.
* @param contentToInject the content to inject once before the marker if found.
* @param onContentInjected callback called when and if the content is injected.
* @param onBytesWritten callback called when writer is closed to report total bytes written.
*/
public InjectingPipeWriter(
final Writer downstream,
final char[] marker,
final char[] contentToInject,
final Runnable onContentInjected,
final LongConsumer onBytesWritten) {
this.downstream = downstream;
this.marker = marker;
this.lookbehind = new char[marker.length];
Expand All @@ -46,6 +65,7 @@ public InjectingPipeWriter(
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.onBytesWritten = onBytesWritten;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

Expand All @@ -57,11 +77,13 @@ public void write(int c) throws IOException {
drain();
}
downstream.write(c);
bytesWritten++;
return;
}

if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
bytesWritten++;
} else {
count++;
}
Expand Down Expand Up @@ -91,6 +113,7 @@ public void write(char[] array, int off, int len) throws IOException {
drain();
}
downstream.write(array, off, len);
bytesWritten += len;
return;
}

Expand All @@ -104,11 +127,13 @@ public void write(char[] array, int off, int len) throws IOException {
filter = false;
drain();
downstream.write(array, off, idx);
bytesWritten += idx;
downstream.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
}
downstream.write(array, off + idx, len - idx);
bytesWritten += (len - idx);
} else {
// we don't have a full match. write everything in a bulk except the lookbehind buffer
// sequentially
Expand All @@ -121,6 +146,7 @@ public void write(char[] array, int off, int len) throws IOException {
// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
bytesWritten += (len - bulkWriteThreshold);
filter = wasFiltering;

for (int i = len - marker.length + 1; i < len; i++) {
Expand Down Expand Up @@ -164,6 +190,7 @@ private void drain() throws IOException {
int cnt = count;
for (int i = 0; i < cnt; i++) {
downstream.write(lookbehind[(start + i) % lookbehind.length]);
bytesWritten++;
count--;
}
filter = wasFiltering;
Expand All @@ -188,6 +215,11 @@ public void close() throws IOException {
commit();
} finally {
downstream.close();
// report the size of the original HTTP response before injecting via callback
if (onBytesWritten != null) {
onBytesWritten.accept(bytesWritten);
}
bytesWritten = 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package datadog.trace.bootstrap.instrumentation.buffer
import datadog.trace.test.util.DDSpecification

class InjectingPipeOutputStreamTest extends DDSpecification {
static final byte[] MARKER_BYTES = "</head>".getBytes("UTF-8")
static final byte[] CONTEXT_BYTES = "<script></script>".getBytes("UTF-8")

static class GlitchedOutputStream extends FilterOutputStream {
int glitchesPos
int count
Expand Down Expand Up @@ -33,6 +36,14 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
}
}

static class Counter {
int value = 0

def incr(long count) {
this.value += count
}
}

def 'should filter a buffer and inject if found #found'() {
setup:
def downstream = new ByteArrayOutputStream()
Expand Down Expand Up @@ -87,4 +98,74 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
// expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content
["<foo/>"] | "<longerThanFoo>" | "<nothing>" | 3 | "<f"
}

def 'should count bytes correctly when writing byte arrays'() {
setup:
def testBytes = "test content".getBytes("UTF-8")
def downstream = new ByteArrayOutputStream()
def counter = new Counter()
def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES, null, { long bytes -> counter.incr(bytes) })

when:
piped.write(testBytes)
piped.close()

then:
counter.value == 12
downstream.toByteArray() == testBytes
}

def 'should count bytes correctly when writing bytes individually'() {
setup:
def testBytes = "test".getBytes("UTF-8")
def downstream = new ByteArrayOutputStream()
def counter = new Counter()
def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES, null, { long bytes -> counter.incr(bytes) })

when:
for (int i = 0; i < testBytes.length; i++) {
piped.write((int) testBytes[i])
}
piped.close()

then:
counter.value == 4
downstream.toByteArray() == testBytes
}

def 'should count bytes correctly with multiple writes'() {
setup:
def part1 = "test".getBytes("UTF-8")
def part2 = " ".getBytes("UTF-8")
def part3 = "content".getBytes("UTF-8")
def testBytes = "test content".getBytes("UTF-8")
def downstream = new ByteArrayOutputStream()
def counter = new Counter()
def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES, null, { long bytes -> counter.incr(bytes) })

when:
piped.write(part1)
piped.write(part2)
piped.write(part3)
piped.close()

then:
counter.value == 12
downstream.toByteArray() == testBytes
}

def 'should be resilient to exceptions when onBytesWritten callback is null'() {
setup:
def testBytes = "test content".getBytes("UTF-8")
def downstream = new ByteArrayOutputStream()
def piped = new InjectingPipeOutputStream(downstream, MARKER_BYTES, CONTEXT_BYTES, null, null)

when:
piped.write(testBytes)
piped.close()

then:
noExceptionThrown()
downstream.toByteArray() == testBytes
}
}
Loading