Skip to content

Commit a68a11f

Browse files
committed
enable compression in http post sink
1 parent 0c4ac29 commit a68a11f

File tree

9 files changed

+139
-19
lines changed

9 files changed

+139
-19
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@
140140
<mockito.version>5.18.0</mockito.version>
141141
<netty.version>4.0.56.Final</netty.version>
142142
<netty.all.version>4.0.21.Final</netty.all.version>
143+
<netty.codec.http.version>4.1.119.Final</netty.codec.http.version>
143144
<ning.http.client.version>1.9.31</ning.http.client.version>
144145
<oval.version>3.2.1</oval.version>
145146
<play.ws.version>3.0.7</play.ws.version>
@@ -866,6 +867,11 @@
866867
<artifactId>async-http-client</artifactId>
867868
<version>${asynchttpclient.version}</version>
868869
</dependency>
870+
<dependency>
871+
<groupId>io.netty</groupId>
872+
<artifactId>netty-codec-http</artifactId>
873+
<version>${netty.codec.http.version}</version>
874+
</dependency>
869875
<dependency>
870876
<groupId>org.playframework</groupId>
871877
<artifactId>play-ahc-ws-standalone_${scala.version}</artifactId>

src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,27 @@ public Optional<Long> getPopulationSize() {
4040
return _populationSize;
4141
}
4242

43+
public Long getRequestBodySize() {
44+
return _requestBodySize;
45+
}
46+
47+
public Long getRequestBodyEncodedSize() {
48+
return _requestBodyEncodedSize;
49+
}
50+
4351
private RequestEntry(final Builder builder) {
4452
_request = builder._request;
4553
_enterTime = builder._enterTime;
4654
_populationSize = builder._populationSize;
55+
_requestBodySize = builder._requestBodySize;
56+
_requestBodyEncodedSize = builder._requestBodyEncodedSize;
4757
}
4858

4959
private final Request _request;
5060
private final Instant _enterTime;
5161
private final Optional<Long> _populationSize;
62+
private final Long _requestBodySize;
63+
private final Long _requestBodyEncodedSize;
5264

5365
/**
5466
* {@link com.arpnetworking.commons.builder.Builder} implementation for
@@ -99,12 +111,38 @@ public Builder setPopulationSize(final Optional<Long> value) {
99111
return this;
100112
}
101113

114+
/**
115+
* Set the request body size. Cannot be null.
116+
*
117+
* @param value The request body size.
118+
* @return This {@link Builder} instance.
119+
*/
120+
public Builder setRequestBodySize(final Long value) {
121+
_requestBodySize = value;
122+
return this;
123+
}
124+
125+
/**
126+
* Set the request body encoded size. Cannot be null.
127+
*
128+
* @param value The request body encoded size.
129+
* @return This {@link Builder} instance.
130+
*/
131+
public Builder setRequestBodyEncodedSize(final Long value) {
132+
_requestBodyEncodedSize = value;
133+
return this;
134+
}
135+
102136
@NotNull
103137
private Request _request;
104138
@NotNull
105139
private Instant _enterTime;
106140
@NotNull
107141
private Optional<Long> _populationSize = Optional.empty();
142+
@NotNull
143+
private Long _requestBodySize = null;
144+
@NotNull
145+
private Long _requestBodyEncodedSize = null;
108146
}
109147
}
110148

src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,15 @@ private static List<String> createTags(final PeriodicData periodicData, final Ag
119119
}
120120

121121
@Override
122-
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
123-
return new RequestBuilder()
122+
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
123+
final Request request = new RequestBuilder()
124124
.setUri(getAysncHttpClientUri())
125125
.setBody(serializedData)
126126
.setMethod(HttpMethods.POST.value())
127127
.setHeader("Content-Type", "application/json")
128128
.addQueryParam("api_key", _apiKey)
129129
.build();
130+
return new RequestInfo(request, serializedData.length, serializedData.length);
130131
}
131132

