Skip to content

Commit 3f81f6f

Browse files
authored
No Content-Encoding when disabling compression for localhost (elastic#2238)
1 parent 080ff41 commit 3f81f6f

File tree

5 files changed

+63
-15
lines changed

5 files changed

+63
-15
lines changed

apm-agent-core/src/main/java/co/elastic/apm/agent/report/AbstractIntakeApiHandler.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class AbstractIntakeApiHandler {
5151
protected HttpURLConnection connection;
5252
@Nullable
5353
protected OutputStream os;
54+
@Nullable
55+
private CountingOutputStream countingOs;
5456
protected int errorCount;
5557
protected volatile boolean shutDown;
5658
private volatile boolean healthy = true;
@@ -60,7 +62,7 @@ public AbstractIntakeApiHandler(ReporterConfiguration reporterConfiguration, Pay
6062
this.reporterConfiguration = reporterConfiguration;
6163
this.payloadSerializer = payloadSerializer;
6264
this.apmServerClient = apmServerClient;
63-
this.deflater = new Deflater();
65+
this.deflater = new Deflater(Deflater.BEST_SPEED);
6466
}
6567

6668
/*
@@ -79,7 +81,10 @@ static long getBackoffTimeSeconds(long errorCount) {
7981
}
8082

8183
protected boolean shouldEndRequest() {
82-
final long written = deflater.getBytesWritten() + DslJsonSerializer.BUFFER_SIZE;
84+
if (countingOs == null) {
85+
return false;
86+
}
87+
final long written = countingOs.getCount() + payloadSerializer.getBufferSize();
8388
final boolean endRequest = written >= reporterConfiguration.getApiRequestSize();
8489
if (endRequest && logger.isDebugEnabled()) {
8590
logger.debug("Flushing, because request size limit exceeded {}/{}", written, reporterConfiguration.getApiRequestSize());
@@ -92,23 +97,26 @@ protected HttpURLConnection startRequest(String endpoint) throws Exception {
9297
payloadSerializer.blockUntilReady();
9398
final HttpURLConnection connection = apmServerClient.startRequest(endpoint);
9499
if (connection != null) {
95-
if (isLocalhost(connection)) {
96-
deflater.setLevel(Deflater.NO_COMPRESSION);
97-
} else {
98-
deflater.setLevel(Deflater.BEST_SPEED);
99-
}
100+
boolean useCompression = !isLocalhost(connection);
100101
try {
101102
if (logger.isDebugEnabled()) {
102103
logger.debug("Starting new request to {}", connection.getURL());
103104
}
104105
connection.setRequestMethod("POST");
105106
connection.setDoOutput(true);
106107
connection.setChunkedStreamingMode(DslJsonSerializer.BUFFER_SIZE);
107-
connection.setRequestProperty("Content-Encoding", "deflate");
108+
if (useCompression) {
109+
connection.setRequestProperty("Content-Encoding", "deflate");
110+
}
108111
connection.setRequestProperty("Content-Type", "application/x-ndjson");
109112
connection.setUseCaches(false);
110113
connection.connect();
111-
os = new DeflaterOutputStream(connection.getOutputStream(), deflater, true);
114+
countingOs = new CountingOutputStream(connection.getOutputStream());
115+
if (useCompression) {
116+
os = new DeflaterOutputStream(countingOs, deflater, true);
117+
} else {
118+
os = countingOs;
119+
}
112120
payloadSerializer.setOutputStream(os);
113121
payloadSerializer.appendMetaDataNdJsonToStream();
114122
payloadSerializer.flushToOutputStream();
@@ -181,6 +189,7 @@ public void endRequest() {
181189
HttpUtils.consumeAndClose(connection);
182190
connection = null;
183191
os = null;
192+
countingOs = null;
184193
deflater.reset();
185194
currentlyTransmitting = 0;
186195
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package co.elastic.apm.agent.report;
2+
3+
import javax.annotation.Nonnull;
4+
import java.io.IOException;
5+
import java.io.OutputStream;
6+
7+
class CountingOutputStream extends OutputStream {
8+
9+
private final OutputStream out;
10+
private long count;
11+
12+
CountingOutputStream(OutputStream out) {
13+
this.out = out;
14+
}
15+
16+
@Override
17+
public void write(int b) throws IOException {
18+
out.write(b);
19+
count++;
20+
}
21+
22+
@Override
23+
public void write(@Nonnull byte[] b, int off, int len) throws IOException {
24+
out.write(b, off, len);
25+
count += len;
26+
}
27+
28+
@Override
29+
public void close() throws IOException {
30+
out.close();
31+
}
32+
33+
@Override
34+
public void flush() throws IOException {
35+
out.flush();
36+
}
37+
38+
public long getCount() {
39+
return count;
40+
}
41+
}

apm-agent-core/src/test/java/co/elastic/apm/agent/report/ApmServerReporterIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@
4444
import org.junit.jupiter.api.Test;
4545
import org.stagemonitor.configuration.ConfigurationRegistry;
4646

47+
import java.io.InputStream;
4748
import java.net.InetSocketAddress;
4849
import java.net.URL;
4950
import java.util.Collections;
5051
import java.util.concurrent.TimeUnit;
5152
import java.util.concurrent.atomic.AtomicInteger;
52-
import java.util.zip.InflaterInputStream;
5353

5454
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
5555
import static org.awaitility.Awaitility.await;
@@ -92,7 +92,7 @@ void setUp() throws Exception {
9292
handler = new BlockingHandler(exchange -> {
9393
if (statusCode < 300 && exchange.getRequestPath().equals("/intake/v2/events")) {
9494
receivedIntakeApiCalls.incrementAndGet();
95-
InflaterInputStream in = new InflaterInputStream(exchange.getInputStream());
95+
InputStream in = exchange.getInputStream();
9696
try (in) {
9797
for (int n = 0; -1 != n; n = in.read()) {
9898
if (n == '\n') {

apm-agent-core/src/test/java/co/elastic/apm/agent/report/IntakeV2ReportingEventHandlerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.util.List;
5656
import java.util.stream.Collectors;
5757
import java.util.stream.Stream;
58-
import java.util.zip.InflaterInputStream;
5958

6059
import static co.elastic.apm.agent.report.IntakeV2ReportingEventHandler.INTAKE_V2_URL;
6160
import static com.github.tomakehurst.wiremock.client.WireMock.ok;
@@ -262,7 +261,7 @@ private List<JsonNode> getNdJsonNodes() {
262261
return Stream.of(mockApmServer1, mockApmServer2)
263262
.flatMap(apmServer -> apmServer.findAll(postRequestedFor(urlEqualTo(INTAKE_V2_URL))).stream())
264263
.findFirst()
265-
.map(request -> new BufferedReader(new InputStreamReader(new InflaterInputStream(new ByteArrayInputStream(request.getBody()))))
264+
.map(request -> new BufferedReader(new InputStreamReader(new ByteArrayInputStream(request.getBody())))
266265
.lines()
267266
.map(IntakeV2ReportingEventHandlerTest::getReadTree)
268267
.collect(Collectors.toList()))

apm-agent-plugins/apm-log-shipper-plugin/src/test/java/co/elastic/apm/agent/log/shipper/ApmServerLogShipperTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.util.concurrent.Future;
5252
import java.util.concurrent.TimeUnit;
5353
import java.util.stream.Collectors;
54-
import java.util.zip.InflaterInputStream;
5554

5655
import static com.github.tomakehurst.wiremock.client.WireMock.get;
5756
import static com.github.tomakehurst.wiremock.client.WireMock.ok;
@@ -143,7 +142,7 @@ void testSendLogsAfterServerUrlsSet() throws Exception {
143142
private List<String> getEvents() {
144143
return mockApmServer.findAll(postRequestedFor(urlEqualTo(ApmServerLogShipper.LOGS_ENDPOINT)))
145144
.stream()
146-
.flatMap(request -> new BufferedReader(new InputStreamReader(new InflaterInputStream(new ByteArrayInputStream(request.getBody())))).lines())
145+
.flatMap(request -> new BufferedReader(new InputStreamReader(new ByteArrayInputStream(request.getBody()))).lines())
147146
.collect(Collectors.toList());
148147
}
149148
}

0 commit comments

Comments
 (0)