Skip to content

Commit 5511d25

Browse files
authored
Merge pull request #116 from abbeyqy/emitEvictionMetric
Emit eviction metric
2 parents 3f8a125 + 82905c9 commit 5511d25

File tree

5 files changed

+73
-11
lines changed

5 files changed

+73
-11
lines changed

src/main/java/com/arpnetworking/tsdcore/sinks/HttpPostSink.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import akka.http.javadsl.model.HttpMethods;
2222
import akka.http.javadsl.model.MediaTypes;
2323
import com.arpnetworking.logback.annotations.LogValue;
24+
import com.arpnetworking.metrics.MetricsFactory;
2425
import com.arpnetworking.steno.LogValueMapFactory;
2526
import com.arpnetworking.steno.Logger;
2627
import com.arpnetworking.steno.LoggerFactory;
@@ -153,7 +154,13 @@ protected HttpPostSink(final Builder<?, ?> builder) {
153154
_aysncHttpClientUri = Uri.create(_uri.toString());
154155

155156
_sinkActor = builder._actorSystem.actorOf(
156-
HttpSinkActor.props(CLIENT, this, builder._maximumConcurrency, builder._maximumQueueSize, builder._spreadPeriod));
157+
HttpSinkActor.props(
158+
CLIENT,
159+
this,
160+
builder._maximumConcurrency,
161+
builder._maximumQueueSize,
162+
builder._spreadPeriod,
163+
builder._metricsFactory));
157164
}
158165

159166
private final URI _uri;
@@ -201,6 +208,18 @@ public B setActorSystem(final ActorSystem value) {
201208
return self();
202209
}
203210

211+
/**
212+
* Instance of <code>MetricsFactory</code>. Cannot be null. This field
213+
* may be injected automatically by Jackson/Guice if setup to do so.
214+
*
215+
* @param value Instance of <code>MetricsFactory</code>.
216+
* @return This instance of <code>Builder</code>.
217+
*/
218+
public B setMetricsFactory(final MetricsFactory value) {
219+
_metricsFactory = value;
220+
return self();
221+
}
222+
204223
/**
205224
* Sets the maximum concurrency of the http requests. Optional. Cannot be null.
206225
* Default is 1. Minimum is 1.
@@ -259,5 +278,8 @@ protected Builder(final Function<B, S> targetConstructor) {
259278
private Integer _maximumQueueSize = 25000;
260279
@NotNull
261280
private Period _spreadPeriod = Period.ZERO;
281+
@JacksonInject
282+
@NotNull
283+
private MetricsFactory _metricsFactory;
262284
}
263285
}

src/main/java/com/arpnetworking/tsdcore/sinks/HttpSinkActor.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import akka.http.javadsl.model.StatusCodes;
2121
import akka.pattern.PatternsCS;
2222
import com.arpnetworking.logback.annotations.LogValue;
23+
import com.arpnetworking.metrics.Metrics;
24+
import com.arpnetworking.metrics.MetricsFactory;
2325
import com.arpnetworking.steno.LogValueMapFactory;
2426
import com.arpnetworking.steno.Logger;
2527
import com.arpnetworking.steno.LoggerFactory;
@@ -57,15 +59,17 @@ public class HttpSinkActor extends AbstractActor {
5759
* @param maximumConcurrency Maximum number of concurrent requests.
5860
* @param maximumQueueSize Maximum number of pending requests.
5961
* @param spreadPeriod Maximum time to delay sending new aggregates to spread load.
62+
* @param metricsFactory metrics factory to record metrics.
6063
* @return A new Props
6164
*/
6265
public static Props props(
6366
final AsyncHttpClient client,
6467
final HttpPostSink sink,
6568
final int maximumConcurrency,
6669
final int maximumQueueSize,
67-
final Period spreadPeriod) {
68-
return Props.create(HttpSinkActor.class, client, sink, maximumConcurrency, maximumQueueSize, spreadPeriod);
70+
final Period spreadPeriod,
71+
final MetricsFactory metricsFactory) {
72+
return Props.create(HttpSinkActor.class, client, sink, maximumConcurrency, maximumQueueSize, spreadPeriod, metricsFactory);
6973
}
7074

