Skip to content

Commit 5806fda

Browse files
authored
Create actor pools to parallelize ingest of metrics from a source (#262)
* add a pool argument for sources * add inflight and queue size metrics to http post
1 parent e816a3e commit 5806fda

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

src/main/java/com/arpnetworking/metrics/common/sources/ActorSource.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import akka.actor.PoisonPill;
2121
import akka.actor.Props;
2222
import akka.pattern.Patterns;
23+
import akka.routing.RoundRobinPool;
2324
import com.arpnetworking.steno.Logger;
2425
import com.arpnetworking.steno.LoggerFactory;
2526
import com.fasterxml.jackson.annotation.JacksonInject;
27+
import net.sf.oval.constraint.Min;
2628
import net.sf.oval.constraint.NotEmpty;
2729
import net.sf.oval.constraint.NotNull;
2830

@@ -41,7 +43,7 @@ public abstract class ActorSource extends BaseSource {
4143
@Override
4244
public void start() {
4345
if (_actor == null) {
44-
_actor = _actorSystem.actorOf(createProps(), _actorName);
46+
_actor = _actorSystem.actorOf(new RoundRobinPool(_poolSize).props(createProps()), _actorName);
4547
}
4648
}
4749

@@ -110,12 +112,14 @@ protected ActorSource(final Builder<?, ? extends ActorSource> builder) {
110112
super(builder);
111113
_actorName = builder._actorName;
112114
_actorSystem = builder._actorSystem;
115+
_poolSize = builder._poolSize;
113116
}
114117

115118
private ActorRef _actor = null;
116119

117120
private final String _actorName;
118121
private final ActorSystem _actorSystem;
122+
private final int _poolSize;
119123

120124
private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(1);
121125
private static final Logger LOGGER = LoggerFactory.getLogger(ActorSource.class);
@@ -159,9 +163,22 @@ public final B setActorSystem(final ActorSystem value) {
159163
return self();
160164
}
161165

166+
/**
167+
* Sets the actor pool size.
168+
* @param value Number of actors in the pool
169+
* @return This instance of {@link Builder}
170+
*/
171+
public final B setPoolSize(final Integer value) {
172+
_poolSize = value;
173+
return self();
174+
}
175+
162176
@NotNull
163177
@NotEmpty
164178
private String _actorName;
179+
@NotNull
180+
@Min(1)
181+
private Integer _poolSize = 1;
165182
@JacksonInject
166183
private ActorSystem _actorSystem;
167184
}

src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSinkActor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ public HttpPostSinkActor(
106106
_evictedRequestsName = "sinks/http_post/" + sink.getMetricSafeName() + "/evicted_requests";
107107
_requestLatencyName = "sinks/http_post/" + sink.getMetricSafeName() + "/request_latency";
108108
_inQueueLatencyName = "sinks/http_post/" + sink.getMetricSafeName() + "/queue_time";
109+
_pendingRequestsQueueSizeName = "sinks/http_post/" + sink.getMetricSafeName() + "/queue_size";
110+
_inflightRequestsCountName = "sinks/http_post/" + sink.getMetricSafeName() + "/inflight_count";
109111
_requestSuccessName = "sinks/http_post/" + sink.getMetricSafeName() + "/success";
110112
_responseStatusName = "sinks/http_post/" + sink.getMetricSafeName() + "/status";
111113
_samplesDroppedName = "sinks/http_post/" + sink.getMetricSafeName() + "/samples_dropped";
@@ -240,6 +242,7 @@ private void processEmitAggregation(final EmitAggregation emitMessage) {
240242
.addData("dataSize", periodicData.getData().size())
241243
.addContext("actor", self())
242244
.log();
245+
_periodicMetrics.recordGauge(_inflightRequestsCountName, _inflightRequestsCount);
243246

244247
if (!periodicData.getData().isEmpty()) {
245248
final Collection<RequestEntry.Builder> requestEntryBuilders = _sink.createRequests(_client, periodicData);
@@ -262,6 +265,8 @@ private void processEmitAggregation(final EmitAggregation emitMessage) {
262265
.log();
263266
}
264267

268+
_periodicMetrics.recordGauge(_pendingRequestsQueueSizeName, _pendingRequests.size());
269+
265270
if (_spreadingDelayMillis > 0) {
266271
// If we don't currently have anything in-flight, we'll need to wait the spreading duration.
267272
if (!_waiting && pendingWasEmpty) {
@@ -362,6 +367,8 @@ private void fireNextRequest() {
362367
private final String _evictedRequestsName;
363368
private final String _requestLatencyName;
364369
private final String _inQueueLatencyName;
370+
private final String _pendingRequestsQueueSizeName;
371+
private final String _inflightRequestsCountName;
365372
private final String _requestSuccessName;
366373
private final String _responseStatusName;
367374
private final String _samplesDroppedName;

0 commit comments

Comments
 (0)