Skip to content

Commit 579165c

Browse files
amarzialimcculls
andauthored
🍒 9184 - Make rum injector stream/writer more resilient to errors (#9340)
* Make rum injector stream/writer more resilient to errors (cherry picked from commit a298c60) * fix tests (cherry picked from commit 62acf7c) * fix tests (cherry picked from commit 5a32d51) * Update dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java Co-authored-by: Stuart McCulloch <[email protected]> (cherry picked from commit 87d15fb) * apply suggestions (cherry picked from commit 21afeb2) --------- Co-authored-by: Stuart McCulloch <[email protected]>
1 parent 7beb0c3 commit 579165c

File tree

4 files changed

+240
-53
lines changed

4 files changed

+240
-53
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55

66
/**
77
* An OutputStream containing a circular buffer with a lookbehind buffer of n bytes. The first time
8-
* that the latest n bytes matches the marker, a content is injected before.
8+
* that the latest n bytes matches the marker, a content is injected before. In case of IOException
9+
* thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
10+
* this case the draining will be resumed.
911
*/
1012
public class InjectingPipeOutputStream extends OutputStream {
1113
private final byte[] lookbehind;
1214
private int pos;
13-
private boolean bufferFilled;
15+
private int count;
1416
private final byte[] marker;
1517
private final byte[] contentToInject;
16-
private boolean found = false;
17-
private int matchingPos = 0;
18+
private boolean filter;
19+
private boolean wasDraining;
20+
private int matchingPos;
1821
private final Runnable onContentInjected;
1922
private final int bulkWriteThreshold;
2023
private final OutputStream downstream;
@@ -34,32 +37,39 @@ public InjectingPipeOutputStream(
3437
this.marker = marker;
3538
this.lookbehind = new byte[marker.length];
3639
this.pos = 0;
40+
this.count = 0;
41+
this.matchingPos = 0;
42+
this.wasDraining = false;
43+
// should filter the stream to potentially inject into it.
44+
this.filter = true;
3745
this.contentToInject = contentToInject;
3846
this.onContentInjected = onContentInjected;
3947
this.bulkWriteThreshold = marker.length * 2 - 2;
4048
}
4149

4250
@Override
4351
public void write(int b) throws IOException {
44-
if (found) {
52+
if (!filter) {
53+
if (wasDraining) {
54+
// continue draining
55+
drain();
56+
}
4557
downstream.write(b);
4658
return;
4759
}
4860

49-
if (bufferFilled) {
61+
if (count == lookbehind.length) {
5062
downstream.write(lookbehind[pos]);
63+
} else {
64+
count++;
5165
}
5266

5367
lookbehind[pos] = (byte) b;
5468
pos = (pos + 1) % lookbehind.length;
5569

56-
if (!bufferFilled) {
57-
bufferFilled = pos == 0;
58-
}
59-
6070
if (marker[matchingPos++] == b) {
6171
if (matchingPos == marker.length) {
62-
found = true;
72+
filter = false;
6373
downstream.write(contentToInject);
6474
if (onContentInjected != null) {
6575
onContentInjected.run();
@@ -73,18 +83,23 @@ public void write(int b) throws IOException {
7383

7484
@Override
7585
public void write(byte[] array, int off, int len) throws IOException {
76-
if (found) {
86+
if (!filter) {
87+
if (wasDraining) {
88+
// needs drain
89+
drain();
90+
}
7791
downstream.write(array, off, len);
7892
return;
7993
}
94+
8095
if (len > bulkWriteThreshold) {
8196
// if the content is large enough, we can bulk write everything but the N trail and tail.
8297
// This because the buffer can already contain some byte from a previous single write.
8398
// Also we need to fill the buffer with the tail since we don't know about the next write.
8499
int idx = arrayContains(array, off, len, marker);
85100
if (idx >= 0) {
86101
// we have a full match. just write everything
87-
found = true;
102+
filter = false;
88103
drain();
89104
downstream.write(array, off, idx);
90105
downstream.write(contentToInject);
@@ -99,7 +114,12 @@ public void write(byte[] array, int off, int len) throws IOException {
99114
write(array[i]);
100115
}
101116
drain();
117+
boolean wasFiltering = filter;
118+
119+
// will be reset if no errors after the following write
120+
filter = false;
102121
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
122+
filter = wasFiltering;
103123
for (int i = len - marker.length + 1; i < len; i++) {
104124
write(array[i]);
105125
}
@@ -133,16 +153,19 @@ private int arrayContains(byte[] array, int off, int len, byte[] search) {
133153
}
134154

135155
private void drain() throws IOException {
136-
if (bufferFilled) {
137-
for (int i = 0; i < lookbehind.length; i++) {
138-
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
156+
if (count > 0) {
157+
boolean wasFiltering = filter;
158+
filter = false;
159+
wasDraining = true;
160+
int start = (pos - count + lookbehind.length) % lookbehind.length;
161+
int cnt = count;
162+
for (int i = 0; i < cnt; i++) {
163+
downstream.write(lookbehind[(start + i) % lookbehind.length]);
164+
count--;
139165
}
140-
} else {
141-
downstream.write(this.lookbehind, 0, pos);
166+
filter = wasFiltering;
167+
wasDraining = false;
142168
}
143-
pos = 0;
144-
matchingPos = 0;
145-
bufferFilled = false;
146169
}
147170

148171
@Override
@@ -152,9 +175,12 @@ public void flush() throws IOException {
152175

153176
@Override
154177
public void close() throws IOException {
155-
if (!found) {
156-
drain();
178+
try {
179+
if (filter || wasDraining) {
180+
drain();
181+
}
182+
} finally {
183+
downstream.close();
157184
}
158-
downstream.close();
159185
}
160186
}

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55

66
/**
77
* A Writer containing a circular buffer with a lookbehind buffer of n bytes. The first time that
8-
* the latest n bytes matches the marker, a content is injected before.
8+
* the latest n bytes matches the marker, a content is injected before. In case of IOException
9+
* thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
10+
* this case the draining will be resumed.
911
*/
1012
public class InjectingPipeWriter extends Writer {
1113
private final char[] lookbehind;
1214
private int pos;
13-
private boolean bufferFilled;
15+
private int count;
1416
private final char[] marker;
1517
private final char[] contentToInject;
16-
private boolean found = false;
17-
private int matchingPos = 0;
18+
private boolean filter;
19+
private boolean wasDraining;
20+
private int matchingPos;
1821
private final Runnable onContentInjected;
1922
private final int bulkWriteThreshold;
2023
private final Writer downstream;
@@ -34,32 +37,39 @@ public InjectingPipeWriter(
3437
this.marker = marker;
3538
this.lookbehind = new char[marker.length];
3639
this.pos = 0;
40+
this.count = 0;
41+
this.matchingPos = 0;
42+
this.wasDraining = false;
43+
// should filter the stream to potentially inject into it.
44+
this.filter = true;
3745
this.contentToInject = contentToInject;
3846
this.onContentInjected = onContentInjected;
3947
this.bulkWriteThreshold = marker.length * 2 - 2;
4048
}
4149

4250
@Override
4351
public void write(int c) throws IOException {
44-
if (found) {
52+
if (!filter) {
53+
if (wasDraining) {
54+
// continue draining
55+
drain();
56+
}
4557
downstream.write(c);
4658
return;
4759
}
4860

49-
if (bufferFilled) {
61+
if (count == lookbehind.length) {
5062
downstream.write(lookbehind[pos]);
63+
} else {
64+
count++;
5165
}
5266

5367
lookbehind[pos] = (char) c;
5468
pos = (pos + 1) % lookbehind.length;
5569

56-
if (!bufferFilled) {
57-
bufferFilled = pos == 0;
58-
}
59-
6070
if (marker[matchingPos++] == c) {
6171
if (matchingPos == marker.length) {
62-
found = true;
72+
filter = false;
6373
downstream.write(contentToInject);
6474
if (onContentInjected != null) {
6575
onContentInjected.run();
@@ -71,25 +81,25 @@ public void write(int c) throws IOException {
7181
}
7282
}
7383

74-
@Override
75-
public void flush() throws IOException {
76-
downstream.flush();
77-
}
78-
7984
@Override
8085
public void write(char[] array, int off, int len) throws IOException {
81-
if (found) {
86+
if (!filter) {
87+
if (wasDraining) {
88+
// needs drain
89+
drain();
90+
}
8291
downstream.write(array, off, len);
8392
return;
8493
}
94+
8595
if (len > bulkWriteThreshold) {
8696
// if the content is large enough, we can bulk write everything but the N trail and tail.
8797
// This because the buffer can already contain some byte from a previous single write.
8898
// Also we need to fill the buffer with the tail since we don't know about the next write.
8999
int idx = arrayContains(array, off, len, marker);
90100
if (idx >= 0) {
91101
// we have a full match. just write everything
92-
found = true;
102+
filter = false;
93103
drain();
94104
downstream.write(array, off, idx);
95105
downstream.write(contentToInject);
@@ -104,7 +114,13 @@ public void write(char[] array, int off, int len) throws IOException {
104114
write(array[i]);
105115
}
106116
drain();
117+
boolean wasFiltering = filter;
118+
119+
// will be reset if no errors after the following write
120+
filter = false;
107121
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
122+
filter = wasFiltering;
123+
108124
for (int i = len - marker.length + 1; i < len; i++) {
109125
write(array[i]);
110126
}
@@ -138,23 +154,34 @@ private int arrayContains(char[] array, int off, int len, char[] search) {
138154
}
139155

140156
private void drain() throws IOException {
141-
if (bufferFilled) {
142-
for (int i = 0; i < lookbehind.length; i++) {
143-
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
157+
if (count > 0) {
158+
boolean wasFiltering = filter;
159+
filter = false;
160+
wasDraining = true;
161+
int start = (pos - count + lookbehind.length) % lookbehind.length;
162+
int cnt = count;
163+
for (int i = 0; i < cnt; i++) {
164+
downstream.write(lookbehind[(start + i) % lookbehind.length]);
165+
count--;
144166
}
145-
} else {
146-
downstream.write(this.lookbehind, 0, pos);
167+
filter = wasFiltering;
168+
wasDraining = false;
147169
}
148-
pos = 0;
149-
matchingPos = 0;
150-
bufferFilled = false;
170+
}
171+
172+
@Override
173+
public void flush() throws IOException {
174+
downstream.flush();
151175
}
152176

153177
@Override
154178
public void close() throws IOException {
155-
if (!found) {
156-
drain();
179+
try {
180+
if (filter || wasDraining) {
181+
drain();
182+
}
183+
} finally {
184+
downstream.close();
157185
}
158-
downstream.close();
159186
}
160187
}

dd-java-agent/agent-bootstrap/src/test/groovy/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamTest.groovy

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,36 @@ package datadog.trace.bootstrap.instrumentation.buffer
33
import datadog.trace.test.util.DDSpecification
44

55
class InjectingPipeOutputStreamTest extends DDSpecification {
6+
static class GlitchedOutputStream extends FilterOutputStream {
7+
int glitchesPos
8+
int count
9+
final OutputStream out
10+
11+
GlitchedOutputStream(OutputStream out, int glitchesPos) {
12+
super(out)
13+
this.out = out
14+
this.glitchesPos = glitchesPos
15+
}
16+
17+
@Override
18+
void write(byte[] b, int off, int len) throws IOException {
19+
count += len
20+
if (count >= glitchesPos) {
21+
glitchesPos = Integer.MAX_VALUE
22+
throw new IOException("Glitched after $count bytes")
23+
}
24+
out.write(b, off, len)
25+
}
26+
27+
@Override
28+
void write(int b) throws IOException {
29+
if (++count == glitchesPos) {
30+
throw new IOException("Glitched after $glitchesPos bytes")
31+
}
32+
out.write(b)
33+
}
34+
}
35+
636
def 'should filter a buffer and inject if found #found'() {
737
setup:
838
def downstream = new ByteArrayOutputStream()
@@ -20,4 +50,41 @@ class InjectingPipeOutputStreamTest extends DDSpecification {
2050
"<html><body/></html>" | "</head>" | "<something/>" | false | "<html><body/></html>"
2151
"<foo/>" | "<longerThanFoo>" | "<nothing>" | false | "<foo/>"
2252
}
53+
54+
def 'should be resilient to exceptions when writing #body'() {
55+
setup:
56+
def baos = new ByteArrayOutputStream()
57+
def downstream = new GlitchedOutputStream(baos, glichesAt)
58+
def piped = new InjectingPipeOutputStream(downstream, marker.getBytes("UTF-8"), contentToInject.getBytes("UTF-8"), null)
59+
when:
60+
try {
61+
for (String line : body) {
62+
final bytes = line.getBytes("UTF-8")
63+
try {
64+
piped.write(bytes)
65+
} catch (IOException ioe) {
66+
ioe.printStackTrace()
67+
piped.write(bytes)
68+
}
69+
}
70+
} finally {
71+
// it can throw when draining at close
72+
try {
73+
piped.close()
74+
} catch (IOException ignored) {
75+
}
76+
}
77+
then:
78+
assert baos.toByteArray() == expected.getBytes("UTF-8")
79+
where:
80+
body | marker | contentToInject | glichesAt | expected
81+
// write fails after the content has been injected
82+
["<html>", "<head>", "<foo/>", "</head>", "<body/>", "</html>"] | "</head>" | "<script>true</script>" | 60 | "<html><head><foo/><script>true</script></head><body/></html>"
83+
// write fails before the content has been injected
84+
["<html>", "<head>", "<foo/>", "</head>", "<body/>", "</html>"] | "</head>" | "<script>true</script>" | 20 | "<html><head><foo/></head><body/></html>"
85+
// write fails after having filled the buffer. The last line is written twice
86+
["<html>", "<body/>", "</html>"] | "</head>" | "<something/>" | 10 | "<html><body/></h</html>"
87+
// expected broken since the real write happens at close (drain) being the content smaller than the buffer. And retry on close is not a common practice. Hence, we suppose loosing content
88+
["<foo/>"] | "<longerThanFoo>" | "<nothing>" | 3 | "<f"
89+
}
2390
}

0 commit comments

Comments
 (0)