7175
/**
@@ -76,22 +80,27 @@ public static Props props(
7680
* @param maximumConcurrency Maximum number of concurrent requests.
7781
* @param maximumQueueSize Maximum number of pending requests.
7882
* @param spreadPeriod Maximum time to delay sending new aggregates to spread load.
83+
* @param metricsFactory metrics factory to record metrics.
7984
*/
8085
public HttpSinkActor(
8186
final AsyncHttpClient client,
8287
final HttpPostSink sink,
8388
final int maximumConcurrency,
8489
final int maximumQueueSize,
85-
final Period spreadPeriod) {
90+
final Period spreadPeriod,
91+
final MetricsFactory metricsFactory) {
8692
_client = client;
8793
_sink = sink;
8894
_maximumConcurrency = maximumConcurrency;
8995
_pendingRequests = EvictingQueue.create(maximumQueueSize);
96+
_metricsFactory = metricsFactory;
9097
if (Period.ZERO.equals(spreadPeriod)) {
9198
_spreadingDelayMillis = 0;
9299
} else {
93100
_spreadingDelayMillis = new Random().nextInt((int) spreadPeriod.toStandardDuration().getMillis());
94101
}
102+
103+
_evictedRequestsName = "sinks/http_post/" + _sink.getMetricSafeName() + "/evicted_requests";
95104
}
96105

