Skip to content

Commit 10291e2

Browse files
authored
add and use periodic metrics from incubator extra (#57)
1 parent 40fdcf5 commit 10291e2

13 files changed

+111
-179
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
<metrics.aggregator.protocol.version>1.0.3</metrics.aggregator.protocol.version>
100100
<metrics.client.version>0.6.0</metrics.client.version>
101101
<metrics.client.http.version>0.6.0</metrics.client.http.version>
102+
<metrics.client.incubator.version>0.6.0</metrics.client.incubator.version>
102103
<metrics.generator.version>1.1.0</metrics.generator.version>
103104
<metrics.jvm.extra.version>0.4.2</metrics.jvm.extra.version>
104105
<mockito.version>1.10.19</mockito.version>
@@ -418,6 +419,11 @@
418419
<artifactId>apache-http-sink-extra</artifactId>
419420
<version>${metrics.client.http.version}</version>
420421
</dependency>
422+
<dependency>
423+
<groupId>com.arpnetworking.metrics.extras</groupId>
424+
<artifactId>incubator-extra</artifactId>
425+
<version>${metrics.client.incubator.version}</version>
426+
</dependency>
421427
<dependency>
422428
<groupId>com.arpnetworking.metrics.extras</groupId>
423429
<artifactId>jvm-extra</artifactId>

src/main/java/com/arpnetworking/http/Routes.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,10 @@
3838
import akka.stream.javadsl.Source;
3939
import akka.util.ByteString;
4040
import akka.util.Timeout;
41-
import com.arpnetworking.metrics.Counter;
42-
import com.arpnetworking.metrics.Metrics;
43-
import com.arpnetworking.metrics.MetricsFactory;
44-
import com.arpnetworking.metrics.Timer;
41+
import com.arpnetworking.metrics.Units;
4542
import com.arpnetworking.metrics.common.sources.ClientHttpSourceV1;
4643
import com.arpnetworking.metrics.common.sources.CollectdHttpSourceV1;
44+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
4745
import com.arpnetworking.metrics.mad.actors.Status;
4846
import com.arpnetworking.metrics.proxy.actors.Connection;
4947
import com.arpnetworking.metrics.proxy.models.messages.Connect;
@@ -54,6 +52,7 @@
5452
import com.arpnetworking.steno.Logger;
5553
import com.arpnetworking.steno.LoggerFactory;
5654
import com.google.common.base.Charsets;
55+
import com.google.common.base.Stopwatch;
5756
import com.google.common.io.Resources;
5857
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
5958
import scala.compat.java8.FutureConverters;
@@ -78,19 +77,19 @@ public final class Routes implements Function<HttpRequest, CompletionStage<HttpR
7877
* Public constructor.
7978
*
8079
* @param actorSystem Instance of <code>ActorSystem</code>.
81-
* @param metricsFactory Instance of <code>MetricsFactory</code>.
80+
* @param metrics Instance of <code>PeriodicMetrics</code>.
8281
* @param healthCheckPath The path for the health check.
8382
* @param statusPath The path for the status.
8483
* @param supplementalRoutes List of supplemental routes in priority order.
8584
*/
8685
public Routes(
8786
final ActorSystem actorSystem,
88-
final MetricsFactory metricsFactory,
87+
final PeriodicMetrics metrics,
8988
final String healthCheckPath,
9089
final String statusPath,
9190
final List<SupplementalRoutes> supplementalRoutes) {
9291
_actorSystem = actorSystem;
93-
_metricsFactory = metricsFactory;
92+
_metrics = metrics;
9493
_healthCheckPath = healthCheckPath;
9594
_statusPath = statusPath;
9695
_supplementalRoutes = supplementalRoutes;
@@ -111,10 +110,8 @@ public Flow<HttpRequest, HttpResponse, NotUsed> flow() {
111110
*/
112111
@Override
113112
public CompletionStage<HttpResponse> apply(final HttpRequest request) {
114-
final Metrics metrics = _metricsFactory.create();
115-
final Timer timer = metrics.createTimer(createMetricName(request, REQUEST_METRIC));
116-
final Counter bodySize = metrics.createCounter(createMetricName(request, BODY_SIZE_METRIC));
117-
bodySize.increment(request.entity().getContentLengthOption().orElse(0L));
113+
final Stopwatch requestTimer = Stopwatch.createStarted();
114+
_metrics.recordCounter(createMetricName(request, BODY_SIZE_METRIC), request.entity().getContentLengthOption().orElse(0L));
118115
// TODO(vkoskela): Add a request UUID and include in MDC. [MAI-462]
119116
LOGGER.trace()
120117
.setEvent("http.in.start")
@@ -124,8 +121,11 @@ public CompletionStage<HttpResponse> apply(final HttpRequest request) {
124121
.log();
125122
return process(request).<HttpResponse>whenComplete(
126123
(response, failure) -> {
127-
timer.close();
128-
metrics.close();
124+
requestTimer.stop();
125+
_metrics.recordTimer(
126+
createMetricName(request, REQUEST_METRIC),
127+
requestTimer.elapsed(TimeUnit.NANOSECONDS),
128+
Optional.of(Units.NANOSECOND));
129129
final LogBuilder log = LOGGER.trace()
130130
.setEvent("http.in")
131131
.addData("method", request.method())
@@ -217,7 +217,7 @@ private CompletionStage<HttpResponse> getHttpResponseForTelemetry(
217217
final akka.http.impl.engine.ws.UpgradeToWebSocketLowLevel lowLevelUpgradeToWebSocketHeader =
218218
(akka.http.impl.engine.ws.UpgradeToWebSocketLowLevel) upgradeToWebSocketHeader.get();
219219

220-
final ActorRef connection = _actorSystem.actorOf(Connection.props(_metricsFactory, messageProcessorsFactory));
220+
final ActorRef connection = _actorSystem.actorOf(Connection.props(_metrics, messageProcessorsFactory));
221221
final Sink<Message, ?> inChannel = Sink.actorRef(connection, PoisonPill.getInstance());
222222
final Source<Message, ActorRef> outChannel = Source.<Message>actorRef(TELEMETRY_BUFFER_SIZE, OverflowStrategy.dropBuffer())
223223
.<ActorRef>mapMaterializedValue(channel -> {
@@ -270,7 +270,7 @@ private String createMetricName(final HttpRequest request, final String actionPa
270270
@SuppressFBWarnings("SE_BAD_FIELD")
271271
private final ActorSystem _actorSystem;
272272
@SuppressFBWarnings("SE_BAD_FIELD")
273-
private final MetricsFactory _metricsFactory;
273+
private final PeriodicMetrics _metrics;
274274
private final String _healthCheckPath;
275275
private final String _statusPath;
276276
@SuppressFBWarnings("SE_BAD_FIELD")

src/main/java/com/arpnetworking/metrics/mad/Main.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import com.arpnetworking.metrics.MetricsFactory;
3838
import com.arpnetworking.metrics.impl.ApacheHttpSink;
3939
import com.arpnetworking.metrics.impl.TsdMetricsFactory;
40+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
41+
import com.arpnetworking.metrics.incubator.impl.TsdPeriodicMetrics;
4042
import com.arpnetworking.metrics.jvm.JvmMetricsRunnable;
4143
import com.arpnetworking.metrics.mad.actors.Status;
4244
import com.arpnetworking.metrics.mad.configuration.AggregatorConfiguration;
@@ -55,8 +57,10 @@
5557
import com.google.inject.AbstractModule;
5658
import com.google.inject.Guice;
5759
import com.google.inject.Injector;
60+
import com.google.inject.Provides;
5861
import com.typesafe.config.Config;
5962
import com.typesafe.config.ConfigFactory;
63+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
6064
import scala.concurrent.Await;
6165
import scala.concurrent.Future;
6266
import scala.concurrent.duration.Duration;
@@ -70,6 +74,7 @@
7074
import java.util.Map;
7175
import java.util.Optional;
7276
import java.util.Set;
77+
import java.util.concurrent.CompletableFuture;
7378
import java.util.concurrent.CompletionStage;
7479
import java.util.concurrent.Executors;
7580
import java.util.concurrent.ScheduledExecutorService;
@@ -227,7 +232,7 @@ private void launchActors(final Injector injector) {
227232
final Materializer materializer = ActorMaterializer.create(actorSystem);
228233
final Routes routes = new Routes(
229234
actorSystem,
230-
injector.getInstance(MetricsFactory.class),
235+
injector.getInstance(PeriodicMetrics.class),
231236
_configuration.getHttpHealthCheckPath(),
232237
_configuration.getHttpStatusPath(),
233238
supplementalHttpRoutes);
@@ -500,6 +505,26 @@ public void configure() {
500505
bind(LifecycleRegistration.class).toInstance(_shutdown);
501506
}
502507

508+
@Provides
509+
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD")
510+
private PeriodicMetrics providePeriodicMetrics(
511+
final MetricsFactory metricsFactory,
512+
final LifecycleRegistration lifecycle) {
513+
final TsdPeriodicMetrics periodicMetrics = new TsdPeriodicMetrics.Builder()
514+
.setMetricsFactory(metricsFactory)
515+
.build();
516+
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
517+
r -> new Thread(r, "PeriodicMetricsCloser"));
518+
executor.scheduleAtFixedRate(periodicMetrics, 500, 500, TimeUnit.MILLISECONDS);
519+
520+
// Register to shutdown the executor when the Guice stack is shutdown.
521+
lifecycle.registerShutdown(() -> {
522+
executor.shutdown();
523+
return CompletableFuture.completedFuture(null);
524+
});
525+
return periodicMetrics;
526+
}
527+
503528
private final ActorSystem _actorSystem;
504529
private final MetricsFactory _metricsFactory;
505530
private final AppShutdown _shutdown;

src/main/java/com/arpnetworking/metrics/proxy/actors/Connection.java

Lines changed: 14 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,14 @@
1616
package com.arpnetworking.metrics.proxy.actors;
1717

1818
import akka.actor.ActorRef;
19-
import akka.actor.Cancellable;
2019
import akka.actor.PoisonPill;
2120
import akka.actor.Props;
2221
import akka.actor.UntypedActor;
23-
import akka.dispatch.ExecutionContexts;
2422
import akka.http.javadsl.model.ws.Message;
2523
import akka.http.javadsl.model.ws.TextMessage;
2624
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
2725
import com.arpnetworking.logback.annotations.LogValue;
28-
import com.arpnetworking.metrics.Metrics;
29-
import com.arpnetworking.metrics.MetricsFactory;
26+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
3027
import com.arpnetworking.metrics.proxy.models.messages.Command;
3128
import com.arpnetworking.metrics.proxy.models.messages.Connect;
3229
import com.arpnetworking.metrics.proxy.models.protocol.MessageProcessorsFactory;
@@ -38,10 +35,8 @@
3835
import com.fasterxml.jackson.databind.ObjectMapper;
3936
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
4037
import com.fasterxml.jackson.databind.node.ObjectNode;
41-
import scala.concurrent.duration.FiniteDuration;
4238

4339
import java.util.List;
44-
import java.util.concurrent.TimeUnit;
4540

4641
/**
4742
* Actor class to hold the state for a single connection.
@@ -52,37 +47,29 @@ public class Connection extends UntypedActor {
5247
/**
5348
* Public constructor.
5449
*
55-
* @param metricsFactory Instance of <code>MetricsFactory</code>.
50+
* @param metrics Instance of <code>PeriodicMetrics</code>.
5651
* @param processorsFactory Factory for producing the protocol's <code>MessagesProcessor</code>
5752
*/
5853
public Connection(
59-
final MetricsFactory metricsFactory,
54+
final PeriodicMetrics metrics,
6055
final MessageProcessorsFactory processorsFactory) {
61-
_metricsFactory = metricsFactory;
62-
_instrument = context().system().scheduler().schedule(
63-
new FiniteDuration(0, TimeUnit.SECONDS), // Initial delay
64-
new FiniteDuration(500, TimeUnit.MILLISECONDS), // Interval
65-
getSelf(),
66-
"instrument",
67-
ExecutionContexts.global(),
68-
getSelf());
69-
_messageProcessors = processorsFactory.create(this);
70-
_metrics = createMetrics();
56+
_metrics = metrics;
57+
_messageProcessors = processorsFactory.create(this, metrics);
7158
}
7259

7360
/**
7461
* Factory for creating a <code>Props</code> with strong typing.
7562
*
76-
* @param metricsFactory Instance of <code>MetricsFactory</code>.
63+
* @param metrics Instance of <code>PeriodicMetrics</code>.
7764
* @param messageProcessorsFactory Factory to create a <code>Metrics</code> object.
7865
* @return a new Props object to create a <code>ConnectionContext</code>.
7966
*/
8067
public static Props props(
81-
final MetricsFactory metricsFactory,
68+
final PeriodicMetrics metrics,
8269
final MessageProcessorsFactory messageProcessorsFactory) {
8370
return Props.create(
8471
Connection.class,
85-
metricsFactory,
72+
metrics,
8673
messageProcessorsFactory);
8774
}
8875

@@ -97,10 +84,7 @@ public void onReceive(final Object message) throws Exception {
9784
.addData("data", message)
9885
.addData("channel", _channel)
9986
.log();
100-
if ("instrument".equals(message)) {
101-
periodicInstrumentation();
102-
return;
103-
} else if (message instanceof Connect) {
87+
if (message instanceof Connect) {
10488
final Connect connect = (Connect) message;
10589
_telemetry = connect.getTelemetry();
10690
_channel = connect.getChannel();
@@ -142,9 +126,9 @@ public void onReceive(final Object message) throws Exception {
142126
}
143127
}
144128
if (!messageProcessed) {
145-
_metrics.incrementCounter(UNKNOWN_COUNTER);
129+
_metrics.recordCounter(UNKNOWN_COUNTER, 1);
146130
if (message instanceof Command) {
147-
_metrics.incrementCounter(UNKONOWN_COMMAND_COUNTER);
131+
_metrics.recordCounter(UNKNOWN_COMMAND_COUNTER, 1);
148132
LOGGER.warn()
149133
.setMessage("Unable to process message")
150134
.addData("reason", "unsupported command")
@@ -153,7 +137,7 @@ public void onReceive(final Object message) throws Exception {
153137
.log();
154138
unhandled(message);
155139
} else {
156-
_metrics.incrementCounter("Actors/Connection/UNKNOWN");
140+
_metrics.recordCounter("Actors/Connection/UNKNOWN", 1);
157141
LOGGER.warn()
158142
.setMessage("Unable to process message")
159143
.addData("reason", "unsupported message")
@@ -165,15 +149,6 @@ public void onReceive(final Object message) throws Exception {
165149
}
166150
}
167151

168-
/**
169-
* {@inheritDoc}
170-
*/
171-
@Override
172-
public void postStop() throws Exception {
173-
_instrument.cancel();
174-
super.postStop();
175-
}
176-
177152
/**
178153
* Sends a json object to the connected client.
179154
*
@@ -236,32 +211,14 @@ public String toString() {
236211
return toLogValue().toString();
237212
}
238213

239-
private Metrics createMetrics() {
240-
final Metrics metrics = _metricsFactory.create();
241-
metrics.resetCounter(UNKONOWN_COMMAND_COUNTER);
242-
metrics.resetCounter(UNKNOWN_COUNTER);
243-
for (final MessagesProcessor messageProcessor : _messageProcessors) {
244-
messageProcessor.initializeMetrics(metrics);
245-
}
246-
return metrics;
247-
}
248-
249-
private void periodicInstrumentation() {
250-
_metrics.close();
251-
_metrics = createMetrics();
252-
}
253-
254214
private ActorRef _telemetry;
255215
private ActorRef _channel;
256216

257-
private final MetricsFactory _metricsFactory;
258-
private final Cancellable _instrument;
217+
private final PeriodicMetrics _metrics;
259218
private final List<MessagesProcessor> _messageProcessors;
260219

261-
private Metrics _metrics;
262-
263220
private static final String METRICS_PREFIX = "actors/connection/";
264-
private static final String UNKONOWN_COMMAND_COUNTER = METRICS_PREFIX + "command/UNKNOWN";
221+
private static final String UNKNOWN_COMMAND_COUNTER = METRICS_PREFIX + "command/UNKNOWN";
265222
private static final String UNKNOWN_COUNTER = METRICS_PREFIX + "UNKNOWN";
266223

267224
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();

src/main/java/com/arpnetworking/metrics/proxy/models/protocol/MessageProcessorsFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.arpnetworking.metrics.proxy.models.protocol;
1818

19+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
1920
import com.arpnetworking.metrics.proxy.actors.Connection;
2021

2122
import java.util.List;
@@ -31,7 +32,8 @@ public interface MessageProcessorsFactory {
3132
* Create a list of <code>MessagesProcessor</code> that define a protocol.
3233
*
3334
* @param connection the connection context to use for message processing
35+
* @param metrics {@link PeriodicMetrics} instance to record metrics to
3436
* @return a list of <code>MessagesProcessor</code>
3537
*/
36-
List<MessagesProcessor> create(final Connection connection);
38+
List<MessagesProcessor> create(final Connection connection, final PeriodicMetrics metrics);
3739
}

src/main/java/com/arpnetworking/metrics/proxy/models/protocol/MessagesProcessor.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package com.arpnetworking.metrics.proxy.models.protocol;
1818

19-
import com.arpnetworking.metrics.Metrics;
20-
2119
/**
2220
* Defines simple interface for a message processor.
2321
*
@@ -31,11 +29,4 @@ public interface MessagesProcessor {
3129
* @return true if the message was processed, otherwise false
3230
*/
3331
boolean handleMessage(final Object message);
34-
35-
/**
36-
* Initializes Processor-owned periodic metrics counters to zero.
37-
*
38-
* @param metrics Metrics to reset counts in.
39-
*/
40-
void initializeMetrics(final Metrics metrics);
4132
}

0 commit comments

Comments
 (0)