Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
<mockito.version>5.18.0</mockito.version>
<netty.version>4.0.56.Final</netty.version>
<netty.all.version>4.0.21.Final</netty.all.version>
<netty.codec.http.version>4.1.119.Final</netty.codec.http.version>
<ning.http.client.version>1.9.31</ning.http.client.version>
<oval.version>3.2.1</oval.version>
<play.ws.version>3.0.7</play.ws.version>
Expand Down Expand Up @@ -866,6 +867,11 @@
<artifactId>async-http-client</artifactId>
<version>${asynchttpclient.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.codec.http.version}</version>
</dependency>
<dependency>
<groupId>org.playframework</groupId>
<artifactId>play-ahc-ws-standalone_${scala.version}</artifactId>
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/com/arpnetworking/tsdcore/model/RequestEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,27 @@ public Optional<Long> getPopulationSize() {
return _populationSize;
}

public Long getRequestBodyBytes() {
return _requestBodyBytes;
}

public Long getRequestBodyCompressedBytes() {
return _requestBodyCompressedBytes;
}

private RequestEntry(final Builder builder) {
_request = builder._request;
_enterTime = builder._enterTime;
_populationSize = builder._populationSize;
_requestBodyBytes = builder._requestBodyBytes;
_requestBodyCompressedBytes = builder._requestBodyCompressedBytes;
}

private final Request _request;
private final Instant _enterTime;
private final Optional<Long> _populationSize;
private final Long _requestBodyBytes;
private final Long _requestBodyCompressedBytes;

/**
* {@link com.arpnetworking.commons.builder.Builder} implementation for
Expand Down Expand Up @@ -99,12 +111,38 @@ public Builder setPopulationSize(final Optional<Long> value) {
return this;
}

/**
* Set the request body size in bytes. Required. Cannot be null.
*
* @param value The request body size in bytes.
* @return This {@link Builder} instance.
*/
public Builder setRequestBodyBytes(final Long value) {
_requestBodyBytes = value;
return this;
}

/**
* Set the request body compressed size in bytes. Required. Cannot be null.
*
* @param value The request body compressed size in bytes.
* @return This {@link Builder} instance.
*/
public Builder setRequestBodyCompressedBytes(final Long value) {
_requestBodyCompressedBytes = value;
return this;
}

@NotNull
private Request _request;
@NotNull
private Instant _enterTime;
@NotNull
private Optional<Long> _populationSize = Optional.empty();
@NotNull
private Long _requestBodyBytes;
@NotNull
private Long _requestBodyCompressedBytes;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,15 @@ private static List<String> createTags(final PeriodicData periodicData, final Ag
}

@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.setHeader("Content-Type", "application/json")
.addQueryParam("api_key", _apiKey)
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

private DataDogSink(final Builder builder) {
Expand Down
78 changes: 68 additions & 10 deletions src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import net.sf.oval.Validator;
import net.sf.oval.constraint.CheckWith;
import net.sf.oval.constraint.CheckWithCheck;
Expand All @@ -33,7 +35,6 @@
import net.sf.oval.context.OValContext;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.http.javadsl.model.HttpMethods;
import org.apache.pekko.http.javadsl.model.MediaTypes;
import org.apache.pekko.http.javadsl.model.StatusCodes;
import org.apache.pekko.pattern.Patterns;
Expand All @@ -44,14 +45,18 @@
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.uri.Uri;
import org.asynchttpclient.util.HttpConstants;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serial;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.zip.GZIPOutputStream;

/**
* Publishes to an HTTP endpoint. This class is thread safe.
Expand Down Expand Up @@ -103,13 +108,33 @@
* @param serializedData The serialized data.
* @return {@link Request} to execute
*/
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
.setUri(_aysncHttpClientUri)
.setHeader("Content-Type", MediaTypes.APPLICATION_JSON.toString())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final byte[] bodyData;
if (_enableCompression) {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gzipStream = new GZIPOutputStream(bos)) {

gzipStream.write(serializedData);
gzipStream.flush();

Check warning on line 118 in src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java

View check run for this annotation

Jenkins - ArpNetworking / Code Coverage

Not covered lines

Lines 114-118 are not covered by tests
bodyData = bos.toByteArray();
} catch (final IOException e) {
throw new RuntimeException(e);
}
} else {
bodyData = serializedData;
}

final RequestBuilder requestBuilder = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader(HttpHeaderNames.CONTENT_TYPE, MediaTypes.APPLICATION_JSON.toString())
.setBody(bodyData)
.setMethod(HttpConstants.Methods.POST);

if (_enableCompression) {
requestBuilder.setHeader(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
}

return new RequestInfo(requestBuilder.build(), serializedData.length, bodyData.length);
}