97106
/**
@@ -108,6 +117,7 @@ public Object toLogValue() {
108117
.put("waiting", _waiting)
109118
.put("inflightRequestsCount", _inflightRequestsCount)
110119
.put("pendingRequestsCount", _pendingRequests.size())
120+
.put("metricsFactory", _metricsFactory)
111121
.build();
112122
}
113123

@@ -200,6 +210,10 @@ private void processEmitAggregation(final EmitAggregation emitMessage) {
200210
}
201211

202212
if (evicted > 0) {
213+
// TODO(qinyanl): Convert to periodic metric in the future.
214+
try (Metrics metrics = _metricsFactory.create()) {
215+
metrics.incrementCounter(_evictedRequestsName, evicted);
216+
}
203217
EVICTED_LOGGER.warn()
204218
.setMessage("Evicted data from HTTP sink queue")
205219
.addData("sink", _sink)
@@ -272,6 +286,9 @@ public void postStop() throws Exception {
272286
private final AsyncHttpClient _client;
273287
private final HttpPostSink _sink;
274288
private final int _spreadingDelayMillis;
289+
private final MetricsFactory _metricsFactory;
290+
291+
private final String _evictedRequestsName;
275292

276293
private static final Logger LOGGER = LoggerFactory.getLogger(HttpPostSink.class);
277294
private static final Logger EVICTED_LOGGER = LoggerFactory.getRateLimitLogger(HttpPostSink.class, Duration.ofSeconds(30));

src/test/java/com/arpnetworking/tsdcore/sinks/InfluxDbSinkTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.arpnetworking.tsdcore.sinks;
1717

1818
import akka.actor.ActorSystem;
19+
import com.arpnetworking.metrics.MetricsFactory;
20+
import com.arpnetworking.metrics.impl.TsdMetricsFactory;
1921
import com.arpnetworking.test.TestBeanFactory;
2022
import com.arpnetworking.tsdcore.model.AggregatedData;
2123
import com.arpnetworking.tsdcore.model.PeriodicData;
@@ -43,6 +45,9 @@ public final class InfluxDbSinkTest {
4345
private InfluxDbSink.Builder _influxBuilder;
4446
private static final ActorSystem ACTOR_SYSTEM = ActorSystem.apply();
4547
private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory();
48+
private static final MetricsFactory METRICS_FACTORY = TsdMetricsFactory.newInstance(
49+
"mock_metrics_factory",
50+
"mock_metrics_factory");
4651

4752
@AfterClass
4853
public static void afterClass() {
@@ -55,7 +60,8 @@ public void testSerializeWithTwoValue() throws Exception {
5560
_influxBuilder = new InfluxDbSink.Builder()
5661
.setName("monitord_sink_test")
5762
.setActorSystem(ACTOR_SYSTEM)
58-
.setUri(URI.create("http://localhost:8888"));
63+
.setUri(URI.create("http://localhost:8888"))
64+
.setMetricsFactory(METRICS_FACTORY);
5965

6066

6167
final String service = "service-testSerializeMerge";
@@ -112,7 +118,8 @@ public void testSerializeWithTwoMetrics() throws Exception {
112118
_influxBuilder = new InfluxDbSink.Builder()
113119
.setName("monitord_sink_test")
114120
.setActorSystem(ACTOR_SYSTEM)
115-
.setUri(URI.create("http://localhost:8888"));
121+
.setUri(URI.create("http://localhost:8888"))
122+
.setMetricsFactory(METRICS_FACTORY);
116123

117124

118125
final String service = "service-testSerializeMerge";
@@ -171,7 +178,8 @@ public void testSerializeEscapeCharacters() throws Exception {
171178
_influxBuilder = new InfluxDbSink.Builder()
172179
.setName("monitord_sink_test")
173180
.setActorSystem(ACTOR_SYSTEM)
174-
.setUri(URI.create("http://localhost:8888"));
181+
.setUri(URI.create("http://localhost:8888"))
182+
.setMetricsFactory(METRICS_FACTORY);
175183

176184

177185
final String service = "service test,Serialize=Merge";

src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import akka.http.javadsl.model.MediaTypes;
1919
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
20+
import com.arpnetworking.metrics.Metrics;
21+
import com.arpnetworking.metrics.MetricsFactory;
2022
import com.arpnetworking.tsdcore.model.AggregatedData;
2123
import com.arpnetworking.tsdcore.model.Condition;
2224
import com.arpnetworking.tsdcore.model.FQDSN;
@@ -37,6 +39,8 @@
3739
import org.junit.Assert;
3840
import org.junit.Before;
3941
import org.junit.Test;
42+
import org.mockito.Mock;
43+
import org.mockito.Mockito;
4044

4145
import java.io.IOException;
4246
import java.net.URI;
@@ -55,11 +59,12 @@ public void setUp() {
5559
_wireMockServer = new WireMockServer(0);
5660
_wireMockServer.start();
5761
_wireMock = new WireMock(_wireMockServer.port());
58-
5962
_kairosDbSinkBuilder = new KairosDbSink.Builder()
6063
.setName("kairosdb_sink_test")
6164
.setActorSystem(getSystem())
62-
.setUri(URI.create("http://localhost:" + _wireMockServer.port() + PATH));
65+
.setUri(URI.create("http://localhost:" + _wireMockServer.port() + PATH))
66+
.setMetricsFactory(_mockMetricsFactory);
67+
Mockito.doReturn(_mockMetrics).when(_mockMetricsFactory).create();
6368
}
6469

6570
@After
@@ -113,7 +118,6 @@ public void testPost() throws InterruptedException, IOException {
113118
.setPeriod(Period.minutes(1))
114119
.setStart(start)
115120
.build());
116-
117121
// Allow the request/response to complete
118122
Thread.sleep(1000);
119123

@@ -137,4 +141,9 @@ public void testPost() throws InterruptedException, IOException {
137141
private static final String PATH = "/kairos/post/path";
138142
private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory();
139143
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
144+
145+
@Mock
146+
private Metrics _mockMetrics;
147+
@Mock
148+
private MetricsFactory _mockMetricsFactory;
140149
}

src/test/java/com/arpnetworking/tsdcore/sinks/MonitordSinkTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.arpnetworking.tsdcore.sinks;
1717

1818
import akka.actor.ActorSystem;
19+
import com.arpnetworking.metrics.MetricsFactory;
20+
import com.arpnetworking.metrics.impl.TsdMetricsFactory;
1921
import com.arpnetworking.test.TestBeanFactory;
2022
import com.arpnetworking.tsdcore.model.AggregatedData;
2123
import com.arpnetworking.tsdcore.model.PeriodicData;
@@ -46,7 +48,8 @@ public void before() {
4648
_monitordSinkBuilder = new MonitordSink.Builder()
4749
.setName("monitord_sink_test")
4850
.setActorSystem(ACTOR_SYSTEM)
49-
.setUri(URI.create("http://localhost:8888"));
51+
.setUri(URI.create("http://localhost:8888"))
52+
.setMetricsFactory(METRICS_FACTORY);
5053
}
5154

5255
@Test
@@ -168,4 +171,7 @@ public void testSerializeNoMergePeriod() {
168171

169172
private MonitordSink.Builder _monitordSinkBuilder;
170173
private static final ActorSystem ACTOR_SYSTEM = ActorSystem.apply();
174+
private static final MetricsFactory METRICS_FACTORY = TsdMetricsFactory.newInstance(
175+
"mock_metrics_factory",
176+
"mock_metrics_factory");
171177
}

0 commit comments

Comments
 (0)