Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d19d27e
Add RumInjectorHealthMetrics
sarahchen6 Jul 28, 2025
e01d1a9
Add telemetry collector and methods to RumInjector
sarahchen6 Jul 28, 2025
8c71ef6
Initialize health metrics and telemetry collector
sarahchen6 Jul 28, 2025
6c01e10
Get injectionsucceed count
sarahchen6 Jul 28, 2025
ceb3e11
Add comments
sarahchen6 Jul 28, 2025
92f0c54
Reorganize classes
sarahchen6 Jul 31, 2025
c5c860f
Connect rum injector, telemetry collector, and statsdclient
sarahchen6 Jul 31, 2025
3d4ac53
Add tests
sarahchen6 Aug 1, 2025
3fd498d
Get and test metrics for injection failures and skips
sarahchen6 Aug 1, 2025
14c338b
Add Content Security Policy and HTTP response size telemetry
sarahchen6 Aug 1, 2025
c5b5389
Add injection duration telemetry
sarahchen6 Aug 1, 2025
feb40be
Fix some things
sarahchen6 Aug 2, 2025
23e0455
Fix content-length retrieval and add test for injection timing
sarahchen6 Aug 6, 2025
07db174
Add injection initialization success telemetry
sarahchen6 Aug 7, 2025
a5352ff
Fix CoreTracer compilation with InstrumenterConfig
sarahchen6 Aug 8, 2025
2a1f676
Add tags to all metrics
sarahchen6 Aug 8, 2025
b32874c
Update InjectingPipeOutputStreamTest
sarahchen6 Aug 8, 2025
7a5cd68
Tweaks
sarahchen6 Aug 8, 2025
412a3f8
Address jacoco coverage and injectingpipeoutstream interface updates
sarahchen6 Aug 9, 2025
b3fcde4
Add content-length detection for InjectingPipeWriter and improve tests
sarahchen6 Aug 11, 2025
4b2f6b3
Address review comments
sarahchen6 Aug 11, 2025
3e462d4
Fix header retrieval
sarahchen6 Aug 12, 2025
b1ab871
Add lots of improvements from review comments
sarahchen6 Aug 12, 2025
93bd0a4
Fix constructors and address review comment
sarahchen6 Aug 13, 2025
5a24d7d
Merge branch 'master' into sarahchen6/implement-rum-injector-telemetry
sarahchen6 Aug 13, 2025
b20d6c4
Clarify bytes written and address review comment
sarahchen6 Aug 13, 2025
f7607ba
Use dynamic servlet version retrieval
sarahchen6 Aug 18, 2025
e6afc52
Change injection timing logic
sarahchen6 Aug 18, 2025
6f2dc97
Clean up
sarahchen6 Aug 19, 2025
32372c1
Use dynamic servlet version retrieval for tagging as well
sarahchen6 Aug 19, 2025
821dc87
Address merge conflicts
sarahchen6 Aug 19, 2025
150ab05
Add a telemetry check to HttpServerTest
sarahchen6 Aug 19, 2025
e9095b5
Clean up
sarahchen6 Aug 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class InjectingPipeOutputStreamBenchmark {
public void withPipe() throws Exception {
try (final PrintWriter out =
new PrintWriter(
new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content, null))) {
new InjectingPipeOutputStream(new ByteArrayOutputStream(), marker, content))) {
htmlContent.forEach(out::println);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.LongConsumer;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand All @@ -23,18 +24,41 @@ public class InjectingPipeOutputStream extends OutputStream {
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final OutputStream downstream;
private final LongConsumer onBytesWritten;
private final LongConsumer onInjectionTime;
private long bytesWritten = 0;

/**
* This constructor is typically used for testing where we care about the logic and not the
* telemetry.
*
* @param downstream the delegate output stream
* @param marker the marker to find in the stream. Must at least be one byte.
* @param contentToInject the content to inject once before the marker if found.
*/
public InjectingPipeOutputStream(
final OutputStream downstream, final byte[] marker, final byte[] contentToInject) {
this(downstream, marker, contentToInject, null, null, null);
}

/**
* This constructor contains the full set of parameters.
*
* @param downstream the delegate output stream
* @param marker the marker to find in the stream. Must at least be one byte.
* @param contentToInject the content to inject once before the marker if found.
* @param onContentInjected callback called when and if the content is injected.
* @param onBytesWritten callback called when stream is closed to report total bytes written.
* @param onInjectionTime callback called with the time in milliseconds taken to write the
* injection content.
*/
public InjectingPipeOutputStream(
final OutputStream downstream,
final byte[] marker,
final byte[] contentToInject,
final Runnable onContentInjected) {
final Runnable onContentInjected,
final LongConsumer onBytesWritten,
final LongConsumer onInjectionTime) {
this.downstream = downstream;
this.marker = marker;
this.lookbehind = new byte[marker.length];
Expand All @@ -46,6 +70,8 @@ public InjectingPipeOutputStream(
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.onBytesWritten = onBytesWritten;
this.onInjectionTime = onInjectionTime;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

Expand All @@ -57,11 +83,13 @@ public void write(int b) throws IOException {
drain();
}
downstream.write(b);
bytesWritten++;
return;
}

if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
bytesWritten++;
} else {
count++;
}
Expand All @@ -72,7 +100,12 @@ public void write(int b) throws IOException {
if (marker[matchingPos++] == b) {
if (matchingPos == marker.length) {
filter = false;
long injectionStart = System.nanoTime();
downstream.write(contentToInject);
long injectionEnd = System.nanoTime();
if (onInjectionTime != null) {
onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L);
}
if (onContentInjected != null) {
onContentInjected.run();
}
Expand All @@ -91,6 +124,7 @@ public void write(byte[] array, int off, int len) throws IOException {
drain();
}
downstream.write(array, off, len);
bytesWritten += len;
return;
}

Expand All @@ -103,12 +137,21 @@ public void write(byte[] array, int off, int len) throws IOException {
// we have a full match. just write everything
filter = false;
drain();
downstream.write(array, off, idx);
int bytesToWrite = idx;
downstream.write(array, off, bytesToWrite);
bytesWritten += bytesToWrite;
long injectionStart = System.nanoTime();
downstream.write(contentToInject);
long injectionEnd = System.nanoTime();
if (onInjectionTime != null) {
onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L);
}
if (onContentInjected != null) {
onContentInjected.run();
}
downstream.write(array, off + idx, len - idx);
bytesToWrite = len - idx;
downstream.write(array, off + idx, bytesToWrite);
bytesWritten += bytesToWrite;
} else {
// we don't have a full match. write everything in a bulk except the lookbehind buffer
// sequentially
Expand All @@ -120,7 +163,9 @@ public void write(byte[] array, int off, int len) throws IOException {

// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
int bytesToWrite = len - bulkWriteThreshold;
downstream.write(array, off + marker.length - 1, bytesToWrite);
bytesWritten += bytesToWrite;
filter = wasFiltering;
for (int i = len - marker.length + 1; i < len; i++) {
write(array[i]);
Expand Down Expand Up @@ -163,6 +208,7 @@ private void drain() throws IOException {
int cnt = count;
for (int i = 0; i < cnt; i++) {
downstream.write(lookbehind[(start + i) % lookbehind.length]);
bytesWritten++;
count--;
}
filter = wasFiltering;
Expand All @@ -185,6 +231,11 @@ public void flush() throws IOException {
public void close() throws IOException {
try {
commit();
// report the size of the original HTTP response before injecting via callback
if (onBytesWritten != null) {
onBytesWritten.accept(bytesWritten);
}
bytesWritten = 0;
} finally {
downstream.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.Writer;
import java.util.function.LongConsumer;
import javax.annotation.concurrent.NotThreadSafe;

/**
Expand All @@ -23,18 +24,41 @@ public class InjectingPipeWriter extends Writer {
private final Runnable onContentInjected;
private final int bulkWriteThreshold;
private final Writer downstream;
private final LongConsumer onBytesWritten;
private final LongConsumer onInjectionTime;
private long bytesWritten = 0;

/**
* This constructor is typically used for testing where we care about the logic and not the
* telemetry.
*
* @param downstream the delegate writer
* @param marker the marker to find in the stream. Must at least be one char.
* @param contentToInject the content to inject once before the marker if found.
*/
public InjectingPipeWriter(
final Writer downstream, final char[] marker, final char[] contentToInject) {
this(downstream, marker, contentToInject, null, null, null);
}

/**
* This constructor contains the full set of parameters.
*
* @param downstream the delegate writer
* @param marker the marker to find in the stream. Must at least be one char.
* @param contentToInject the content to inject once before the marker if found.
* @param onContentInjected callback called when and if the content is injected.
* @param onBytesWritten callback called when writer is closed to report total bytes written.
* @param onInjectionTime callback called with the time in milliseconds taken to write the
* injection content.
*/
public InjectingPipeWriter(
final Writer downstream,
final char[] marker,
final char[] contentToInject,
final Runnable onContentInjected) {
final Runnable onContentInjected,
final LongConsumer onBytesWritten,
final LongConsumer onInjectionTime) {
this.downstream = downstream;
this.marker = marker;
this.lookbehind = new char[marker.length];
Expand All @@ -46,6 +70,8 @@ public InjectingPipeWriter(
this.filter = true;
this.contentToInject = contentToInject;
this.onContentInjected = onContentInjected;
this.onBytesWritten = onBytesWritten;
this.onInjectionTime = onInjectionTime;
this.bulkWriteThreshold = marker.length * 2 - 2;
}

Expand All @@ -57,11 +83,13 @@ public void write(int c) throws IOException {
drain();
}
downstream.write(c);
bytesWritten++;
return;
}

if (count == lookbehind.length) {
downstream.write(lookbehind[pos]);
bytesWritten++;
} else {
count++;
}
Expand All @@ -72,7 +100,12 @@ public void write(int c) throws IOException {
if (marker[matchingPos++] == c) {
if (matchingPos == marker.length) {
filter = false;
long injectionStart = System.nanoTime();
downstream.write(contentToInject);
long injectionEnd = System.nanoTime();
if (onInjectionTime != null) {
onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L);
}
if (onContentInjected != null) {
onContentInjected.run();
}
Expand All @@ -91,6 +124,7 @@ public void write(char[] array, int off, int len) throws IOException {
drain();
}
downstream.write(array, off, len);
bytesWritten += len;
return;
}

Expand All @@ -103,12 +137,21 @@ public void write(char[] array, int off, int len) throws IOException {
// we have a full match. just write everything
filter = false;
drain();
downstream.write(array, off, idx);
int bytesToWrite = idx;
downstream.write(array, off, bytesToWrite);
bytesWritten += bytesToWrite;
long injectionStart = System.nanoTime();
downstream.write(contentToInject);
long injectionEnd = System.nanoTime();
if (onInjectionTime != null) {
onInjectionTime.accept((injectionEnd - injectionStart) / 1_000_000L);
}
if (onContentInjected != null) {
onContentInjected.run();
}
downstream.write(array, off + idx, len - idx);
bytesToWrite = len - idx;
downstream.write(array, off + idx, bytesToWrite);
bytesWritten += bytesToWrite;
} else {
// we don't have a full match. write everything in a bulk except the lookbehind buffer
// sequentially
Expand All @@ -120,7 +163,9 @@ public void write(char[] array, int off, int len) throws IOException {

// will be reset if no errors after the following write
filter = false;
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
int bytesToWrite = len - bulkWriteThreshold;
downstream.write(array, off + marker.length - 1, bytesToWrite);
bytesWritten += bytesToWrite;
filter = wasFiltering;

for (int i = len - marker.length + 1; i < len; i++) {
Expand Down Expand Up @@ -164,6 +209,7 @@ private void drain() throws IOException {
int cnt = count;
for (int i = 0; i < cnt; i++) {
downstream.write(lookbehind[(start + i) % lookbehind.length]);
bytesWritten++;
count--;
}
filter = wasFiltering;
Expand All @@ -188,6 +234,11 @@ public void close() throws IOException {
commit();
} finally {
downstream.close();
// report the size of the original HTTP response before injecting via callback
if (onBytesWritten != null) {
onBytesWritten.accept(bytesWritten);
}
bytesWritten = 0;
}
}

Expand Down
Loading