Skip to content

Commit e87e1c4

Browse files
Merge branch 'master' into alexeyk/play-not-a-directory-fix
2 parents 7765e84 + 5ed2f59 commit e87e1c4

File tree

48 files changed

+1112
-247
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1112
-247
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java

Lines changed: 60 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,24 @@
22

33
import java.io.IOException;
44
import java.io.OutputStream;
5+
import javax.annotation.concurrent.NotThreadSafe;
56

67
/**
78
* An OutputStream containing a circular buffer with a lookbehind buffer of n bytes. The first time
8-
* that the latest n bytes matches the marker, a content is injected before.
9+
* that the latest n bytes matches the marker, a content is injected before. In case of IOException
10+
* thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
11+
* this case the draining will be resumed.
912
*/
13+
@NotThreadSafe
1014
public class InjectingPipeOutputStream extends OutputStream {
1115
private final byte[] lookbehind;
1216
private int pos;
13-
private boolean bufferFilled;
17+
private int count;
1418
private final byte[] marker;
1519
private final byte[] contentToInject;
16-
private boolean found = false;
17-
private int matchingPos = 0;
20+
private boolean filter;
21+
private boolean wasDraining;
22+
private int matchingPos;
1823
private final Runnable onContentInjected;
1924
private final int bulkWriteThreshold;
2025
private final OutputStream downstream;
@@ -34,32 +39,39 @@ public InjectingPipeOutputStream(
3439
this.marker = marker;
3540
this.lookbehind = new byte[marker.length];
3641
this.pos = 0;
42+
this.count = 0;
43+
this.matchingPos = 0;
44+
this.wasDraining = false;
45+
// should filter the stream to potentially inject into it.
46+
this.filter = true;
3747
this.contentToInject = contentToInject;
3848
this.onContentInjected = onContentInjected;
3949
this.bulkWriteThreshold = marker.length * 2 - 2;
4050
}
4151

4252
@Override
4353
public void write(int b) throws IOException {
44-
if (found) {
54+
if (!filter) {
55+
if (wasDraining) {
56+
// continue draining
57+
drain();
58+
}
4559
downstream.write(b);
4660
return;
4761
}
4862

49-
if (bufferFilled) {
63+
if (count == lookbehind.length) {
5064
downstream.write(lookbehind[pos]);
65+
} else {
66+
count++;
5167
}
5268

5369
lookbehind[pos] = (byte) b;
5470
pos = (pos + 1) % lookbehind.length;
5571

56-
if (!bufferFilled) {
57-
bufferFilled = pos == 0;
58-
}
59-
6072
if (marker[matchingPos++] == b) {
6173
if (matchingPos == marker.length) {
62-
found = true;
74+
filter = false;
6375
downstream.write(contentToInject);
6476
if (onContentInjected != null) {
6577
onContentInjected.run();
@@ -73,18 +85,23 @@ public void write(int b) throws IOException {
7385

7486
@Override
7587
public void write(byte[] array, int off, int len) throws IOException {
76-
if (found) {
88+
if (!filter) {
89+
if (wasDraining) {
90+
// needs drain
91+
drain();
92+
}
7793
downstream.write(array, off, len);
7894
return;
7995
}
96+
8097
if (len > bulkWriteThreshold) {
8198
// if the content is large enough, we can bulk write everything but the N trail and tail.
8299
// This because the buffer can already contain some byte from a previous single write.
83100
// Also we need to fill the buffer with the tail since we don't know about the next write.
84101
int idx = arrayContains(array, off, len, marker);
85102
if (idx >= 0) {
86103
// we have a full match. just write everything
87-
found = true;
104+
filter = false;
88105
drain();
89106
downstream.write(array, off, idx);
90107
downstream.write(contentToInject);
@@ -99,7 +116,12 @@ public void write(byte[] array, int off, int len) throws IOException {
99116
write(array[i]);
100117
}
101118
drain();
119+
boolean wasFiltering = filter;
120+
121+
// will be reset if no errors after the following write
122+
filter = false;
102123
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
124+
filter = wasFiltering;
103125
for (int i = len - marker.length + 1; i < len; i++) {
104126
write(array[i]);
105127
}
@@ -133,16 +155,25 @@ private int arrayContains(byte[] array, int off, int len, byte[] search) {
133155
}
134156

135157
private void drain() throws IOException {
136-
if (bufferFilled) {
137-
for (int i = 0; i < lookbehind.length; i++) {
138-
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
158+
if (count > 0) {
159+
boolean wasFiltering = filter;
160+
filter = false;
161+
wasDraining = true;
162+
int start = (pos - count + lookbehind.length) % lookbehind.length;
163+
int cnt = count;
164+
for (int i = 0; i < cnt; i++) {
165+
downstream.write(lookbehind[(start + i) % lookbehind.length]);
166+
count--;
139167
}
140-
} else {
141-
downstream.write(this.lookbehind, 0, pos);
168+
filter = wasFiltering;
169+
wasDraining = false;
170+
}
171+
}
172+
173+
public void commit() throws IOException {
174+
if (filter || wasDraining) {
175+
drain();
142176
}
143-
pos = 0;
144-
matchingPos = 0;
145-
bufferFilled = false;
146177
}
147178

148179
@Override
@@ -152,9 +183,14 @@ public void flush() throws IOException {
152183

153184
@Override
154185
public void close() throws IOException {
155-
if (!found) {
156-
drain();
186+
try {
187+
commit();
188+
} finally {
189+
downstream.close();
157190
}
158-
downstream.close();
191+
}
192+
193+
public void setFilter(boolean filter) {
194+
this.filter = filter;
159195
}
160196
}

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeWriter.java

Lines changed: 66 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,24 @@
22

33
import java.io.IOException;
44
import java.io.Writer;
5+
import javax.annotation.concurrent.NotThreadSafe;
56

67
/**
78
* A Writer containing a circular buffer with a lookbehind buffer of n bytes. The first time that
8-
* the latest n bytes matches the marker, a content is injected before.
9+
* the latest n bytes matches the marker, a content is injected before. In case of IOException
10+
* thrown by the downstream, the buffer will be lost unless the error occurred when draining it. In
11+
* this case the draining will be resumed.
912
*/
13+
@NotThreadSafe
1014
public class InjectingPipeWriter extends Writer {
1115
private final char[] lookbehind;
1216
private int pos;
13-
private boolean bufferFilled;
17+
private int count;
1418
private final char[] marker;
1519
private final char[] contentToInject;
16-
private boolean found = false;
17-
private int matchingPos = 0;
20+
private boolean filter;
21+
private boolean wasDraining;
22+
private int matchingPos;
1823
private final Runnable onContentInjected;
1924
private final int bulkWriteThreshold;
2025
private final Writer downstream;
@@ -34,32 +39,39 @@ public InjectingPipeWriter(
3439
this.marker = marker;
3540
this.lookbehind = new char[marker.length];
3641
this.pos = 0;
42+
this.count = 0;
43+
this.matchingPos = 0;
44+
this.wasDraining = false;
45+
// should filter the stream to potentially inject into it.
46+
this.filter = true;
3747
this.contentToInject = contentToInject;
3848
this.onContentInjected = onContentInjected;
3949
this.bulkWriteThreshold = marker.length * 2 - 2;
4050
}
4151

4252
@Override
4353
public void write(int c) throws IOException {
44-
if (found) {
54+
if (!filter) {
55+
if (wasDraining) {
56+
// continue draining
57+
drain();
58+
}
4559
downstream.write(c);
4660
return;
4761
}
4862

49-
if (bufferFilled) {
63+
if (count == lookbehind.length) {
5064
downstream.write(lookbehind[pos]);
65+
} else {
66+
count++;
5167
}
5268

5369
lookbehind[pos] = (char) c;
5470
pos = (pos + 1) % lookbehind.length;
5571

56-
if (!bufferFilled) {
57-
bufferFilled = pos == 0;
58-
}
59-
6072
if (marker[matchingPos++] == c) {
6173
if (matchingPos == marker.length) {
62-
found = true;
74+
filter = false;
6375
downstream.write(contentToInject);
6476
if (onContentInjected != null) {
6577
onContentInjected.run();
@@ -71,25 +83,25 @@ public void write(int c) throws IOException {
7183
}
7284
}
7385

74-
@Override
75-
public void flush() throws IOException {
76-
downstream.flush();
77-
}
78-
7986
@Override
8087
public void write(char[] array, int off, int len) throws IOException {
81-
if (found) {
88+
if (!filter) {
89+
if (wasDraining) {
90+
// needs drain
91+
drain();
92+
}
8293
downstream.write(array, off, len);
8394
return;
8495
}
96+
8597
if (len > bulkWriteThreshold) {
8698
// if the content is large enough, we can bulk write everything but the N trail and tail.
8799
// This because the buffer can already contain some byte from a previous single write.
88100
// Also we need to fill the buffer with the tail since we don't know about the next write.
89101
int idx = arrayContains(array, off, len, marker);
90102
if (idx >= 0) {
91103
// we have a full match. just write everything
92-
found = true;
104+
filter = false;
93105
drain();
94106
downstream.write(array, off, idx);
95107
downstream.write(contentToInject);
@@ -104,7 +116,13 @@ public void write(char[] array, int off, int len) throws IOException {
104116
write(array[i]);
105117
}
106118
drain();
119+
boolean wasFiltering = filter;
120+
121+
// will be reset if no errors after the following write
122+
filter = false;
107123
downstream.write(array, off + marker.length - 1, len - bulkWriteThreshold);
124+
filter = wasFiltering;
125+
108126
for (int i = len - marker.length + 1; i < len; i++) {
109127
write(array[i]);
110128
}
@@ -138,23 +156,42 @@ private int arrayContains(char[] array, int off, int len, char[] search) {
138156
}
139157

140158
private void drain() throws IOException {
141-
if (bufferFilled) {
142-
for (int i = 0; i < lookbehind.length; i++) {
143-
downstream.write(lookbehind[(pos + i) % lookbehind.length]);
159+
if (count > 0) {
160+
boolean wasFiltering = filter;
161+
filter = false;
162+
wasDraining = true;
163+
int start = (pos - count + lookbehind.length) % lookbehind.length;
164+
int cnt = count;
165+
for (int i = 0; i < cnt; i++) {
166+
downstream.write(lookbehind[(start + i) % lookbehind.length]);
167+
count--;
144168
}
145-
} else {
146-
downstream.write(this.lookbehind, 0, pos);
169+
filter = wasFiltering;
170+
wasDraining = false;
147171
}
148-
pos = 0;
149-
matchingPos = 0;
150-
bufferFilled = false;
172+
}
173+
174+
public void commit() throws IOException {
175+
if (filter || wasDraining) {
176+
drain();
177+
}
178+
}
179+
180+
@Override
181+
public void flush() throws IOException {
182+
downstream.flush();
151183
}
152184

153185
@Override
154186
public void close() throws IOException {
155-
if (!found) {
156-
drain();
187+
try {
188+
commit();
189+
} finally {
190+
downstream.close();
157191
}
158-
downstream.close();
192+
}
193+
194+
public void setFilter(boolean filter) {
195+
this.filter = filter;
159196
}
160197
}

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/http/ClientIpAddressResolver.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,14 @@ private static InetAddress doResolve(AgentSpanContext.Extracted context, Mutable
9292
result = coalesce(result, addr);
9393
}
9494

95+
addr = tryHeader(context.getForwarded(), FORWARDED_PARSER);
96+
if (addr != null) {
97+
if (!isIpAddrPrivate(addr)) {
98+
return addr;
99+
}
100+
result = coalesce(result, addr);
101+
}
102+
95103
addr = tryHeader(context.getForwardedFor(), PLAIN_IP_ADDRESS_PARSER);
96104
if (addr != null) {
97105
if (!isIpAddrPrivate(addr)) {

0 commit comments

Comments
 (0)