Skip to content

Commit c21925a

Browse files
authored
Actor reap (#238)
* Reap period worker actors after idle period. * Convert PeriodWorker to timers(). * Add super start/stop. * Call parent start/stop methods. * Add comment about idle worker period.
1 parent 8f47278 commit c21925a

File tree

17 files changed

+438
-201
lines changed

17 files changed

+438
-201
lines changed

config/pipelines/pipeline.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ name=example_pipeline
99
# "PT1M"
1010
#]
1111

12+
# Idle Timeout
13+
# ~~~~
14+
#idleTimeout="PT5M"
15+
1216
# Statistics
1317
# ~~~~
1418
#timerStatistics=[

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171

7272
<properties>
7373
<!--Dependency versions-->
74-
<akka.version>2.5.16</akka.version>
74+
<akka.version>2.5.32</akka.version>
7575
<akka.http.version>10.1.5</akka.http.version>
7676
<apache.httpclient.version>4.5.6</apache.httpclient.version>
7777
<apache.httpcore.version>4.4.10</apache.httpcore.version>
@@ -99,6 +99,7 @@
9999
<oval.version>1.90</oval.version>
100100
<protobuf.version>3.8.0</protobuf.version>
101101
<scala.version>2.11</scala.version>
102+
<scala.java8.compat.version>0.7.0</scala.java8.compat.version>
102103
<scala.library.version>2.11.12</scala.library.version>
103104
<slf4j.version>1.7.25</slf4j.version>
104105
<snappy.version>1.1.7.2</snappy.version>
@@ -758,6 +759,11 @@
758759
<artifactId>scala-library</artifactId>
759760
<version>${scala.library.version}</version>
760761
</dependency>
762+
<dependency>
763+
<groupId>org.scala-lang.modules</groupId>
764+
<artifactId>scala-java8-compat_${scala.version}</artifactId>
765+
<version>${scala.java8.compat.version}</version>
766+
</dependency>
761767
<dependency>
762768
<groupId>org.apache.httpcomponents</groupId>
763769
<artifactId>httpclient</artifactId>

src/main/java/com/arpnetworking/http/Routes.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import com.google.common.collect.ImmutableList;
6060
import com.google.common.io.Resources;
6161
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
62+
import scala.compat.java8.FutureConverters;
6263
import scala.concurrent.duration.FiniteDuration;
6364

6465
import java.util.Objects;
@@ -251,8 +252,9 @@ private CompletionStage<HttpResponse> process(final HttpRequest request) {
251252
}
252253

253254
private CompletionStage<HttpResponse> dispatchHttpRequest(final HttpRequest request, final String actorName) {
254-
final CompletionStage<ActorRef> refFuture = _actorSystem.actorSelection(actorName)
255-
.resolveOneCS(FiniteDuration.create(1, TimeUnit.SECONDS));
255+
final CompletionStage<ActorRef> refFuture = FutureConverters.toJava(
256+
_actorSystem.actorSelection(actorName)
257+
.resolveOne(FiniteDuration.create(1, TimeUnit.SECONDS)));
256258
return refFuture.thenCompose(
257259
ref -> {
258260
final CompletableFuture<HttpResponse> response = new CompletableFuture<>();

src/main/java/com/arpnetworking/metrics/common/sources/HttpSource.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,14 @@
4242
import com.arpnetworking.http.RequestReply;
4343
import com.arpnetworking.metrics.common.parsers.Parser;
4444
import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
45+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
46+
import com.arpnetworking.metrics.mad.model.Metric;
4547
import com.arpnetworking.metrics.mad.model.Record;
48+
import com.arpnetworking.metrics.mad.model.statistics.StatisticFactory;
4649
import com.arpnetworking.steno.Logger;
4750
import com.arpnetworking.steno.LoggerFactory;
51+
import com.arpnetworking.tsdcore.model.CalculatedValue;
52+
import com.fasterxml.jackson.annotation.JacksonInject;
4853
import com.google.common.collect.ImmutableMultimap;
4954
import net.sf.oval.constraint.NotNull;
5055

@@ -74,8 +79,10 @@ protected Props createProps() {
7479
protected HttpSource(final Builder<?, ? extends HttpSource> builder) {
7580
super(builder);
7681
_parser = builder._parser;
82+
_periodicMetrics = builder._periodicMetrics;
7783
}
7884

85+
private final PeriodicMetrics _periodicMetrics;
7986
private final Parser<List<Record>, com.arpnetworking.metrics.mad.model.HttpRequest> _parser;
8087

8188
/**
@@ -98,6 +105,7 @@ public Receive createReceive() {
98105
.match(RequestReply.class, requestReply -> {
99106
// TODO(barp): Fix the ugly HttpRequest cast here due to java vs scala dsl
100107
akka.stream.javadsl.Source.single(requestReply.getRequest())
108+
.log("http source stream failure")
101109
.via(_processGraph)
102110
.toMat(_sink, Keep.right())
103111
.run(_materializer)
@@ -127,6 +135,8 @@ public Receive createReceive() {
127135
* @param source The {@link HttpSource} to send notifications through.
128136
*/
129137
/* package private */ Actor(final HttpSource source) {
138+
_periodicMetrics = source._periodicMetrics;
139+
_metricSafeName = source.getMetricSafeName();
130140
_parser = source._parser;
131141
_sink = Sink.foreach(source::notify);
132142
_materializer = ActorMaterializer.create(
@@ -193,14 +203,36 @@ private static com.arpnetworking.metrics.mad.model.HttpRequest mapModel(
193203

194204
private List<Record> parseRecords(final com.arpnetworking.metrics.mad.model.HttpRequest request)
195205
throws ParsingException {
196-
return _parser.parse(request);
206+
final List<Record> records = _parser.parse(request);
207+
long samples = 0;
208+
for (final Record record : records) {
209+
for (final Metric metric : record.getMetrics().values()) {
210+
samples += metric.getValues().size();
211+
final List<CalculatedValue<?>> countStatistic =
212+
metric.getStatistics().get(STATISTIC_FACTORY.getStatistic("count"));
213+
if (countStatistic != null) {
214+
samples += countStatistic.stream()
215+
.map(s -> s.getValue().getValue())
216+
.reduce(Double::sum)
217+
.orElse(0.0d);
218+
}
219+
}
220+
}
221+
_periodicMetrics.recordGauge(
222+
String.format("sources/http/%s/metric_samples", _metricSafeName),
223+
samples);
224+
225+
return records;
197226
}
198227

228+
private final PeriodicMetrics _periodicMetrics;
229+
private final String _metricSafeName;
199230
private final Sink<Record, CompletionStage<Done>> _sink;
200231
private final Parser<List<Record>, com.arpnetworking.metrics.mad.model.HttpRequest> _parser;
201232
private final Materializer _materializer;
202233
private final Graph<FlowShape<HttpRequest, Record>, NotUsed> _processGraph;
203234

235+
private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory();
204236
private static final Logger BAD_REQUEST_LOGGER =
205237
LoggerFactory.getRateLimitLogger(HttpSource.class, Duration.ofSeconds(30));
206238
}
@@ -235,7 +267,22 @@ public B setParser(final Parser<List<Record>, com.arpnetworking.metrics.mad.mode
235267
return self();
236268
}
237269

270+
/**
271+
* Sets the periodic metrics instance.
272+
*
273+
* @param value The periodic metrics.
274+
* @return This instance of {@link Builder}
275+
*/
276+
public final B setPeriodicMetrics(final PeriodicMetrics value) {
277+
_periodicMetrics = value;
278+
return self();
279+
}
280+
238281
@NotNull
239282
private Parser<List<Record>, com.arpnetworking.metrics.mad.model.HttpRequest> _parser;
283+
284+
@NotNull
285+
@JacksonInject
286+
private PeriodicMetrics _periodicMetrics;
240287
}
241288
}

0 commit comments

Comments
 (0)