From 6ac4fc1cfdd3abeebcec1e7eded6db2342f4bd26 Mon Sep 17 00:00:00 2001 From: Qinyan Li Date: Fri, 26 Jun 2020 10:25:12 -0700 Subject: [PATCH 1/4] add retries to cagg httpSinkActor --- .../tsdcore/sinks/HttpPostSink.java | 17 ++++++- .../tsdcore/sinks/HttpSinkActor.java | 50 +++++++++++++++++-- .../tsdcore/sinks/KairosDbSinkTest.java | 8 +-- 3 files changed, 66 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java index 063832b1..7fbcb489 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java @@ -160,7 +160,8 @@ protected HttpPostSink(final Builder builder) { builder._maximumConcurrency, builder._maximumQueueSize, builder._spreadPeriod, - builder._metricsFactory)); + builder._metricsFactory, + builder._maxRetries)); } private final URI _uri; @@ -244,6 +245,18 @@ public B setSpreadPeriod(final Period value) { return self(); } + /** + * Sets the maximum number of retries of the http requests. Optional. Cannot be null. + * Default is 0. + * + * @param value the maximum number of retries of the http requests. + * @return this builder + */ + public B setMaxRetries(final Integer value) { + _maxRetries = value; + return self(); + } + /** * Sets the maximum pending queue size. Optional Cannot be null. * Default is 25000. Minimum is 1. 
@@ -281,5 +294,7 @@ protected Builder(final Function targetConstructor) { @JacksonInject @NotNull private MetricsFactory _metricsFactory; + @NotNull + private Integer _maxRetries = 0; } } diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java index 5fd3c0d7..dface013 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java @@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * Actor that sends HTTP requests via a Ning HTTP client. @@ -61,6 +62,7 @@ public class HttpSinkActor extends AbstractActor { * @param maximumQueueSize Maximum number of pending requests. * @param spreadPeriod Maximum time to delay sending new aggregates to spread load. * @param metricsFactory metrics factory to record metrics. + * @param maxRetries Maximum number of retries for the http requests. * @return A new Props */ public static Props props( @@ -69,8 +71,17 @@ public static Props props( final int maximumConcurrency, final int maximumQueueSize, final Period spreadPeriod, - final MetricsFactory metricsFactory) { - return Props.create(HttpSinkActor.class, client, sink, maximumConcurrency, maximumQueueSize, spreadPeriod, metricsFactory); + final MetricsFactory metricsFactory, + final int maxRetries) { + return Props.create( + HttpSinkActor.class, + client, + sink, + maximumConcurrency, + maximumQueueSize, + spreadPeriod, + metricsFactory, + maxRetries); } /** @@ -82,6 +93,7 @@ public static Props props( * @param maximumQueueSize Maximum number of pending requests. * @param spreadPeriod Maximum time to delay sending new aggregates to spread load. * @param metricsFactory metrics factory to record metrics. 
+ * @param maxRetries Maximum number of retries for the http requests. */ public HttpSinkActor( final AsyncHttpClient client, @@ -89,12 +101,14 @@ public HttpSinkActor( final int maximumConcurrency, final int maximumQueueSize, final Period spreadPeriod, - final MetricsFactory metricsFactory) { + final MetricsFactory metricsFactory, + final int maxRetries) { _client = client; _sink = sink; _maximumConcurrency = maximumConcurrency; _pendingRequests = EvictingQueue.create(maximumQueueSize); _metricsFactory = metricsFactory; + _maxRetries = maxRetries; if (Period.ZERO.equals(spreadPeriod)) { _spreadingDelayMillis = 0; } else { @@ -106,6 +120,8 @@ public HttpSinkActor( _inQueueLatencyName = "sinks/http_post/" + _sink.getMetricSafeName() + "/queue_time"; _requestSuccessName = "sinks/http_post/" + _sink.getMetricSafeName() + "/success"; _responseStatusName = "sinks/http_post/" + _sink.getMetricSafeName() + "/status"; + _httpSinkAttemptsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/attempts"; + _samplesDroppedName = "sinks/http_post/" + _sink.getMetricSafeName() + "/samples_dropped"; } /** @@ -276,9 +292,9 @@ private void fireNextRequest() { final Request request = requestEntry.getRequest(); _inflightRequestsCount++; - final CompletableFuture promise = new CompletableFuture<>(); + final AtomicReference attempt = new AtomicReference<>(0); metrics.startTimer(_requestLatencyName); - _client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); + final CompletableFuture promise = sendHttpRequest(request, attempt); final CompletionStage responsePromise = promise .handle((result, err) -> { metrics.stopTimer(_requestLatencyName); @@ -295,9 +311,11 @@ private void fireNextRequest() { returnValue = new PostSuccess(result); } else { returnValue = new PostRejected(request, result); + metrics.incrementCounter(_samplesDroppedName); } } else { returnValue = new PostFailure(request, err); + metrics.incrementCounter(_samplesDroppedName); } 
metrics.incrementCounter(_requestSuccessName, (returnValue instanceof PostSuccess) ? 1 : 0); metrics.close(); @@ -306,6 +324,25 @@ private void fireNextRequest() { PatternsCS.pipe(responsePromise, context().dispatcher()).to(self()); } + private CompletableFuture sendHttpRequest( + final Request request, + final AtomicReference attempt) { + final CompletableFuture promise = new CompletableFuture<>(); + _client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); + promise.handle((result, err) -> { + if (err == null && ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) { + try (Metrics metrics = _metricsFactory.create()) { + metrics.incrementCounter(_httpSinkAttemptsName, attempt.get()); + } + return promise; + } else { + attempt.set(attempt.get() + 1); + return attempt.get() <= _maxRetries ? sendHttpRequest(request, attempt) : promise; + } + }); + return promise; + } + @Override public void postStop() throws Exception { super.postStop(); @@ -325,12 +362,15 @@ public void postStop() throws Exception { private final HttpPostSink _sink; private final int _spreadingDelayMillis; private final MetricsFactory _metricsFactory; + private final int _maxRetries; private final String _evictedRequestsName; private final String _requestLatencyName; private final String _inQueueLatencyName; private final String _requestSuccessName; private final String _responseStatusName; + private final String _httpSinkAttemptsName; + private final String _samplesDroppedName; private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class); private static final Logger EVICTED_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30)); diff --git a/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java b/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java index 9e23d82f..3cc88e6e 100644 --- a/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java +++ 
b/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java @@ -63,7 +63,8 @@ public void setUp() { .setName("kairosdb_sink_test") .setActorSystem(getSystem()) .setUri(URI.create("http://localhost:" + _wireMockServer.port() + PATH)) - .setMetricsFactory(_mockMetricsFactory); + .setMetricsFactory(_mockMetricsFactory) + .setMaxRetries(5); Mockito.doReturn(_mockMetrics).when(_mockMetricsFactory).create(); } @@ -134,7 +135,8 @@ public void testPost() throws InterruptedException, IOException { Assert.assertEquals(expected, actual); // Verify that metrics has been recorded. - Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create(); + Mockito.verify(_mockMetricsFactory, Mockito.times(2)).create(); + Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/attempts", 0); Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/success", 1); Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/2xx", 1); Mockito.verify(_mockMetrics, Mockito.times(1)).setTimer( @@ -143,7 +145,7 @@ public void testPost() throws InterruptedException, IOException { Mockito.any()); Mockito.verify(_mockMetrics, Mockito.times(1)).startTimer("sinks/http_post/kairosdb_sink_test/request_latency"); Mockito.verify(_mockMetrics, Mockito.times(1)).stopTimer("sinks/http_post/kairosdb_sink_test/request_latency"); - Mockito.verify(_mockMetrics, Mockito.times(1)).close(); + Mockito.verify(_mockMetrics, Mockito.times(2)).close(); } private KairosDbSink.Builder _kairosDbSinkBuilder; From 36013a99b7f21b0f39ce80d7cf7840ff4c36cb48 Mon Sep 17 00:00:00 2001 From: Qinyan Li Date: Fri, 26 Jun 2020 10:48:27 -0700 Subject: [PATCH 2/4] no need to use atomicReference for attempt, use int instead --- .../arpnetworking/tsdcore/sinks/HttpSinkActor.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git 
a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java index dface013..a99bb400 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java @@ -45,7 +45,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; /** * Actor that sends HTTP requests via a Ning HTTP client. @@ -292,9 +291,8 @@ private void fireNextRequest() { final Request request = requestEntry.getRequest(); _inflightRequestsCount++; - final AtomicReference attempt = new AtomicReference<>(0); metrics.startTimer(_requestLatencyName); - final CompletableFuture promise = sendHttpRequest(request, attempt); + final CompletableFuture promise = sendHttpRequest(request, 0); final CompletionStage responsePromise = promise .handle((result, err) -> { metrics.stopTimer(_requestLatencyName); @@ -326,18 +324,17 @@ private void fireNextRequest() { private CompletableFuture sendHttpRequest( final Request request, - final AtomicReference attempt) { + final int attempt) { final CompletableFuture promise = new CompletableFuture<>(); _client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); promise.handle((result, err) -> { if (err == null && ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) { try (Metrics metrics = _metricsFactory.create()) { - metrics.incrementCounter(_httpSinkAttemptsName, attempt.get()); + metrics.incrementCounter(_httpSinkAttemptsName, attempt); } return promise; } else { - attempt.set(attempt.get() + 1); - return attempt.get() <= _maxRetries ? sendHttpRequest(request, attempt) : promise; + return attempt < _maxRetries ? 
sendHttpRequest(request, attempt + 1) : promise; } }); return promise; From cf2e798bb742d90048c56563a815c81161271d26 Mon Sep 17 00:00:00 2001 From: Qinyan Li Date: Sat, 27 Jun 2020 19:10:02 -0700 Subject: [PATCH 3/4] record attempt metric with other metrics and add post failure test --- .../tsdcore/sinks/HttpSinkActor.java | 51 ++++++++++++++----- .../tsdcore/sinks/KairosDbSinkTest.java | 31 +++++++++-- 2 files changed, 63 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java index a99bb400..9ced09e4 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java @@ -292,12 +292,14 @@ private void fireNextRequest() { _inflightRequestsCount++; metrics.startTimer(_requestLatencyName); - final CompletableFuture promise = sendHttpRequest(request, 0); - final CompletionStage responsePromise = promise + final HttpResponse httpResponse = sendHttpRequest(request, 0); + final CompletionStage responsePromise = httpResponse.getResponsePromise() .handle((result, err) -> { + System.out.printf("SHIT! result status code : %s.\n", result.getStatusCode()); metrics.stopTimer(_requestLatencyName); final Object returnValue; if (err == null) { + System.out.printf("SHIT! Error is null!"); final int responseStatusCode = result.getStatusCode(); final int responseStatusClass = responseStatusCode / 100; for (final int i : STATUS_CLASSES) { @@ -306,8 +308,10 @@ private void fireNextRequest() { responseStatusClass == i ? 1 : 0); } if (ACCEPTED_STATUS_CODES.contains(responseStatusCode)) { + metrics.incrementCounter(_httpSinkAttemptsName, httpResponse.getAttempt()); returnValue = new PostSuccess(result); } else { + System.out.printf("SHIT! 
Going to PostReject!"); returnValue = new PostRejected(request, result); metrics.incrementCounter(_samplesDroppedName); } @@ -322,22 +326,23 @@ private void fireNextRequest() { PatternsCS.pipe(responsePromise, context().dispatcher()).to(self()); } - private CompletableFuture sendHttpRequest( + private HttpResponse sendHttpRequest( final Request request, final int attempt) { final CompletableFuture promise = new CompletableFuture<>(); _client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); - promise.handle((result, err) -> { - if (err == null && ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) { - try (Metrics metrics = _metricsFactory.create()) { - metrics.incrementCounter(_httpSinkAttemptsName, attempt); - } - return promise; - } else { - return attempt < _maxRetries ? sendHttpRequest(request, attempt + 1) : promise; - } - }); - return promise; + return new HttpResponse( + promise.thenCompose(result -> { + if (ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) { + return CompletableFuture.completedFuture(result); + }else{ + System.out.printf("Failed for the %d time.\n", attempt+1); + return attempt < _maxRetries ? 
+ sendHttpRequest(request, attempt+1).getResponsePromise() : + CompletableFuture.completedFuture(result); + } + }), + attempt); } @Override @@ -502,4 +507,22 @@ public long getEnterTime() { private final Request _request; private final long _enterTime; } + + private static final class HttpResponse { + private HttpResponse(final CompletableFuture responsePromise, final int attempt) { + _responsePromise = responsePromise; + _attempt = attempt; + } + + public CompletableFuture getResponsePromise() { + return _responsePromise; + } + + public int getAttempt() { + return _attempt; + } + + private final CompletableFuture _responsePromise; + private final int _attempt; + } } diff --git a/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java b/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java index 3cc88e6e..1584a062 100644 --- a/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java +++ b/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java @@ -19,6 +19,7 @@ import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory; import com.arpnetworking.metrics.Metrics; import com.arpnetworking.metrics.MetricsFactory; +import com.arpnetworking.test.TestBeanFactory; import com.arpnetworking.tsdcore.model.AggregatedData; import com.arpnetworking.tsdcore.model.Condition; import com.arpnetworking.tsdcore.model.FQDSN; @@ -63,8 +64,7 @@ public void setUp() { .setName("kairosdb_sink_test") .setActorSystem(getSystem()) .setUri(URI.create("http://localhost:" + _wireMockServer.port() + PATH)) - .setMetricsFactory(_mockMetricsFactory) - .setMaxRetries(5); + .setMetricsFactory(_mockMetricsFactory); Mockito.doReturn(_mockMetrics).when(_mockMetricsFactory).create(); } @@ -76,7 +76,7 @@ public void tearDown() { } @Test - public void testPost() throws InterruptedException, IOException { + public void testPostSuccess() throws InterruptedException, IOException { // Fake a successful post to KairosDb 
_wireMock.register(WireMock.post(WireMock.urlEqualTo(PATH)) .willReturn(WireMock.aResponse() @@ -135,7 +135,7 @@ public void testPost() throws InterruptedException, IOException { Assert.assertEquals(expected, actual); // Verify that metrics has been recorded. - Mockito.verify(_mockMetricsFactory, Mockito.times(2)).create(); + Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create(); Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/attempts", 0); Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/success", 1); Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/2xx", 1); @@ -145,9 +145,30 @@ public void testPost() throws InterruptedException, IOException { Mockito.any()); Mockito.verify(_mockMetrics, Mockito.times(1)).startTimer("sinks/http_post/kairosdb_sink_test/request_latency"); Mockito.verify(_mockMetrics, Mockito.times(1)).stopTimer("sinks/http_post/kairosdb_sink_test/request_latency"); - Mockito.verify(_mockMetrics, Mockito.times(2)).close(); + Mockito.verify(_mockMetrics, Mockito.times(1)).close(); } + @Test + public void testPostFailure() throws InterruptedException, IOException { + // Fake a rejected post to KairosDb (404 response) + _wireMock.register(WireMock.post(WireMock.urlEqualTo(PATH)) + .willReturn(WireMock.aResponse() + .withStatus(404))); + _kairosDbSinkBuilder.setMaxRetries(1).build().recordAggregateData(TestBeanFactory.createPeriodicData()); + Thread.sleep(1000); + + Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create(); + Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/4xx", 1); + Mockito.verify(_mockMetrics, Mockito.times(1)).setTimer( + Mockito.matches("sinks/http_post/kairosdb_sink_test/queue_time"), + Mockito.anyLong(), + Mockito.any()); + Mockito.verify(_mockMetrics, 
Mockito.times(1)).startTimer("sinks/http_post/kairosdb_sink_test/request_latency"); + Mockito.verify(_mockMetrics, Mockito.times(1)).stopTimer("sinks/http_post/kairosdb_sink_test/request_latency"); + Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/samples_dropped"); + Mockito.verify(_mockMetrics, Mockito.times(1)).close(); + } + private KairosDbSink.Builder _kairosDbSinkBuilder; private WireMockServer _wireMockServer; private WireMock _wireMock; From 0639d7267aeec772483e9292005777759ed98ef5 Mon Sep 17 00:00:00 2001 From: Qinyan Li Date: Mon, 29 Jun 2020 10:22:32 -0700 Subject: [PATCH 4/4] add exponential backoff for retry --- .../tsdcore/sinks/HttpSinkActor.java | 17 ++++++++++------- .../tsdcore/sinks/KairosDbSinkTest.java | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java index 9ced09e4..9776328f 100644 --- a/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java +++ b/src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java @@ -295,11 +295,9 @@ private void fireNextRequest() { final HttpResponse httpResponse = sendHttpRequest(request, 0); final CompletionStage responsePromise = httpResponse.getResponsePromise() .handle((result, err) -> { - System.out.printf("SHIT! result status code : %s.\n", result.getStatusCode()); metrics.stopTimer(_requestLatencyName); final Object returnValue; if (err == null) { - System.out.printf("SHIT! Error is null!"); final int responseStatusCode = result.getStatusCode(); final int responseStatusClass = responseStatusCode / 100; for (final int i : STATUS_CLASSES) { @@ -311,7 +309,6 @@ private void fireNextRequest() { metrics.incrementCounter(_httpSinkAttemptsName, httpResponse.getAttempt()); returnValue = new PostSuccess(result); } else { - System.out.printf("SHIT! 
Going to PostReject!"); returnValue = new PostRejected(request, result); metrics.incrementCounter(_samplesDroppedName); } @@ -329,16 +326,22 @@ private void fireNextRequest() { private HttpResponse sendHttpRequest( final Request request, final int attempt) { + try { + Thread.sleep(Double.valueOf(Math.pow(2, attempt) - 1).longValue() * 1000); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } final CompletableFuture promise = new CompletableFuture<>(); _client.executeRequest(request, new ResponseAsyncCompletionHandler(promise)); return new HttpResponse( promise.thenCompose(result -> { if (ACCEPTED_STATUS_CODES.contains(result.getStatusCode())) { return CompletableFuture.completedFuture(result); - }else{ - System.out.printf("Failed for the %d time.\n", attempt+1); - return attempt < _maxRetries ? - sendHttpRequest(request, attempt+1).getResponsePromise() : + } else { + return attempt < _maxRetries + ? + sendHttpRequest(request, attempt + 1).getResponsePromise() + : CompletableFuture.completedFuture(result); } }), diff --git a/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java b/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java index 1584a062..c36dd034 100644 --- a/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java +++ b/src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java @@ -155,7 +155,7 @@ public void testPostFailure() throws InterruptedException, IOException { .willReturn(WireMock.aResponse() .withStatus(404))); _kairosDbSinkBuilder.setMaxRetries(1).build().recordAggregateData(TestBeanFactory.createPeriodicData()); - Thread.sleep(1000); + Thread.sleep(3000); Mockito.verify(_mockMetricsFactory, Mockito.times(1)).create(); Mockito.verify(_mockMetrics, Mockito.times(1)).incrementCounter("sinks/http_post/kairosdb_sink_test/status/4xx", 1);