132133
private DataDogSink(final Builder builder) {

src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import com.fasterxml.jackson.annotation.JacksonInject;
2626
import com.google.common.collect.ImmutableSet;
2727
import com.google.common.collect.Lists;
28+
import io.netty.handler.codec.http.HttpHeaderNames;
29+
import io.netty.handler.codec.http.HttpHeaderValues;
2830
import net.sf.oval.Validator;
2931
import net.sf.oval.constraint.CheckWith;
3032
import net.sf.oval.constraint.CheckWithCheck;
@@ -33,7 +35,6 @@
3335
import net.sf.oval.context.OValContext;
3436
import org.apache.pekko.actor.ActorRef;
3537
import org.apache.pekko.actor.ActorSystem;
36-
import org.apache.pekko.http.javadsl.model.HttpMethods;
3738
import org.apache.pekko.http.javadsl.model.MediaTypes;
3839
import org.apache.pekko.http.javadsl.model.StatusCodes;
3940
import org.apache.pekko.pattern.Patterns;
@@ -44,14 +45,18 @@
4445
import org.asynchttpclient.Request;
4546
import org.asynchttpclient.RequestBuilder;
4647
import org.asynchttpclient.uri.Uri;
48+
import org.asynchttpclient.util.HttpConstants;
4749

50+
import java.io.ByteArrayOutputStream;
51+
import java.io.IOException;
4852
import java.io.Serial;
4953
import java.net.URI;
5054
import java.time.Duration;
5155
import java.util.Collection;
5256
import java.util.Optional;
5357
import java.util.concurrent.CompletionStage;
5458
import java.util.function.Function;
59+
import java.util.zip.GZIPOutputStream;
5560

5661
/**
5762
* Publishes to an HTTP endpoint. This class is thread safe.
@@ -103,13 +108,33 @@ public Object toLogValue() {
103108
* @param serializedData The serialized data.
104109
* @return {@link Request} to execute
105110
*/
106-
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
107-
return new RequestBuilder()
108-
.setUri(_aysncHttpClientUri)
109-
.setHeader("Content-Type", MediaTypes.APPLICATION_JSON.toString())
110-
.setBody(serializedData)
111-
.setMethod(HttpMethods.POST.value())
112-
.build();
111+
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
112+
final byte[] bodyData;
113+
if (_enableCompression) {
114+
try {
115+
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
116+
final GZIPOutputStream gzipStream = new GZIPOutputStream(bos);
117+
gzipStream.write(serializedData);
118+
gzipStream.close();
119+
bodyData = bos.toByteArray();
120+
} catch (final IOException e) {
121+
throw new RuntimeException(e);
122+
}
123+
} else {
124+
bodyData = serializedData;
125+
}
126+
127+
final RequestBuilder requestBuilder = new RequestBuilder()
128+
.setUri(getAysncHttpClientUri())
129+
.setHeader(HttpHeaderNames.CONTENT_TYPE, MediaTypes.APPLICATION_JSON.toString())
130+
.setBody(bodyData)
131+
.setMethod(HttpConstants.Methods.POST);
132+
133+
if (_enableCompression) {
134+
requestBuilder.setHeader(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
135+
}
136+
137+
return new RequestInfo(requestBuilder.build(), serializedData.length, bodyData.length);
113138
}
114139

115140
/**
@@ -130,8 +155,12 @@ protected Collection<RequestEntry.Builder> createRequests(
130155
// Unfortunately, the split builder logic across HttpPostSink and
131156
// HttpSinkActor does not permit this as-is. The logic would need
132157
// to be refactored to permit the use of a TLB.
158+
final byte[] datum = serializedDatum.getDatum();
159+
final RequestInfo requestInfo = createRequest(client, datum);
133160
requestEntryBuilders.add(new RequestEntry.Builder()
134-
.setRequest(createRequest(client, serializedDatum.getDatum()))
161+
.setRequest(requestInfo.request())
162+
.setRequestBodySize(requestInfo.bodySize())
163+
.setRequestBodyEncodedSize(requestInfo.encodedSize())
135164
.setPopulationSize(serializedDatum.getPopulationSize()));
136165
}
137166
return requestEntryBuilders;
@@ -223,6 +252,7 @@ protected HttpPostSink(final Builder<?, ?> builder) {
223252
_maximumDelay = builder._maximumDelay;
224253
_acceptedStatusCodes = builder._acceptedStatusCodes;
225254
_retryableStatusCodes = builder._retryableStatusCodes;
255+
_enableCompression = builder._enableCompression;
226256
}
227257

228258
/**
@@ -250,6 +280,7 @@ protected void start(final Builder<?, ?> builder) {
250280
private final Duration _maximumDelay;
251281
private final ImmutableSet<Integer> _acceptedStatusCodes;
252282
private final ImmutableSet<Integer> _retryableStatusCodes;
283+
private final boolean _enableCompression;
253284

254285
private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
255286
private static final AsyncHttpClient CLIENT;
@@ -407,6 +438,18 @@ public B setRetryableStatusCodes(final ImmutableSet<Integer> value) {
407438
return self();
408439
}
409440

441+
/**
442+
* Sets whether to enable request compression.
443+
* Optional. Defaults to true.
444+
*
445+
* @param value true to enable compression
446+
* @return This instance of {@link Builder}.
447+
*/
448+
public B setEnableCompression(final Boolean value) {
449+
_enableCompression = value;
450+
return self();
451+
}
452+
410453
/**
411454
* Protected constructor for subclasses.
412455
*
@@ -444,6 +487,8 @@ protected Builder(final Function<B, S> targetConstructor) {
444487
private ImmutableSet<Integer> _acceptedStatusCodes = DEFAULT_ACCEPTED_STATUS_CODES;
445488
@NotNull
446489
private ImmutableSet<Integer> _retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES;
490+
@NotNull
491+
private Boolean _enableCompression = false;
447492

448493
private static final ImmutableSet<Integer> DEFAULT_ACCEPTED_STATUS_CODES;
449494
private static final ImmutableSet<Integer> DEFAULT_RETRYABLE_STATUS_CODES;
@@ -486,6 +531,19 @@ public boolean isSatisfied(
486531
}
487532
}
488533

534+
/**
535+
* Represents metadata for a request in the HttpPostSink. This record encapsulates a request
536+
* alongside its associated sizes, including the body size and the encoded size.
537+
*
538+
* The metadata is used to track details about the request and its payload, such as the size of
539+
* the data being transmitted to provide diagnostic or operational insights.
540+
*
541+
* @param request The {@link Request} object that represents the HTTP request to be sent.
542+
* @param bodySize The size of the request body in bytes.
543+
* @param encodedSize The size of the encoded request body in bytes.
544+
*/
545+
record RequestInfo(Request request, long bodySize, long encodedSize) { }
546+
489547
static final class SerializedDatum {
490548
SerializedDatum(final byte[] datum, final Optional<Long> populationSize) {
491549
_datum = datum;

src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public HttpSinkActor(
113113
}
114114
_evictedRequestsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/evicted_requests";
115115
_requestLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_latency";
116+
_requestBodySizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_size";
117+
_requestBodyEncodedSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_encoded_size";
116118
_inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time";
117119
_pendingRequestsQueueSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_size";
118120
_inflightRequestsCountName = "sinks/http_post/" + _sink.getMetricSafeName() + "/inflight_count";
@@ -121,6 +123,7 @@ public HttpSinkActor(
121123
_httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts";
122124
_samplesSentName = "sinks/http_post/" + sink.getMetricSafeName() + "/samples_sent";
123125
_samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped";
126+
124127
}
125128

126129
@Override
@@ -499,6 +502,14 @@ private void fireRequest(final RequestEntry request, final int attempt) {
499502
}
500503
});
501504
Patterns.pipe(responsePromise, context().dispatcher()).to(self());
505+
_periodicMetrics.recordCounter(
506+
_requestBodySizeName,
507+
request.getRequestBodySize()
508+
);
509+
_periodicMetrics.recordCounter(
510+
_requestBodyEncodedSizeName,
511+
request.getRequestBodyEncodedSize()
512+
);
502513
_client.executeRequest(request.getRequest(), new ResponseAsyncCompletionHandler(promise));
503514
}
504515

