Skip to content

Commit 703f2e8

Browse files
authored
Configurable accept http status codes. (#229)
1 parent e8514ed commit 703f2e8

File tree

3 files changed

+44
-15
lines changed

3 files changed

+44
-15
lines changed

config/pipelines/pipeline.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ sinks=[
214214
sink={
215215
type="com.arpnetworking.tsdcore.sinks.AggregationServerHttpSink"
216216
name="cluster_http_sink"
217+
#acceptedStatusCodes=[200, 201, 202, 204]
217218
uri="http://"${cluster_aggregator_host}":"${cluster_aggregator_port}"/metrics/v1/data/persist"
218219
}
219220
}

src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import akka.actor.PoisonPill;
2121
import akka.http.javadsl.model.HttpMethods;
2222
import akka.http.javadsl.model.MediaTypes;
23+
import akka.http.javadsl.model.StatusCodes;
2324
import com.arpnetworking.logback.annotations.LogValue;
2425
import com.arpnetworking.metrics.MetricsFactory;
2526
import com.arpnetworking.steno.LogValueMapFactory;
@@ -28,6 +29,7 @@
2829
import com.arpnetworking.tsdcore.model.PeriodicData;
2930
import com.arpnetworking.tsdcore.model.RequestEntry;
3031
import com.fasterxml.jackson.annotation.JacksonInject;
32+
import com.google.common.collect.ImmutableSet;
3133
import com.google.common.collect.Lists;
3234
import net.sf.oval.constraint.Min;
3335
import net.sf.oval.constraint.NotNull;
@@ -145,6 +147,15 @@ protected Uri getAysncHttpClientUri() {
145147
return _aysncHttpClientUri;
146148
}
147149

150+
/**
151+
* Accessor for the status codes accepted as successful.
152+
*
153+
* @return the status codes accepted as success
154+
*/
155+
ImmutableSet<Integer> getAcceptedStatusCodes() {
156+
return _acceptedStatusCodes;
157+
}
158+
148159
/**
149160
* Serialize the {@link PeriodicData} instances for posting.
150161
*
@@ -161,6 +172,7 @@ protected Uri getAysncHttpClientUri() {
161172
protected HttpPostSink(final Builder<?, ?> builder) {
162173
super(builder);
163174
_uri = builder._uri;
175+
_acceptedStatusCodes = builder._acceptedStatusCodes;
164176
_aysncHttpClientUri = Uri.create(_uri.toString());
165177

166178
_sinkActor = builder._actorSystem.actorOf(
@@ -176,6 +188,7 @@ protected HttpPostSink(final Builder<?, ?> builder) {
176188
private final URI _uri;
177189
private final Uri _aysncHttpClientUri;
178190
private final ActorRef _sinkActor;
191+
private final ImmutableSet<Integer> _acceptedStatusCodes;
179192

180193
private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
181194
private static final AsyncHttpClient CLIENT;
@@ -266,6 +279,18 @@ public B setMetricsFactory(final MetricsFactory value) {
266279
return self();
267280
}
268281

282+
/**
283+
* Sets the http status codes accepted as success. Optional. Cannot be null.
284+
* Default is: [200, 201, 202, 204]
285+
*
286+
* @param value the status codes accepted as success
287+
* @return this builder
288+
*/
289+
public B setAcceptedStatusCodes(final ImmutableSet<Integer> value) {
290+
_acceptedStatusCodes = value;
291+
return self();
292+
}
293+
269294
/**
270295
* Protected constructor for subclasses.
271296
*
@@ -291,6 +316,19 @@ protected Builder(final Function<B, S> targetConstructor) {
291316
@JacksonInject
292317
@NotNull
293318
private MetricsFactory _metricsFactory;
319+
@NotNull
320+
private ImmutableSet<Integer> _acceptedStatusCodes = DEFAULT_ACCEPTED_STATUS_CODES;
321+
322+
private static final ImmutableSet<Integer> DEFAULT_ACCEPTED_STATUS_CODES;
323+
324+
static {
325+
DEFAULT_ACCEPTED_STATUS_CODES = ImmutableSet.<Integer>builder()
326+
.add(StatusCodes.OK.intValue())
327+
.add(StatusCodes.CREATED.intValue())
328+
.add(StatusCodes.ACCEPTED.intValue())
329+
.add(StatusCodes.NO_CONTENT.intValue())
330+
.build();
331+
}
294332
}
295333

296334
static final class SerializedDatum {

src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSinkActor.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import akka.actor.AbstractActor;
1919
import akka.actor.Props;
20-
import akka.http.javadsl.model.StatusCodes;
2120
import akka.pattern.PatternsCS;
2221
import com.arpnetworking.logback.annotations.LogValue;
2322
import com.arpnetworking.metrics.Metrics;
@@ -31,7 +30,7 @@
3130
import com.google.common.base.Charsets;
3231
import com.google.common.collect.EvictingQueue;
3332
import com.google.common.collect.ImmutableList;
34-
import com.google.common.collect.Sets;
33+
import com.google.common.collect.ImmutableSet;
3534
import org.asynchttpclient.AsyncCompletionHandler;
3635
import org.asynchttpclient.AsyncHttpClient;
3736
import org.asynchttpclient.Request;
@@ -44,7 +43,6 @@
4443
import java.util.Objects;
4544
import java.util.Optional;
4645
import java.util.Random;
47-
import java.util.Set;
4846
import java.util.concurrent.CompletableFuture;
4947
import java.util.concurrent.CompletionStage;
5048
import java.util.concurrent.TimeUnit;
@@ -95,6 +93,7 @@ public HttpPostSinkActor(
9593
final MetricsFactory metricsFactory) {
9694
_client = client;
9795
_sink = sink;
96+
_acceptedStatusCodes = sink.getAcceptedStatusCodes();
9897
_maximumConcurrency = maximumConcurrency;
9998
_pendingRequests = EvictingQueue.create(maximumQueueSize);
10099
if (Objects.equals(Duration.ZERO, spreadPeriod)) {
@@ -121,6 +120,7 @@ public HttpPostSinkActor(
121120
public Object toLogValue() {
122121
return LogValueMapFactory.builder(this)
123122
.put("sink", _sink)
123+
.put("acceptedStatusCodes", _acceptedStatusCodes)
124124
.put("maximumConcurrency", _maximumConcurrency)
125125
.put("spreadingDelayMillis", _spreadingDelayMillis)
126126
.put("waiting", _waiting)
@@ -293,7 +293,7 @@ private void fireNextRequest() {
293293
String.format("%s/%dxx", _responseStatusName, i),
294294
responseStatusClass == i ? 1 : 0);
295295
}
296-
if (ACCEPTED_STATUS_CODES.contains(responseStatusCode)) {
296+
if (_acceptedStatusCodes.contains(responseStatusCode)) {
297297
returnValue = new PostSuccess(result);
298298
metrics.incrementCounter(_samplesSentName, requestEntry.getPopulationSize());
299299
} else {
@@ -330,6 +330,7 @@ public void postStop() throws Exception {
330330
private final HttpPostSink _sink;
331331
private final int _spreadingDelayMillis;
332332
private final MetricsFactory _metricsFactory;
333+
private final ImmutableSet<Integer> _acceptedStatusCodes;
333334

334335
private final String _evictedRequestsName;
335336
private final String _requestLatencyName;
@@ -339,22 +340,11 @@ public void postStop() throws Exception {
339340
private final String _samplesDroppedName;
340341
private final String _samplesSentName;
341342

342-
343343
private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
344344
private static final Logger POST_ERROR_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30));
345345
private static final Logger EVICTED_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30));
346-
private static final Set<Integer> ACCEPTED_STATUS_CODES = Sets.newHashSet();
347346
private static final com.google.common.collect.ImmutableList<Integer> STATUS_CLASSES = ImmutableList.of(2, 3, 4, 5);
348347

349-
350-
static {
351-
// TODO(vkoskela): Make accepted status codes configurable [AINT-682]
352-
ACCEPTED_STATUS_CODES.add(StatusCodes.OK.intValue());
353-
ACCEPTED_STATUS_CODES.add(StatusCodes.CREATED.intValue());
354-
ACCEPTED_STATUS_CODES.add(StatusCodes.ACCEPTED.intValue());
355-
ACCEPTED_STATUS_CODES.add(StatusCodes.NO_CONTENT.intValue());
356-
}
357-
358348
/**
359349
* Message class to wrap a list of {@link AggregatedData}.
360350
*/

0 commit comments

Comments
 (0)