Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
<mockito.version>5.18.0</mockito.version>
<netty.version>4.0.56.Final</netty.version>
<netty.all.version>4.0.21.Final</netty.all.version>
<netty.codec.http.version>4.1.119.Final</netty.codec.http.version>
<ning.http.client.version>1.9.31</ning.http.client.version>
<oval.version>3.2.1</oval.version>
<play.ws.version>3.0.7</play.ws.version>
Expand Down Expand Up @@ -866,6 +867,11 @@
<artifactId>async-http-client</artifactId>
<version>${asynchttpclient.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.codec.http.version}</version>
</dependency>
<dependency>
<groupId>org.playframework</groupId>
<artifactId>play-ahc-ws-standalone_${scala.version}</artifactId>
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,27 @@ public Optional<Long> getPopulationSize() {
return _populationSize;
}

public Long getRequestBodySize() {
return _requestBodySize;
}

public Long getRequestBodyEncodedSize() {
return _requestBodyEncodedSize;
}

private RequestEntry(final Builder builder) {
_request = builder._request;
_enterTime = builder._enterTime;
_populationSize = builder._populationSize;
_requestBodySize = builder._requestBodySize;
_requestBodyEncodedSize = builder._requestBodyEncodedSize;
}

private final Request _request;
private final Instant _enterTime;
private final Optional<Long> _populationSize;
private final Long _requestBodySize;
private final Long _requestBodyEncodedSize;

/**
* {@link com.arpnetworking.commons.builder.Builder} implementation for
Expand Down Expand Up @@ -99,12 +111,38 @@ public Builder setPopulationSize(final Optional<Long> value) {
return this;
}

/**
* Set the request body size. Cannot be null.
*
* @param value The request body size.
* @return This {@link Builder} instance.
*/
public Builder setRequestBodySize(final Long value) {
_requestBodySize = value;
return this;
}

/**
* Set the request body encoded size. Cannot be null.
*
* @param value The request body encoded size.
* @return This {@link Builder} instance.
*/
public Builder setRequestBodyEncodedSize(final Long value) {
_requestBodyEncodedSize = value;
return this;
}

@NotNull
private Request _request;
@NotNull
private Instant _enterTime;
@NotNull
private Optional<Long> _populationSize = Optional.empty();
@NotNull
private Long _requestBodySize = null;
@NotNull
private Long _requestBodyEncodedSize = null;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,15 @@ private static List<String> createTags(final PeriodicData periodicData, final Ag
}

@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.setHeader("Content-Type", "application/json")
.addQueryParam("api_key", _apiKey)
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

