Skip to content

Commit b3fcde4

Browse files
committed
Add content-length detection for InjectingPipeWriter and improve tests
1 parent 412a3f8 commit b3fcde4

File tree

7 files changed

+240
-6
lines changed

7 files changed

+240
-6
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.IOException;
44
import java.io.Writer;
5+
import java.util.function.LongConsumer;
56
import javax.annotation.concurrent.NotThreadSafe;
67

78
/**
@@ -23,18 +24,22 @@ public class InjectingPipeWriter extends Writer {
2324
private final Runnable onContentInjected;
2425
private final int bulkWriteThreshold;
2526
private final Writer downstream;
27+
private final LongConsumer onBytesWritten;
28+
private long bytesWritten = 0;
2629

2730
/**
2831
* @param downstream the delegate writer
2932
* @param marker the marker to find in the stream. Must at least be one char.
3033
* @param contentToInject the content to inject once before the marker if found.
3134
* @param onContentInjected callback called when and if the content is injected.
35+
* @param onBytesWritten callback called when writer is closed to report total bytes written.
3236
*/
3337
public InjectingPipeWriter(
3438
final Writer downstream,
3539
final char[] marker,
3640
final char[] contentToInject,
37-
final Runnable onContentInjected) {
41+
final Runnable onContentInjected,
42+
final LongConsumer onBytesWritten) {
3843
this.downstream = downstream;
3944
this.marker = marker;
4045
this.lookbehind = new char[marker.length];
@@ -46,11 +51,13 @@ public InjectingPipeWriter(
4651
this.filter = true;
4752
this.contentToInject = contentToInject;
4853
this.onContentInjected = onContentInjected;
54+
this.onBytesWritten = onBytesWritten;
4955
this.bulkWriteThreshold = marker.length * 2 - 2;
5056
}
5157

5258
@Override
5359
public void write(int c) throws IOException {
60+
bytesWritten++;
5461
if (!filter) {
5562
if (wasDraining) {
5663
// continue draining
@@ -85,6 +92,7 @@ public void write(int c) throws IOException {
8592

8693
@Override
8794
public void write(char[] array, int off, int len) throws IOException {
95+
bytesWritten += len;
8896
if (!filter) {
8997
if (wasDraining) {
9098
// needs drain
@@ -113,6 +121,7 @@ public void write(char[] array, int off, int len) throws IOException {
113121
// we don't have a full match. write everything in a bulk except the lookbehind buffer
114122
// sequentially
115123
for (int i = off; i < off + marker.length - 1; i++) {
124+
bytesWritten--; // avoid double counting
116125
write(array[i]);
117126
}
118127
drain();
@@ -124,12 +133,14 @@ public void write(char[] array, int off, int len) throws IOException {
124133
filter = wasFiltering;
125134

126135
for (int i = len - marker.length + 1; i < len; i++) {
136+
bytesWritten--; // avoid double counting
127137
write(array[i]);
128138
}
129139
}
130140
} else {
131141
// use slow path because the length to write is small and within the lookbehind buffer size
132142
for (int i = off; i < off + len; i++) {
143+
bytesWritten--; // avoid double counting
133144
write(array[i]);
134145
}
135146
}
@@ -188,6 +199,11 @@ public void close() throws IOException {
188199
commit();
189200
} finally {
190201
downstream.close();
202+
// report the size of the original HTTP response before injecting via callback
203+
if (onBytesWritten != null) {
204+
onBytesWritten.accept(bytesWritten);
205+
}
206+
bytesWritten = 0;
191207
}
192208
}
193209

dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,94 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
8787
// expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content
8888
["<foo/>"] | "<longerThanFoo>" | "<nothing>" | 3 | "<f"
8989
}
90+
91+
def 'should count bytes correctly when writing byte arrays'() {
92+
setup:
93+
def downstream = new ByteArrayOutputStream()
94+
def bytesWritten = []
95+
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
96+
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, onBytesWritten)
97+
98+
when:
99+
piped.write("test content".getBytes("UTF-8"))
100+
piped.close()
101+
102+
then:
103+
bytesWritten.size() == 1
104+
bytesWritten[0] == 12
105+
downstream.toByteArray() == "test content".getBytes("UTF-8")
106+
}
107+
108+
def 'should count bytes correctly when writing bytes individually'() {
109+
setup:
110+
def downstream = new ByteArrayOutputStream()
111+
def bytesWritten = []
112+
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
113+
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, onBytesWritten)
114+
115+
when:
116+
def bytes = "test".getBytes("UTF-8")
117+
for (int i = 0; i < bytes.length; i++) {
118+
piped.write((int) bytes[i])
119+
}
120+
piped.close()
121+
122+
then:
123+
bytesWritten.size() == 1
124+
bytesWritten[0] == 4
125+
downstream.toByteArray() == "test".getBytes("UTF-8")
126+
}
127+
128+
def 'should count bytes correctly with multiple writes'() {
129+
setup:
130+
def downstream = new ByteArrayOutputStream()
131+
def bytesWritten = []
132+
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
133+
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, onBytesWritten)
134+
135+
when:
136+
piped.write("test".getBytes("UTF-8"))
137+
piped.write(" ".getBytes("UTF-8"))
138+
piped.write("content".getBytes("UTF-8"))
139+
piped.close()
140+
141+
then:
142+
bytesWritten.size() == 1
143+
bytesWritten[0] == 12
144+
downstream.toByteArray() == "test content".getBytes("UTF-8")
145+
}
146+
147+
def 'should be resilient to exceptions when onBytesWritten callback is null'() {
148+
setup:
149+
def downstream = new ByteArrayOutputStream()
150+
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, null)
151+
152+
when:
153+
piped.write("test content".getBytes("UTF-8"))
154+
piped.close()
155+
156+
then:
157+
noExceptionThrown()
158+
downstream.toByteArray() == "test content".getBytes("UTF-8")
159+
}
160+
161+
def 'should reset byte count after close'() {
162+
setup:
163+
def downstream = new ByteArrayOutputStream()
164+
def bytesWritten = []
165+
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
166+
def piped = new InjectingPipeOutputStream(downstream, "</head>".getBytes("UTF-8"), "<script></script>".getBytes("UTF-8"), null, onBytesWritten)
167+
168+
when:
169+
piped.write("test".getBytes("UTF-8"))
170+
piped.close()
171+
172+
piped.write("content".getBytes("UTF-8"))
173+
piped.close()
174+
175+
then:
176+
bytesWritten.size() == 2
177+
bytesWritten[0] == 4
178+
bytesWritten[1] == 7
179+
}
90180
}

dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriterTest.groovy

Lines changed: 93 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class InjectingPipeWriterTest extends DDSpecification {
3636
def 'should filter a buffer and inject if found #found using write'() {
3737
setup:
3838
def downstream = new StringWriter()
39-
def piped = new PrintWriter(new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null))
39+
def piped = new PrintWriter(new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null, null))
4040
when:
4141
try (def closeme = piped) {
4242
piped.write(body)
@@ -53,7 +53,7 @@ class InjectingPipeWriterTest extends DDSpecification {
5353
def 'should filter a buffer and inject if found #found using append'() {
5454
setup:
5555
def downstream = new StringWriter()
56-
def piped = new PrintWriter(new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null))
56+
def piped = new PrintWriter(new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null, null))
5757
when:
5858
try (def closeme = piped) {
5959
piped.append(body)
@@ -71,7 +71,7 @@ class InjectingPipeWriterTest extends DDSpecification {
7171
setup:
7272
def writer = new StringWriter()
7373
def downstream = new GlitchedWriter(writer, glichesAt)
74-
def piped = new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null)
74+
def piped = new InjectingPipeWriter(downstream, marker.toCharArray(), contentToInject.toCharArray(), null, null)
7575
when:
7676
try {
7777
for (String line : body) {
@@ -103,4 +103,94 @@ class InjectingPipeWriterTest extends DDSpecification {
103103
// expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content
104104
["<foo/>"] | "<longerThanFoo>" | "<nothing>" | 3 | "<f"
105105
}
106+
107+
def 'should count bytes correctly when writing characters'() {
108+
setup:
109+
def downstream = new StringWriter()
110+
def bytesWritten = []
111+
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
112+
def piped = new InjectingPipeWriter(downstream, "</head>".toCharArray(), "<script></script>".toCharArray(), null, onBytesWritten)
113+
114+
when:
115+
piped.write("test content".toCharArray())
116+
piped.close()
117+
118+
then:
119+
bytesWritten.size() == 1
120+
bytesWritten[0] == 12
121+
downstream.toString() == "test content"
122+
}
123+
124+
def 'should count bytes correctly when writing characters individually'() {
125+
setup:
126+
def downstream = new StringWriter()
127+
def bytesWritten = []
128+
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
129+
def piped = new InjectingPipeWriter(downstream, "</head>".toCharArray(), "<script></script>".toCharArray(), null, onBytesWritten)
130+
131+
when:
132+
def content = "test"
133+
for (int i = 0; i < content.length(); i++) {
134+
piped.write((int) content.charAt(i))
135+
}
136+
piped.close()
137+
138+
then:
139+
bytesWritten.size() == 1
140+
bytesWritten[0] == 4
141+
downstream.toString() == "test"
142+
}
143+
144+
def 'should count bytes correctly with multiple writes'() {
145+
setup:
146+
def downstream = new StringWriter()
147+
def bytesWritten = []
148+
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
149+
def piped = new InjectingPipeWriter(downstream, "</head>".toCharArray(), "<script></script>".toCharArray(), null, onBytesWritten)
150+
151+
when:
152+
piped.write("test".toCharArray())
153+
piped.write(" ".toCharArray())
154+
piped.write("content".toCharArray())
155+
piped.close()
156+
157+
then:
158+
bytesWritten.size() == 1
159+
bytesWritten[0] == 12
160+
downstream.toString() == "test content"
161+
}
162+
163+
def 'should be resilient to exceptions when onBytesWritten callback is null'() {
164+
setup:
165+
def downstream = new StringWriter()
166+
def piped = new InjectingPipeWriter(downstream, "</head>".toCharArray(), "<script></script>".toCharArray(), null, null)
167+
168+
when:
169+
piped.write("test content".toCharArray())
170+
piped.close()
171+
172+
then:
173+
noExceptionThrown()
174+
downstream.toString() == "test content"
175+
}
176+
177+
def 'should reset byte count after close'() {
178+
setup:
179+
def downstream = new StringWriter()
180+
def bytesWritten = []
181+
def onBytesWritten = { long bytes -> bytesWritten.add(bytes) }
182+
def piped = new InjectingPipeWriter(downstream, "</head>".toCharArray(), "<script></script>".toCharArray(), null, onBytesWritten)
183+
184+
when:
185+
piped.write("test".toCharArray())
186+
piped.close()
187+
188+
piped.write("content".toCharArray())
189+
piped.close()
190+
191+
then:
192+
bytesWritten.size() == 2
193+
bytesWritten[0] == 4
194+
bytesWritten[1] == 7
195+
}
106196
}

dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletResponseWrapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ public PrintWriter getWriter() throws IOException {
9494
super.getWriter(),
9595
rumInjector.getMarkerChars(),
9696
rumInjector.getSnippetChars(),
97-
this::onInjected);
97+
this::onInjected,
98+
bytes -> RumInjector.getTelemetryCollector().onInjectionResponseSize("3", bytes));
9899
printWriter = new PrintWriter(wrappedPipeWriter);
99100
} catch (Exception e) {
100101
injectionStartTime = -1;

dd-java-agent/instrumentation/servlet/request-3/src/test/groovy/RumHttpServletResponseWrapperTest.groovy

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,24 @@ class RumHttpServletResponseWrapperTest extends AgentTestRunner {
159159
1 * onBytesWritten.accept(11)
160160
}
161161

162+
void 'response sizes are reported by the InjectingPipeWriter callback'() {
163+
setup:
164+
def downstream = Mock(java.io.Writer)
165+
def marker = "</head>".toCharArray()
166+
def contentToInject = "<script></script>".toCharArray()
167+
def onBytesWritten = Mock(java.util.function.LongConsumer)
168+
def writer = new datadog.trace.bootstrap.instrumentation.buffer.InjectingPipeWriter(
169+
downstream, marker, contentToInject, null, onBytesWritten)
170+
171+
when:
172+
writer.write("test".toCharArray())
173+
writer.write("content".toCharArray())
174+
writer.close()
175+
176+
then:
177+
1 * onBytesWritten.accept(11)
178+
}
179+
162180
void 'injection timing is reported when injection is successful'() {
163181
setup:
164182
// set the injection start time to simulate timing

dd-java-agent/instrumentation/servlet/request-5/src/main/java/datadog/trace/instrumentation/servlet5/RumHttpServletResponseWrapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ public PrintWriter getWriter() throws IOException {
7575
super.getWriter(),
7676
rumInjector.getMarkerChars(),
7777
rumInjector.getSnippetChars(),
78-
this::onInjected);
78+
this::onInjected,
79+
bytes -> RumInjector.getTelemetryCollector().onInjectionResponseSize("5", bytes));
7980
printWriter = new PrintWriter(wrappedPipeWriter);
8081
} catch (Exception e) {
8182
injectionStartTime = -1;

dd-java-agent/instrumentation/servlet/request-5/src/test/groovy/RumHttpServletResponseWrapperTest.groovy

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,24 @@ class RumHttpServletResponseWrapperTest extends AgentTestRunner {
159159
1 * onBytesWritten.accept(11)
160160
}
161161

162+
void 'response sizes are reported by the InjectingPipeWriter callback'() {
163+
setup:
164+
def downstream = Mock(java.io.Writer)
165+
def marker = "</head>".toCharArray()
166+
def contentToInject = "<script></script>".toCharArray()
167+
def onBytesWritten = Mock(java.util.function.LongConsumer)
168+
def writer = new datadog.trace.bootstrap.instrumentation.buffer.InjectingPipeWriter(
169+
downstream, marker, contentToInject, null, onBytesWritten)
170+
171+
when:
172+
writer.write("test".toCharArray())
173+
writer.write("content".toCharArray())
174+
writer.close()
175+
176+
then:
177+
1 * onBytesWritten.accept(11)
178+
}
179+
162180
void 'injection timing is reported when injection is successful'() {
163181
setup:
164182
// set the injection start time to simulate timing

0 commit comments

Comments
 (0)