diff --git a/pom.xml b/pom.xml
index 034d30d7..fecc547d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,6 +140,7 @@
5.18.0
4.0.56.Final
4.0.21.Final
+ 4.1.119.Final
1.9.31
3.2.1
3.0.7
@@ -866,6 +867,11 @@
async-http-client
${asynchttpclient.version}
+
+ io.netty
+ netty-codec-http
+ ${netty.codec.http.version}
+
org.playframework
play-ahc-ws-standalone_${scala.version}
diff --git a/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java b/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java
index b656ca01..5580bf19 100644
--- a/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java
+++ b/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java
@@ -40,15 +40,27 @@ public Optional getPopulationSize() {
return _populationSize;
}
+ public Long getRequestBodyBytes() {
+ return _requestBodyBytes;
+ }
+
+ public Long getRequestBodyCompressedBytes() {
+ return _requestBodyCompressedBytes;
+ }
+
private RequestEntry(final Builder builder) {
_request = builder._request;
_enterTime = builder._enterTime;
_populationSize = builder._populationSize;
+ _requestBodyBytes = builder._requestBodyBytes;
+ _requestBodyCompressedBytes = builder._requestBodyCompressedBytes;
}
private final Request _request;
private final Instant _enterTime;
private final Optional _populationSize;
+ private final Long _requestBodyBytes;
+ private final Long _requestBodyCompressedBytes;
/**
* {@link com.arpnetworking.commons.builder.Builder} implementation for
@@ -99,12 +111,38 @@ public Builder setPopulationSize(final Optional value) {
return this;
}
+ /**
+ * Set the request body size in bytes. Required. Cannot be null.
+ *
+ * @param value The request body size in bytes.
+ * @return This {@link Builder} instance.
+ */
+ public Builder setRequestBodyBytes(final Long value) {
+ _requestBodyBytes = value;
+ return this;
+ }
+
+ /**
+ * Set the request body compressed size in bytes. Required. Cannot be null.
+ *
+ * @param value The request body compressed size in bytes.
+ * @return This {@link Builder} instance.
+ */
+ public Builder setRequestBodyCompressedBytes(final Long value) {
+ _requestBodyCompressedBytes = value;
+ return this;
+ }
+
@NotNull
private Request _request;
@NotNull
private Instant _enterTime;
@NotNull
private Optional _populationSize = Optional.empty();
+ @NotNull
+ private Long _requestBodyBytes;
+ @NotNull
+ private Long _requestBodyCompressedBytes;
}
}
diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java
index a28efa8d..277e3a0d 100644
--- a/src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java
+++ b/src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java
@@ -119,14 +119,15 @@ private static List createTags(final PeriodicData periodicData, final Ag
}
@Override
- protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
- return new RequestBuilder()
+ protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
+ final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.setHeader("Content-Type", "application/json")
.addQueryParam("api_key", _apiKey)
.build();
+ return new RequestInfo(request, serializedData.length, serializedData.length);
}
private DataDogSink(final Builder builder) {
diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java
index 83eccf39..d4d80de7 100644
--- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java
+++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java
@@ -25,6 +25,8 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
import net.sf.oval.Validator;
import net.sf.oval.constraint.CheckWith;
import net.sf.oval.constraint.CheckWithCheck;
@@ -33,7 +35,6 @@
import net.sf.oval.context.OValContext;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
-import org.apache.pekko.http.javadsl.model.HttpMethods;
import org.apache.pekko.http.javadsl.model.MediaTypes;
import org.apache.pekko.http.javadsl.model.StatusCodes;
import org.apache.pekko.pattern.Patterns;
@@ -44,7 +45,10 @@
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.uri.Uri;
+import org.asynchttpclient.util.HttpConstants;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.Serial;
import java.net.URI;
import java.time.Duration;
@@ -52,6 +56,7 @@
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
+import java.util.zip.GZIPOutputStream;
/**
* Publishes to an HTTP endpoint. This class is thread safe.
@@ -103,13 +108,33 @@ public Object toLogValue() {
* @param serializedData The serialized data.
* @return {@link Request} to execute
*/
- protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
- return new RequestBuilder()
- .setUri(_aysncHttpClientUri)
- .setHeader("Content-Type", MediaTypes.APPLICATION_JSON.toString())
- .setBody(serializedData)
- .setMethod(HttpMethods.POST.value())
- .build();
+ protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
+ final byte[] bodyData;
+ if (_enableCompression) {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ GZIPOutputStream gzipStream = new GZIPOutputStream(bos)) {
+
+ gzipStream.write(serializedData);
+ gzipStream.flush();
+ bodyData = bos.toByteArray();
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ bodyData = serializedData;
+ }
+
+ final RequestBuilder requestBuilder = new RequestBuilder()
+ .setUri(getAysncHttpClientUri())
+ .setHeader(HttpHeaderNames.CONTENT_TYPE, MediaTypes.APPLICATION_JSON.toString())
+ .setBody(bodyData)
+ .setMethod(HttpConstants.Methods.POST);
+
+ if (_enableCompression) {
+ requestBuilder.setHeader(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
+ }
+
+ return new RequestInfo(requestBuilder.build(), serializedData.length, bodyData.length);
}
/**
@@ -130,8 +155,12 @@ protected Collection createRequests(
// Unfortunately, the split builder logic across HttpPostSink and
// HttpSinkActor does not permit this as-is. The logic would need
// to be refactored to permit the use of a TLB.
+ final byte[] datum = serializedDatum.getDatum();
+ final RequestInfo requestInfo = createRequest(client, datum);
requestEntryBuilders.add(new RequestEntry.Builder()
- .setRequest(createRequest(client, serializedDatum.getDatum()))
+ .setRequest(requestInfo.request())
+ .setRequestBodyBytes(requestInfo.bodySize())
+ .setRequestBodyCompressedBytes(requestInfo.encodedSize())
.setPopulationSize(serializedDatum.getPopulationSize()));
}
return requestEntryBuilders;
@@ -223,6 +252,7 @@ protected HttpPostSink(final Builder, ?> builder) {
_maximumDelay = builder._maximumDelay;
_acceptedStatusCodes = builder._acceptedStatusCodes;
_retryableStatusCodes = builder._retryableStatusCodes;
+ _enableCompression = builder._enableCompression;
}
/**
@@ -250,6 +280,7 @@ protected void start(final Builder, ?> builder) {
private final Duration _maximumDelay;
private final ImmutableSet _acceptedStatusCodes;
private final ImmutableSet _retryableStatusCodes;
+ private final boolean _enableCompression;
private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
private static final AsyncHttpClient CLIENT;
@@ -407,10 +438,22 @@ public B setRetryableStatusCodes(final ImmutableSet value) {
return self();
}
+ /**
+ * Sets whether to enable request compression.
+ * Optional. Defaults to false.
+ *
+ * @param value true to enable compression
+ * @return This instance of {@link Builder}.
+ */
+ public B setEnableCompression(final Boolean value) {
+ _enableCompression = value;
+ return self();
+ }
+
/**
* Protected constructor for subclasses.
*
- * @param targetConstructor The constructor for the concrete type to be created by this builder.
+ * @param targetConstructor The constructor for this builder to create the concrete type.
*/
protected Builder(final Function targetConstructor) {
super(targetConstructor);
@@ -444,6 +487,8 @@ protected Builder(final Function targetConstructor) {
private ImmutableSet _acceptedStatusCodes = DEFAULT_ACCEPTED_STATUS_CODES;
@NotNull
private ImmutableSet _retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES;
+ @NotNull
+ private Boolean _enableCompression = false;
private static final ImmutableSet DEFAULT_ACCEPTED_STATUS_CODES;
private static final ImmutableSet DEFAULT_RETRYABLE_STATUS_CODES;
@@ -486,6 +531,19 @@ public boolean isSatisfied(
}
}
+ /**
+ * Represents metadata for a request in the HttpPostSink. This record encapsulates a request
+ * alongside its associated sizes, including the body size and the encoded size.
+ *
+ * The metadata is used to track details about the request and its payload, such as the size of
+ * the data being transmitted to provide diagnostic or operational insights.
+ *
+ * @param request The {@link Request} object that represents the HTTP request to be sent.
+ * @param bodySize The size of the request body in bytes.
+ * @param encodedSize The size of the encoded request body in bytes.
+ */
+ protected record RequestInfo(Request request, long bodySize, long encodedSize) { }
+
static final class SerializedDatum {
SerializedDatum(final byte[] datum, final Optional populationSize) {
_datum = datum;
diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java
index d2fe1c94..45e77d55 100644
--- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java
+++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java
@@ -113,6 +113,8 @@ public HttpSinkActor(
}
_evictedRequestsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/evicted_requests";
_requestLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_latency";
+ _requestBodyBytesName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_bytes";
+ _requestBodyCompressedBytesName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_compressed_bytes";
_inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time";
_pendingRequestsQueueSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_size";
_inflightRequestsCountName = "sinks/http_post/" + _sink.getMetricSafeName() + "/inflight_count";
@@ -121,6 +123,7 @@ public HttpSinkActor(
_httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts";
_samplesSentName = "sinks/http_post/" + sink.getMetricSafeName() + "/samples_sent";
_samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped";
+
}
@Override
@@ -499,6 +502,14 @@ private void fireRequest(final RequestEntry request, final int attempt) {
}
});
Patterns.pipe(responsePromise, context().dispatcher()).to(self());
+ _periodicMetrics.recordCounter(
+ _requestBodyBytesName,
+ request.getRequestBodyBytes()
+ );
+ _periodicMetrics.recordCounter(
+ _requestBodyCompressedBytesName,
+ request.getRequestBodyCompressedBytes()
+ );
_client.executeRequest(request.getRequest(), new ResponseAsyncCompletionHandler(promise));
}
@@ -547,6 +558,8 @@ private void scheduleRetry(final RequestEntry requestEntry, final int attempt) {
private final String _httpSinkAttemptsName;
private final String _samplesSentName;
private final String _samplesDroppedName;
+ private final String _requestBodyBytesName;
+ private final String _requestBodyCompressedBytesName;
private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkActor.class);
private static final Logger POST_RETRY_LOGGER = LoggerFactory.getRateLimitLogger(HttpSinkActor.class, Duration.ofSeconds(30));
diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/InfluxDbSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/InfluxDbSink.java
index 5e50be24..fbdba8cb 100644
--- a/src/main/java/com/arpnetworking/tsdcore/sinks/InfluxDbSink.java
+++ b/src/main/java/com/arpnetworking/tsdcore/sinks/InfluxDbSink.java
@@ -44,13 +44,14 @@ public final class InfluxDbSink extends HttpPostSink {
@Override
- protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
- return new RequestBuilder()
+ protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
+ final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
+ return new RequestInfo(request, serializedData.length, serializedData.length);
}
@Override
diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java
index 33711163..05584739 100644
--- a/src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java
+++ b/src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java
@@ -137,13 +137,14 @@ protected Collection serialize(final PeriodicData periodicData)
}
@Override
- protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
- return new RequestBuilder()
+ protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
+ final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
+ return new RequestInfo(request, serializedData.length, serializedData.length);
}
private int serializeCondition(
diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/MonitordSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/MonitordSink.java
index 1b2a94a4..7ff61e15 100644
--- a/src/main/java/com/arpnetworking/tsdcore/sinks/MonitordSink.java
+++ b/src/main/java/com/arpnetworking/tsdcore/sinks/MonitordSink.java
@@ -146,13 +146,14 @@ protected Collection serialize(final PeriodicData periodicData)
}
@Override
- protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
- return new RequestBuilder()
+ protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
+ final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
+ return new RequestInfo(request, serializedData.length, serializedData.length);
}
private Multimap prepareConditions(final Collection conditions) {
diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/SignalFxSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/SignalFxSink.java
index c0f72193..f6cde278 100644
--- a/src/main/java/com/arpnetworking/tsdcore/sinks/SignalFxSink.java
+++ b/src/main/java/com/arpnetworking/tsdcore/sinks/SignalFxSink.java
@@ -99,8 +99,8 @@ protected Collection serialize(final PeriodicData periodicData)
}
@Override
- protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
- return new RequestBuilder()
+ protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
+ final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
@@ -110,6 +110,7 @@ protected Request createRequest(final AsyncHttpClient client, final byte[] seria
.setHeader("X-SF-TOKEN", _apiToken)
.addQueryParam("orgid", _organizationId)
.build();
+ return new RequestInfo(request, serializedData.length, serializedData.length);
}
private SignalFxProtocolBuffers.DataPoint createDataPoint(