|
15 | 15 | */ |
16 | 16 | package com.arpnetworking.tsdcore.sinks; |
17 | 17 |
|
18 | | -import akka.actor.AbstractActor; |
| 18 | +import akka.actor.AbstractActorWithTimers; |
19 | 19 | import akka.actor.Props; |
20 | 20 | import akka.pattern.Patterns; |
21 | 21 | import com.arpnetworking.logback.annotations.LogValue; |
|
52 | 52 | * |
53 | 53 | * @author Brandon Arp (brandon dot arp at inscopemetrics dot io) |
54 | 54 | */ |
55 | | -public class HttpPostSinkActor extends AbstractActor { |
| 55 | +public class HttpPostSinkActor extends AbstractActorWithTimers { |
56 | 56 | /** |
57 | 57 | * Factory method to create a Props. |
58 | 58 | * |
@@ -112,6 +112,7 @@ public HttpPostSinkActor( |
112 | 112 | _responseStatusName = "sinks/http_post/" + sink.getMetricSafeName() + "/status"; |
113 | 113 | _samplesDroppedName = "sinks/http_post/" + sink.getMetricSafeName() + "/samples_dropped"; |
114 | 114 | _samplesSentName = "sinks/http_post/" + sink.getMetricSafeName() + "/samples_sent"; |
| 115 | + timers().startTimerAtFixedRate("metrics", SampleMetrics.INSTANCE, Duration.ofSeconds(1), Duration.ofSeconds(1)); |
115 | 116 | } |
116 | 117 |
|
117 | 118 | @Override |
@@ -179,9 +180,19 @@ public Receive createReceive() { |
179 | 180 | .match(PostRejected.class, this::processRejectedRequest) |
180 | 181 | .match(PostFailure.class, this::processFailedRequest) |
181 | 182 | .match(WaitTimeExpired.class, this::waitTimeExpired) |
| 183 | + .match(SampleMetrics.class, this::sampleMetrics) |
182 | 184 | .build(); |
183 | 185 | } |
184 | 186 |
|
| 187 | + private void sampleMetrics(final SampleMetrics ignored) { |
| 188 | + _periodicMetrics.recordGauge(_inflightRequestsCountName, _inflightRequestsCount); |
| 189 | + _periodicMetrics.recordGauge(_pendingRequestsQueueSizeName, _pendingRequests.size()); |
| 190 | + _periodicMetrics.recordCounter(_samplesSentName, 0); |
| 191 | + _periodicMetrics.recordCounter(_evictedRequestsName, 0); |
| 192 | + _periodicMetrics.recordCounter(_requestSuccessName, 0); |
| 193 | + _periodicMetrics.recordCounter(_samplesDroppedName, 0); |
| 194 | + } |
| 195 | + |
185 | 196 | private void waitTimeExpired(final WaitTimeExpired ignored) { |
186 | 197 | LOGGER.debug() |
187 | 198 | .setMessage("Received WaitTimeExpired message") |
@@ -489,4 +500,9 @@ public void onThrowable(final Throwable throwable) { |
489 | 500 |
|
490 | 501 | private final CompletableFuture<Response> _promise; |
491 | 502 | } |
| 503 | + |
| 504 | + private static final class SampleMetrics { |
| 505 | + private SampleMetrics() { } |
| 506 | + private static final SampleMetrics INSTANCE = new SampleMetrics(); |
| 507 | + } |
492 | 508 | } |
0 commit comments