private DataDogSink(final Builder builder) {
Expand Down
76 changes: 67 additions & 9 deletions src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import net.sf.oval.Validator;
import net.sf.oval.constraint.CheckWith;
import net.sf.oval.constraint.CheckWithCheck;
Expand All @@ -33,7 +35,6 @@
import net.sf.oval.context.OValContext;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.model.HttpMethods;
import org.apache.pekko.http.javadsl.model.MediaTypes;
import org.apache.pekko.http.javadsl.model.StatusCodes;
import org.apache.pekko.pattern.Patterns;
Expand All @@ -44,14 +45,18 @@
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.uri.Uri;
import org.asynchttpclient.util.HttpConstants;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serial;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.zip.GZIPOutputStream;

/**
* Publishes to an HTTP endpoint. This class is thread safe.
Expand Down Expand Up @@ -103,13 +108,33 @@ public Object toLogValue() {
* @param serializedData The serialized data.
* @return {@link Request} to execute
*/
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
.setUri(_aysncHttpClientUri)
.setHeader("Content-Type", MediaTypes.APPLICATION_JSON.toString())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final byte[] bodyData;
if (_enableCompression) {
try {
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
final GZIPOutputStream gzipStream = new GZIPOutputStream(bos);
gzipStream.write(serializedData);
gzipStream.close();
bodyData = bos.toByteArray();
} catch (final IOException e) {
throw new RuntimeException(e);
}
} else {
bodyData = serializedData;
}

final RequestBuilder requestBuilder = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader(HttpHeaderNames.CONTENT_TYPE, MediaTypes.APPLICATION_JSON.toString())
.setBody(bodyData)
.setMethod(HttpConstants.Methods.POST);

if (_enableCompression) {
requestBuilder.setHeader(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
}

return new RequestInfo(requestBuilder.build(), serializedData.length, bodyData.length);
}

/**
Expand All @@ -130,8 +155,12 @@ protected Collection<RequestEntry.Builder> createRequests(
// Unfortunately, the split builder logic across HttpPostSink and
// HttpSinkActor does not permit this as-is. The logic would need
// to be refactored to permit the use of a TLB.
final byte[] datum = serializedDatum.getDatum();
final RequestInfo requestInfo = createRequest(client, datum);
requestEntryBuilders.add(new RequestEntry.Builder()
.setRequest(createRequest(client, serializedDatum.getDatum()))
.setRequest(requestInfo.request())
.setRequestBodySize(requestInfo.bodySize())
.setRequestBodyEncodedSize(requestInfo.encodedSize())
.setPopulationSize(serializedDatum.getPopulationSize()));
}
return requestEntryBuilders;
Expand Down Expand Up @@ -223,6 +252,7 @@ protected HttpPostSink(final Builder<?, ?> builder) {
_maximumDelay = builder._maximumDelay;
_acceptedStatusCodes = builder._acceptedStatusCodes;
_retryableStatusCodes = builder._retryableStatusCodes;
_enableCompression = builder._enableCompression;
}

/**
Expand Down Expand Up @@ -250,6 +280,7 @@ protected void start(final Builder<?, ?> builder) {
private final Duration _maximumDelay;
private final ImmutableSet<Integer> _acceptedStatusCodes;
private final ImmutableSet<Integer> _retryableStatusCodes;
private final boolean _enableCompression;

private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
private static final AsyncHttpClient CLIENT;
Expand Down Expand Up @@ -407,6 +438,18 @@ public B setRetryableStatusCodes(final ImmutableSet<Integer> value) {
return self();
}

/**
* Sets whether to enable request compression.
* Optional. Defaults to true.
*
* @param value true to enable compression
* @return This instance of {@link Builder}.
*/
public B setEnableCompression(final Boolean value) {
_enableCompression = value;
return self();
}

/**
* Protected constructor for subclasses.
*
Expand Down Expand Up @@ -444,6 +487,8 @@ protected Builder(final Function<B, S> targetConstructor) {
private ImmutableSet<Integer> _acceptedStatusCodes = DEFAULT_ACCEPTED_STATUS_CODES;
@NotNull
private ImmutableSet<Integer> _retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES;
@NotNull
private Boolean _enableCompression = false;

private static final ImmutableSet<Integer> DEFAULT_ACCEPTED_STATUS_CODES;
private static final ImmutableSet<Integer> DEFAULT_RETRYABLE_STATUS_CODES;
Expand Down Expand Up @@ -486,6 +531,19 @@ public boolean isSatisfied(
}
}

/**
* Represents metadata for a request in the HttpPostSink. This record encapsulates a request
* alongside its associated sizes, including the body size and the encoded size.
*
* The metadata is used to track details about the request and its payload, such as the size of
* the data being transmitted to provide diagnostic or operational insights.
*
* @param request The {@link Request} object that represents the HTTP request to be sent.
* @param bodySize The size of the request body in bytes.
* @param encodedSize The size of the encoded request body in bytes.
*/
record RequestInfo(Request request, long bodySize, long encodedSize) { }

static final class SerializedDatum {
SerializedDatum(final byte[] datum, final Optional<Long> populationSize) {
_datum = datum;
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public HttpSinkActor(
}
_evictedRequestsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/evicted_requests";
_requestLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_latency";
_requestBodySizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_size";
_requestBodyEncodedSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_encoded_size";
_inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time";
_pendingRequestsQueueSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_size";
_inflightRequestsCountName = "sinks/http_post/" + _sink.getMetricSafeName() + "/inflight_count";
Expand All @@ -121,6 +123,7 @@ public HttpSinkActor(
_httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts";
_samplesSentName = "sinks/http_post/" + sink.getMetricSafeName() + "/samples_sent";
_samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped";

}

@Override
Expand Down Expand Up @@ -499,6 +502,14 @@ private void fireRequest(final RequestEntry request, final int attempt) {
}
});
Patterns.pipe(responsePromise, context().dispatcher()).to(self());
_periodicMetrics.recordCounter(
_requestBodySizeName,
request.getRequestBodySize()
);
_periodicMetrics.recordCounter(
_requestBodyEncodedSizeName,
request.getRequestBodyEncodedSize()
);
_client.executeRequest(request.getRequest(), new ResponseAsyncCompletionHandler(promise));
}

Expand Down Expand Up @@ -547,6 +558,8 @@ private void scheduleRetry(final RequestEntry requestEntry, final int attempt) {
private final String _httpSinkAttemptsName;
private final String _samplesSentName;
private final String _samplesDroppedName;
private final String _requestBodySizeName;
private final String _requestBodyEncodedSizeName;

private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkActor.class);
private static final Logger POST_RETRY_LOGGER = LoggerFactory.getRateLimitLogger(HttpSinkActor.class, Duration.ofSeconds(30));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public final class InfluxDbSink extends HttpPostSink {


@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,14 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
}

@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

private int serializeCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,14 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
}

@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

private Multimap<String, Condition> prepareConditions(final Collection<Condition> conditions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
}

@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
Expand All @@ -110,6 +110,7 @@ protected Request createRequest(final AsyncHttpClient client, final byte[] seria
.setHeader("X-SF-TOKEN", _apiToken)
.addQueryParam("orgid", _organizationId)
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

private SignalFxProtocolBuffers.DataPoint createDataPoint(
Expand Down