/**
Expand All @@ -130,8 +155,12 @@
// Unfortunately, the split builder logic across HttpPostSink and
// HttpSinkActor does not permit this as-is. The logic would need
// to be refactored to permit the use of a TLB.
final byte[] datum = serializedDatum.getDatum();
final RequestInfo requestInfo = createRequest(client, datum);
requestEntryBuilders.add(new RequestEntry.Builder()
.setRequest(createRequest(client, serializedDatum.getDatum()))
.setRequest(requestInfo.request())
.setRequestBodyBytes(requestInfo.bodySize())
.setRequestBodyCompressedBytes(requestInfo.encodedSize())
.setPopulationSize(serializedDatum.getPopulationSize()));
}
return requestEntryBuilders;
Expand Down Expand Up @@ -223,6 +252,7 @@
_maximumDelay = builder._maximumDelay;
_acceptedStatusCodes = builder._acceptedStatusCodes;
_retryableStatusCodes = builder._retryableStatusCodes;
_enableCompression = builder._enableCompression;
}

/**
Expand Down Expand Up @@ -250,6 +280,7 @@
private final Duration _maximumDelay;
private final ImmutableSet<Integer> _acceptedStatusCodes;
private final ImmutableSet<Integer> _retryableStatusCodes;
private final boolean _enableCompression;

private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
private static final AsyncHttpClient CLIENT;
Expand Down Expand Up @@ -407,10 +438,22 @@
return self();
}

/**
* Sets whether to enable request compression.
* Optional. Defaults to false.
*
* @param value true to enable compression
* @return This instance of {@link Builder}.
*/
public B setEnableCompression(final Boolean value) {
_enableCompression = value;
return self();
}

/**
* Protected constructor for subclasses.
*
* @param targetConstructor The constructor for the concrete type to be created by this builder.
* @param targetConstructor The constructor for this builder to create the concrete type.
*/
protected Builder(final Function<B, S> targetConstructor) {
super(targetConstructor);
Expand Down Expand Up @@ -444,6 +487,8 @@
private ImmutableSet<Integer> _acceptedStatusCodes = DEFAULT_ACCEPTED_STATUS_CODES;
@NotNull
private ImmutableSet<Integer> _retryableStatusCodes = DEFAULT_RETRYABLE_STATUS_CODES;
@NotNull
private Boolean _enableCompression = false;

private static final ImmutableSet<Integer> DEFAULT_ACCEPTED_STATUS_CODES;
private static final ImmutableSet<Integer> DEFAULT_RETRYABLE_STATUS_CODES;
Expand Down Expand Up @@ -486,6 +531,19 @@
}
}

/**
* Represents metadata for a request in the HttpPostSink. This record encapsulates a request
* alongside its associated sizes, including the body size and the encoded size.
*
* The metadata is used to track details about the request and its payload, such as the size of
* the data being transmitted to provide diagnostic or operational insights.
*
* @param request The {@link Request} object that represents the HTTP request to be sent.
* @param bodySize The size of the request body in bytes.
* @param encodedSize The size of the encoded request body in bytes.
*/
protected record RequestInfo(Request request, long bodySize, long encodedSize) { }

static final class SerializedDatum {
SerializedDatum(final byte[] datum, final Optional<Long> populationSize) {
_datum = datum;
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public HttpSinkActor(
}
_evictedRequestsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/evicted_requests";
_requestLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_latency";
_requestBodyBytesName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_bytes";
_requestBodyCompressedBytesName = "sinks/http_post/" + _sink.getMetricSafeName() + "/request_body_compressed_bytes";
_inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time";
_pendingRequestsQueueSizeName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_size";
_inflightRequestsCountName = "sinks/http_post/" + _sink.getMetricSafeName() + "/inflight_count";
Expand All @@ -121,6 +123,7 @@ public HttpSinkActor(
_httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts";
_samplesSentName = "sinks/http_post/" + sink.getMetricSafeName() + "/samples_sent";
_samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped";

}

@Override
Expand Down Expand Up @@ -499,6 +502,14 @@ private void fireRequest(final RequestEntry request, final int attempt) {
}
});
Patterns.pipe(responsePromise, context().dispatcher()).to(self());
_periodicMetrics.recordCounter(
_requestBodyBytesName,
request.getRequestBodyBytes()
);
_periodicMetrics.recordCounter(
_requestBodyCompressedBytesName,
request.getRequestBodyCompressedBytes()
);
_client.executeRequest(request.getRequest(), new ResponseAsyncCompletionHandler(promise));
}

Expand Down Expand Up @@ -547,6 +558,8 @@ private void scheduleRetry(final RequestEntry requestEntry, final int attempt) {
private final String _httpSinkAttemptsName;
private final String _samplesSentName;
private final String _samplesDroppedName;
private final String _requestBodyBytesName;
private final String _requestBodyCompressedBytesName;

private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkActor.class);
private static final Logger POST_RETRY_LOGGER = LoggerFactory.getRateLimitLogger(HttpSinkActor.class, Duration.ofSeconds(30));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public final class InfluxDbSink extends HttpPostSink {


@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/arpnetworking/tsdcore/sinks/KMonDSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,14 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
}

@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

private int serializeCondition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,14 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
}

@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setHeader("Content-Type", MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toString())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

private Multimap<String, Condition> prepareConditions(final Collection<Condition> conditions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ protected Collection<SerializedDatum> serialize(final PeriodicData periodicData)
}

@Override
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
return new RequestBuilder()
protected RequestInfo createRequest(final AsyncHttpClient client, final byte[] serializedData) {
final Request request = new RequestBuilder()
.setUri(getAysncHttpClientUri())
.setBody(serializedData)
.setMethod(HttpMethods.POST.value())
Expand All @@ -110,6 +110,7 @@ protected Request createRequest(final AsyncHttpClient client, final byte[] seria
.setHeader("X-SF-TOKEN", _apiToken)
.addQueryParam("orgid", _organizationId)
.build();
return new RequestInfo(request, serializedData.length, serializedData.length);
}

private SignalFxProtocolBuffers.DataPoint createDataPoint(
Expand Down