Skip to content

Commit 8a232d2

Browse files
abbeyqyvjkoskela
andauthored
add retries to cagg httpSinkActor v2 (#126)
* implement cagg http retry as a message in actor * add more log info; make base backoff and maximum delay configurable; set retryable status code * make thread in test wait with a timeout * validate Period non-negative, calculate sample counts * remove getSamplesCount and remove samples_dropped metric Co-authored-by: Ville Koskela <[email protected]>
1 parent 885e1f0 commit 8a232d2

File tree

6 files changed

+310
-40
lines changed

6 files changed

+310
-40
lines changed

lib/awaitility-4.0.2.jar

87.8 KB
Binary file not shown.

lib/hamcrest-2.1.jar

120 KB
Binary file not shown.

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,12 @@
901901
<version>${akka.persistence.cassandra.version}</version>
902902
<scope>test</scope>
903903
</dependency>
904+
<dependency>
905+
<groupId>org.awaitility</groupId>
906+
<artifactId>awaitility</artifactId>
907+
<version>4.0.2</version>
908+
<scope>test</scope>
909+
</dependency>
904910
</dependencies>
905911
<profiles>
906912
<profile>

src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import com.arpnetworking.tsdcore.model.PeriodicData;
2929
import com.fasterxml.jackson.annotation.JacksonInject;
3030
import com.google.common.collect.Lists;
31+
import net.sf.oval.constraint.CheckWith;
32+
import net.sf.oval.constraint.CheckWithCheck;
3133
import net.sf.oval.constraint.Min;
3234
import net.sf.oval.constraint.NotNull;
3335
import org.asynchttpclient.AsyncHttpClient;
@@ -133,6 +135,33 @@ protected URI getUri() {
133135
protected Uri getAysncHttpClientUri() {
134136
return _aysncHttpClientUri;
135137
}
138+
139+
/**
140+
* Accessor for the MaximumAttempts.
141+
*
142+
* @return The MaximumAttempts.
143+
*/
144+
protected int getMaximumAttempts() {
145+
return _maximumAttempts;
146+
}
147+
148+
/**
149+
* Accessor for the BaseBackoff of retries <code>Period</code>.
150+
*
151+
* @return The BaseBackoff <code>Period</code>.
152+
*/
153+
protected Period getRetryBaseBackoff() {
154+
return _baseBackoff;
155+
}
156+
157+
/**
158+
* Accessor for the MaximumDelay of retries <code>Period</code>.
159+
*
160+
* @return The MaximumDelay <code>Period</code>.
161+
*/
162+
protected Period getRetryMaximumDelay() {
163+
return _maximumDelay;
164+
}
136165

137166
/**
138167
* Serialize the <code>PeriodicData</code> and <code>Condition</code> instances
@@ -161,11 +190,18 @@ protected HttpPostSink(final Builder<?, ?> builder) {
161190
builder._maximumQueueSize,
162191
builder._spreadPeriod,
163192
builder._metricsFactory));
193+
194+
_maximumAttempts = builder._maximumAttempts;
195+
_baseBackoff = builder._baseBackoff;
196+
_maximumDelay = builder._maximumDelay;
164197
}
165198

166199
private final URI _uri;
167200
private final Uri _aysncHttpClientUri;
168201
private final ActorRef _sinkActor;
202+
private final int _maximumAttempts;
203+
private final Period _baseBackoff;
204+
private final Period _maximumDelay;
169205

170206
private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
171207
private static final AsyncHttpClient CLIENT;
@@ -244,6 +280,18 @@ public B setSpreadPeriod(final Period value) {
244280
return self();
245281
}
246282

283+
/**
284+
* Sets the maximum number of attempts of the http requests. Optional. Cannot be null.
285+
* Default is 1. Minimum is 1.
286+
*
287+
* @param value the maximum number of attempts of the http requests.
288+
* @return this builder
289+
*/
290+
public B setMaximumAttempts(final Integer value) {
291+
_maximumAttempts = value;
292+
return self();
293+
}
294+
247295
/**
248296
* Sets the maximum pending queue size. Optional Cannot be null.
249297
* Default is 25000. Minimum is 1.
@@ -256,6 +304,30 @@ public B setMaximumQueueSize(final Integer value) {
256304
return self();
257305
}
258306

307+
/**
308+
* Sets the base backoff period. Optional Cannot be null.
309+
* Default is 50 milliseconds.
310+
*
311+
* @param value the base backoff period
312+
* @return this builder
313+
*/
314+
public B setBaseBackoff(final Period value) {
315+
_baseBackoff = value;
316+
return self();
317+
}
318+
319+
/**
320+
* Sets the maximum delay for retries. Optional Cannot be null.
321+
* Default is 60 seconds.
322+
*
323+
* @param value the maximum delay for retries
324+
* @return this builder
325+
*/
326+
public B setMaximumDelay(final Period value) {
327+
_maximumDelay = value;
328+
return self();
329+
}
330+
259331
/**
260332
* Protected constructor for subclasses.
261333
*
@@ -281,5 +353,28 @@ protected Builder(final Function<B, S> targetConstructor) {
281353
@JacksonInject
282354
@NotNull
283355
private MetricsFactory _metricsFactory;
356+
@NotNull
357+
@Min(1)
358+
private Integer _maximumAttempts = 1;
359+
@NotNull
360+
private Period _baseBackoff = Period.millis(50);
361+
@NotNull
362+
@CheckWith(CheckPeriod.class)
363+
private Period _maximumDelay = Period.seconds(60);
364+
365+
366+
private static final class CheckPeriod implements CheckWithCheck.SimpleCheck {
367+
368+
private static final long serialVersionUID = -6924010227680984149L;
369+
370+
@Override
371+
public boolean isSatisfied(final Object validatedObject, final Object value) {
372+
if (!(validatedObject instanceof HttpPostSink.Builder)) {
373+
return false;
374+
}
375+
final HttpPostSink.Builder<?, ?> builder = (HttpPostSink.Builder<?, ?>) validatedObject;
376+
return builder._baseBackoff.getMillis() >= 0 && builder._maximumDelay.getMillis() >= 0;
377+
}
378+
}
284379
}
285380
}

0 commit comments

Comments
 (0)