From 5fabb242814a5a35294497b3c12d376a8c001716 Mon Sep 17 00:00:00 2001 From: Brandon Arp Date: Thu, 7 Aug 2025 15:20:25 -0700 Subject: [PATCH 1/3] enable compression in http post sink --- pom.xml | 6 ++ .../tsdcore/model/RequestEntry.java | 38 ++++++++++ .../tsdcore/sinks/DataDogSink.java | 5 +- .../tsdcore/sinks/HttpPostSink.java | 76 ++++++++++++++++--- .../tsdcore/sinks/HttpSinkActor.java | 13 ++++ .../tsdcore/sinks/InfluxDbSink.java | 5 +- .../tsdcore/sinks/KMonDSink.java | 5 +- .../tsdcore/sinks/MonitordSink.java | 5 +- .../tsdcore/sinks/SignalFxSink.java | 5 +- 9 files changed, 139 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index 034d30d7..fecc547d 100644 --- a/pom.xml +++ b/pom.xml @@ -140,6 +140,7 @@ 5.18.0 4.0.56.Final 4.0.21.Final + 4.1.119.Final 1.9.31 3.2.1 3.0.7 @@ -866,6 +867,11 @@ async-http-client ${asynchttpclient.version} + + io.netty + netty-codec-http + ${netty.codec.http.version} + org.playframework play-ahc-ws-standalone_${scala.version} diff --git a/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java b/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java index b656ca01..c9935fe7 100644 --- a/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java +++ b/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java @@ -40,15 +40,27 @@ public Optional getPopulationSize() { return _populationSize; } + public Long getRequestBodySize() { + return _requestBodySize; + } + + public Long getRequestBodyEncodedSize() { + return _requestBodyEncodedSize; + } + private RequestEntry(final Builder builder) { _request = builder._request; _enterTime = builder._enterTime; _populationSize = builder._populationSize; + _requestBodySize = builder._requestBodySize; + _requestBodyEncodedSize = builder._requestBodyEncodedSize; } private final Request _request; private final Instant _enterTime; private final Optional _populationSize; + private final Long _requestBodySize; + private final Long _requestBodyEncodedSize; /** * {@link com.arpnetworking.commons.builder.Builder} implementation for @@ -99,12 +111,38 @@ public Builder setPopulationSize(final Optional value) { return this; } + /** + * Set the request body size. Cannot be null. + * + * @param value The request body size. + * @return This {@link Builder} instance. + */ + public Builder setRequestBodySize(final Long value) { + _requestBodySize = value; + return this; + } + + /** + * Set the request body encoded size. Cannot be null. + * + * @param value The request body encoded size. + * @return This {@link Builder} instance. + */ + public Builder setRequestBodyEncodedSize(final Long value) { + _requestBodyEncodedSize = value; + return this; + } + @NotNull private Request _request; @NotNull private Instant _enterTime; @NotNull private Optional _populationSize = Optional.empty(); + @NotNull + private Long _requestBodySize = null; + @NotNull + private Long _requestBodyEncodedSize = null; } } diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java index a28efa8d..277e3a0d 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/DataDogSink.java @@ -119,14 +119,15 @@ private static List createTags(final PeriodicData periodicData, final Ag } @Override - protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) { - return new RequestBuilder() + protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) { + final Request request = new RequestBuilder() .setUri(getAysncHttpClientUri()) .setBody(serializedData) .setMethod(HttpMethods.POST.value()) .setHeader("Content-Type", "application/json") .addQueryParam("api_key", _apiKey) .build(); + return new RequestInfo(request, serializedData.length, serializedData.length); } private DataDogSink(final Builder builder) { diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java index 83eccf39..7e7b362b 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java @@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; import net.sf.oval.Validator; import net.sf.oval.constraint.CheckWith; import net.sf.oval.constraint.CheckWithCheck; @@ -33,7 +35,6 @@ import net.sf.oval.context.OValContext; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.http.javadsl.model.HttpMethods; import org.apache.pekko.http.javadsl.model.MediaTypes; import org.apache.pekko.http.javadsl.model.StatusCodes; import org.apache.pekko.pattern.Patterns; @@ -44,7 +45,10 @@ import org.asynchttpclient.Request; import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.uri.Uri; +import org.asynchttpclient.util.HttpConstants; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.Serial; import java.net.URI; import java.time.Duration; @@ -52,6 +56,7 @@ import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.function.Function; +import java.util.zip.GZIPOutputStream; /** * Publishes to an HTTP endpoint. This class is thread safe. @@ -103,13 +108,33 @@ public Object toLogValue() { * @param serializedData The serialized data. * @return {@link Request} to execute */ - protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) { - return new RequestBuilder() - .setUri(_aysncHttpClientUri) - .setHeader("Content-Type", MediaTypes.APPLICATION_JSON.toString()) - .setBody(serializedData) - .setMethod(HttpMethods.POST.value()) - .build(); + protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) { + final byte[] bodyData; + if (_enableCompression) { + try { + final ByteArrayOutputStream bos = new ByteArrayOutputStream(); + final GZIPOutputStream gzipStream = new GZIPOutputStream(bos); + gzipStream.write(serializedData); + gzipStream.close(); + bodyData = bos.toByteArray(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } else { + bodyData = serializedData; + } + + final RequestBuilder requestBuilder = new RequestBuilder() + .setUri(getAysncHttpClientUri()) + .setHeader(HttpHeaderNames.CONTENT_TYPE, MediaTypes.APPLICATION_JSON.toString()) + .setBody(bodyData) + .setMethod(HttpConstants.Methods.POST); + + if (_enableCompression) { + requestBuilder.setHeader(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); + } + + return new RequestInfo(requestBuilder.build(), serializedData.length, bodyData.length); } /** @@ -130,8 +155,12 @@ protected Collection createRequests( // Unfortunately, the split builder logic across HttpPostSink and // HttpSinkActor does not permit this as-is. The logic would need // to be refactored to permit the use of a TLB. + final byte[] datum = serializedDatum.getDatum(); + final RequestInfo requestInfo = createRequest(client, datum); requestEntryBuilders.add(new RequestEntry.Builder() - .setRequest(createRequest(client, serializedDatum.getDatum())) + .setRequest(requestInfo.request()) + .setRequestBodySize(requestInfo.bodySize()) + .setRequestBodyEncodedSize(requestInfo.encodedSize()) .setPopulationSize(serializedDatum.getPopulationSize())); } return requestEntryBuilders; @@ -223,6 +252,7 @@ protected HttpPostSink(final Builder builder) { _maximumDelay = builder._maximumDelay; _acceptedStatusCodes = builder._acceptedStatusCodes; _retryableStatusCodes = builder._retryableStatusCodes; + _enableCompression = builder._enableCompression; } /** @@ -250,6 +280,7 @@ protected void start(final Builder builder) { private final Duration _maximumDelay; private final ImmutableSet _acceptedStatusCodes; private final ImmutableSet _retryableStatusCodes; + private final boolean _enableCompression; private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class); private static final AsyncHttpClient CLIENT; @@ -407,6 +438,18 @@ public B setRetryableStatusCodes(final ImmutableSet value) { return self(); } + /** + * Sets whether to enable request compression. + * Optional. Defaults to true. + * + * @param value true to enable compression + * @return This instance of {@link Builder}. + */ + public B setEnableCompression(final Boolean value) { + _enableCompression = value; + return self(); + } + /** * Protected constructor for subclasses. * @@ -444,6 +487,8 @@ protected Builder(final Function targetConstructor) { private ImmutableSet _acceptedStatusCodes = DEFAULT_ACCEPTED_STATUS_CODES; @NotNull private ImmutableSet _retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES; + @NotNull + private Boolean _enableCompression = false; private static final ImmutableSet DEFAULT_ACCEPTED_STATUS_CODES; private static final ImmutableSet DEFAULT_RETRYABLE_STATUS_CODES; @@ -486,6 +531,19 @@ public boolean isSatisfied( } } + /** + * Represents metadata for a request in the HttpPostSink. This record encapsulates a request + * alongside its associated sizes, including the body size and the encoded size. + * + * The metadata is used to track details about the request and its payload, such as the size of + * the data being transmitted to provide diagnostic or operational insights. + * + * @param request The {@link Request} object that represents the HTTP request to be sent. + * @param bodySize The size of the request body in bytes. + * @param encodedSize The size of the encoded request body in bytes. + */ + record RequestInfo(Request request, long bodySize, long encodedSize) { } + static final class SerializedDatum { SerializedDatum(final byte[] datum, final Optional populationSize) { _datum = datum; diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java index d2fe1c94..43ff232a 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java @@ -113,6 +113,8 @@ public HttpSinkActor( } _evictedRequestsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/evicted_requests"; _requestLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_latency"; + _requestBodySizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_size"; + _requestBodyEncodedSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_encoded_size"; _inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time"; _pendingRequestsQueueSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_size"; _inflightRequestsCountName = "sinks/http_post/" + _sink.getMetricSafeName() + "/inflight_count"; @@ -121,6 +123,7 @@ public HttpSinkActor( _httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts"; _samplesSentName = "sinks/http_post/" + sink.getMetricSafeName() + "/samples_sent"; _samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped"; + } @Override @@ -499,6 +502,14 @@ private void fireRequest(final RequestEntry request, final int attempt) { } }); Patterns.pipe(responsePromise, context().dispatcher()).to(self()); + _periodicMetrics.recordCounter( + _requestBodySizeName, + request.getRequestBodySize() + ); + _periodicMetrics.recordCounter( + _requestBodyEncodedSizeName, + request.getRequestBodyEncodedSize() + ); _client.executeRequest(request.getRequest(), new ResponseAsyncCompletionHandler(promise)); } @@ -547,6 +558,8 @@ private void scheduleRetry(final RequestEntry requestEntry, final int attempt) { private final String _httpSinkAttemptsName; private final String _samplesSentName; private final String _samplesDroppedName; + private final String _requestBodySizeName; + private final String _requestBodyEncodedSizeName; private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkActor.class); private static final Logger POST_RETRY_LOGGER = LoggerFactory.getRateLimitLogger(HttpSinkActor.class, Duration.ofSeconds(30)); diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/InfluxDbSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/InfluxDbSink.java index 5e50be24..fbdba8cb 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/InfluxDbSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/InfluxDbSink.java @@ -44,13 +44,14 @@ public final class InfluxDbSink extends HttpPostSink { @Override - protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) { - return new RequestBuilder() + protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) { + final Request request = new RequestBuilder() .setUri(getAysncHttpClientUri()) .setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8") .setBody(serializedData) .setMethod(HttpMethods.POST.value()) .build(); + return new RequestInfo(request, serializedData.length, serializedData.length); } @Override diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java index 33711163..05584739 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java @@ -137,13 +137,14 @@ protected Collection serialize(final PeriodicData periodicData) } @Override - protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) { - return new RequestBuilder() + protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) { + final Request request = new RequestBuilder() .setUri(getAysncHttpClientUri()) .setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString()) .setBody(serializedData) .setMethod(HttpMethods.POST.value()) .build(); + return new RequestInfo(request, serializedData.length, serializedData.length); } private int serializeCondition( diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/MonitordSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/MonitordSink.java index 1b2a94a4..7ff61e15 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/MonitordSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/MonitordSink.java @@ -146,13 +146,14 @@ protected Collection serialize(final PeriodicData periodicData) } @Override - protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) { - return new RequestBuilder() + protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) { + final Request request = new RequestBuilder() .setUri(getAysncHttpClientUri()) .setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString()) .setBody(serializedData) .setMethod(HttpMethods.POST.value()) .build(); + return new RequestInfo(request, serializedData.length, serializedData.length); } private Multimap prepareConditions(final Collection conditions) { diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/SignalFxSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/SignalFxSink.java index c0f72193..f6cde278 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/SignalFxSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/SignalFxSink.java @@ -99,8 +99,8 @@ protected Collection serialize(final PeriodicData periodicData) } @Override - protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) { - return new RequestBuilder() + protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) { + final Request request = new RequestBuilder() .setUri(getAysncHttpClientUri()) .setBody(serializedData) .setMethod(HttpMethods.POST.value()) @@ -110,6 +110,7 @@ protected Request createRequest(final AsyncHttpClient client, final byte[] seria .setHeader("X-SF-TOKEN", _apiToken) .addQueryParam("orgid", _organizationId) .build(); + return new RequestInfo(request, serializedData.length, serializedData.length); } private SignalFxProtocolBuffers.DataPoint createDataPoint( From 5c46f005e8b51fba741fed8e44e6ae30e84efc33 Mon Sep 17 00:00:00 2001 From: Brandon Arp Date: Thu, 7 Aug 2025 21:04:10 -0700 Subject: [PATCH 2/3] PR feedback --- .../arpnetworking/tsdcore/model/RequestEntry.java | 8 ++++---- .../arpnetworking/tsdcore/sinks/HttpPostSink.java | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java b/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java index c9935fe7..70574c92 100644 --- a/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java +++ b/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java @@ -112,7 +112,7 @@ public Builder setPopulationSize(final Optional value) { } /** - * Set the request body size. Cannot be null. + * Set the request body size. Required. Cannot be null. * * @param value The request body size. * @return This {@link Builder} instance. @@ -123,7 +123,7 @@ public Builder setRequestBodySize(final Long value) { } /** - * Set the request body encoded size. Cannot be null. + * Set the request body encoded size. Required. Cannot be null. * * @param value The request body encoded size. * @return This {@link Builder} instance. @@ -140,9 +140,9 @@ public Builder setRequestBodyEncodedSize(final Long value) { @NotNull private Optional _populationSize = Optional.empty(); @NotNull - private Long _requestBodySize = null; + private Long _requestBodySize; @NotNull - private Long _requestBodyEncodedSize = null; + private Long _requestBodyEncodedSize; } } diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java index 7e7b362b..63b4a0a0 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java @@ -111,11 +111,11 @@ public Object toLogValue() { protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) { final byte[] bodyData; if (_enableCompression) { - try { - final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - final GZIPOutputStream gzipStream = new GZIPOutputStream(bos); + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + GZIPOutputStream gzipStream = new GZIPOutputStream(bos)) { + gzipStream.write(serializedData); - gzipStream.close(); + gzipStream.flush(); bodyData = bos.toByteArray(); } catch (final IOException e) { throw new RuntimeException(e); @@ -440,7 +440,7 @@ public B setRetryableStatusCodes(final ImmutableSet value) { /** * Sets whether to enable request compression. - * Optional. Defaults to true. + * Optional. Defaults to false. * * @param value true to enable compression * @return This instance of {@link Builder}. @@ -453,7 +453,7 @@ public B setEnableCompression(final Boolean value) { /** * Protected constructor for subclasses. * - * @param targetConstructor The constructor for the concrete type to be created by this builder. + * @param targetConstructor The constructor for this builder to create the concrete type. */ protected Builder(final Function targetConstructor) { super(targetConstructor); @@ -542,7 +542,7 @@ public boolean isSatisfied( * @param bodySize The size of the request body in bytes. * @param encodedSize The size of the encoded request body in bytes. */ - record RequestInfo(Request request, long bodySize, long encodedSize) { } + protected record RequestInfo(Request request, long bodySize, long encodedSize) { } static final class SerializedDatum { SerializedDatum(final byte[] datum, final Optional populationSize) { From 50003814c22178d822cc980311d73a5fd4209562 Mon Sep 17 00:00:00 2001 From: Brandon Arp Date: Thu, 7 Aug 2025 21:14:18 -0700 Subject: [PATCH 3/3] update naming --- .../tsdcore/model/RequestEntry.java | 36 +++++++++---------- .../tsdcore/sinks/HttpPostSink.java | 4 +-- .../tsdcore/sinks/HttpSinkActor.java | 16 ++++----- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java b/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java index 70574c92..5580bf19 100644 --- a/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java +++ b/src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java @@ -40,27 +40,27 @@ public Optional getPopulationSize() { return _populationSize; } - public Long getRequestBodySize() { - return _requestBodySize; + public Long getRequestBodyBytes() { + return _requestBodyBytes; } - public Long getRequestBodyEncodedSize() { - return _requestBodyEncodedSize; + public Long getRequestBodyCompressedBytes() { + return _requestBodyCompressedBytes; } private RequestEntry(final Builder builder) { _request = builder._request; _enterTime = builder._enterTime; _populationSize = builder._populationSize; - _requestBodySize = builder._requestBodySize; - _requestBodyEncodedSize = builder._requestBodyEncodedSize; + _requestBodyBytes = builder._requestBodyBytes; + _requestBodyCompressedBytes = builder._requestBodyCompressedBytes; } private final Request _request; private final Instant _enterTime; private final Optional _populationSize; - private final Long _requestBodySize; - private final Long _requestBodyEncodedSize; + private final Long _requestBodyBytes; + private final Long _requestBodyCompressedBytes; /** * {@link com.arpnetworking.commons.builder.Builder} implementation for @@ -112,24 +112,24 @@ public Builder setPopulationSize(final Optional value) { } /** - * Set the request body size. Required. Cannot be null. + * Set the request body size in bytes. Required. Cannot be null. * - * @param value The request body size. + * @param value The request body size in bytes. * @return This {@link Builder} instance. */ - public Builder setRequestBodySize(final Long value) { - _requestBodySize = value; + public Builder setRequestBodyBytes(final Long value) { + _requestBodyBytes = value; return this; } /** - * Set the request body encoded size. Required. Cannot be null. + * Set the request body compressed size in bytes. Required. Cannot be null. * - * @param value The request body encoded size. + * @param value The request body compressed size in bytes. * @return This {@link Builder} instance. */ - public Builder setRequestBodyEncodedSize(final Long value) { - _requestBodyEncodedSize = value; + public Builder setRequestBodyCompressedBytes(final Long value) { + _requestBodyCompressedBytes = value; return this; } @@ -140,9 +140,9 @@ public Builder setRequestBodyEncodedSize(final Long value) { @NotNull private Optional _populationSize = Optional.empty(); @NotNull - private Long _requestBodySize; + private Long _requestBodyBytes; @NotNull - private Long _requestBodyEncodedSize; + private Long _requestBodyCompressedBytes; } } diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java index 63b4a0a0..d4d80de7 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java @@ -159,8 +159,8 @@ protected Collection createRequests( final RequestInfo requestInfo = createRequest(client, datum); requestEntryBuilders.add(new RequestEntry.Builder() .setRequest(requestInfo.request()) - .setRequestBodySize(requestInfo.bodySize()) - .setRequestBodyEncodedSize(requestInfo.encodedSize()) + .setRequestBodyBytes(requestInfo.bodySize()) + .setRequestBodyCompressedBytes(requestInfo.encodedSize()) .setPopulationSize(serializedDatum.getPopulationSize())); } return requestEntryBuilders; diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java index 43ff232a..45e77d55 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java @@ -113,8 +113,8 @@ public HttpSinkActor( } _evictedRequestsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/evicted_requests"; _requestLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_latency"; - _requestBodySizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_size"; - _requestBodyEncodedSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_encoded_size"; + _requestBodyBytesName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_bytes"; + _requestBodyCompressedBytesName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_compressed_bytes"; _inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time"; _pendingRequestsQueueSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_size"; _inflightRequestsCountName = "sinks/http_post/" + _sink.getMetricSafeName() + "/inflight_count"; @@ -503,12 +503,12 @@ private void fireRequest(final RequestEntry request, final int attempt) { }); Patterns.pipe(responsePromise, context().dispatcher()).to(self()); _periodicMetrics.recordCounter( - _requestBodySizeName, - request.getRequestBodySize() + _requestBodyBytesName, + request.getRequestBodyBytes() ); _periodicMetrics.recordCounter( - _requestBodyEncodedSizeName, - request.getRequestBodyEncodedSize() + _requestBodyCompressedBytesName, + request.getRequestBodyCompressedBytes() ); _client.executeRequest(request.getRequest(), new ResponseAsyncCompletionHandler(promise)); } @@ -558,8 +558,8 @@ private void scheduleRetry(final RequestEntry requestEntry, final int attempt) { private final String _httpSinkAttemptsName; private final String _samplesSentName; private final String _samplesDroppedName; - private final String _requestBodySizeName; - private final String _requestBodyEncodedSizeName; + private final String _requestBodyBytesName; + private final String _requestBodyCompressedBytesName; private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkActor.class); private static final Logger POST_RETRY_LOGGER = LoggerFactory.getRateLimitLogger(HttpSinkActor.class, Duration.ofSeconds(30));