@@ -547,6 +558,8 @@ private void scheduleRetry(final RequestEntry requestEntry, final int attempt) {
547558
private final String _httpSinkAttemptsName;
548559
private final String _samplesSentName;
549560
private final String _samplesDroppedName;
561+
private final String _requestBodySizeName;
562+
private final String _requestBodyEncodedSizeName;
550563

551564
private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkActor.class);
552565
private static final Logger POST_RETRY_LOGGER = LoggerFactory.getRateLimitLogger(HttpSinkActor.class, Duration.ofSeconds(30));

src/main/java/com/arpnetworking/tsdcore/sinks/InfluxDbSink.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ public final class InfluxDbSink extends HttpPostSink {
4444

4545

4646
@Override
47-
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
48-
return new RequestBuilder()
47+
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
48+
final Request request = new RequestBuilder()
4949
.setUri(getAysncHttpClientUri())
5050
.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
5151
.setBody(serializedData)
5252
.setMethod(HttpMethods.POST.value())
5353
.build();
54+
return new RequestInfo(request, serializedData.length, serializedData.length);
5455
}
5556

5657
@Override

src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,14 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
137137
}
138138

139139
@Override
140-
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
141-
return new RequestBuilder()
140+
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
141+
final Request request = new RequestBuilder()
142142
.setUri(getAysncHttpClientUri())
143143
.setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString())
144144
.setBody(serializedData)
145145
.setMethod(HttpMethods.POST.value())
146146
.build();
147+
return new RequestInfo(request, serializedData.length, serializedData.length);
147148
}
148149

149150
private int serializeCondition(

src/main/java/com/arpnetworking/tsdcore/sinks/MonitordSink.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,14 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
146146
}
147147

148148
@Override
149-
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
150-
return new RequestBuilder()
149+
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
150+
final Request request = new RequestBuilder()
151151
.setUri(getAysncHttpClientUri())
152152
.setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString())
153153
.setBody(serializedData)
154154
.setMethod(HttpMethods.POST.value())
155155
.build();
156+
return new RequestInfo(request, serializedData.length, serializedData.length);
156157
}
157158

158159
private Multimap<String, Condition> prepareConditions(final Collection<Condition> conditions) {

src/main/java/com/arpnetworking/tsdcore/sinks/SignalFxSink.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
9999
}
100100

101101
@Override
102-
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
103-
return new RequestBuilder()
102+
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
103+
final Request request = new RequestBuilder()
104104
.setUri(getAysncHttpClientUri())
105105
.setBody(serializedData)
106106
.setMethod(HttpMethods.POST.value())
@@ -110,6 +110,7 @@ protected Request createRequest(final AsyncHttpClient client, final byte[] seria
110110
.setHeader("X-SF-TOKEN", _apiToken)
111111
.addQueryParam("orgid", _organizationId)
112112
.build();
113+
return new RequestInfo(request, serializedData.length, serializedData.length);
113114
}
114115

115116
private SignalFxProtocolBuffers.DataPoint createDataPoint(

0 commit comments

Comments
 (0)