Skip to content

Commit eff7641

Browse files
authored
Add start/stop/restart metrics to bucket aggregator actor. (#230)
1 parent 703f2e8 commit eff7641

File tree

6 files changed

+60
-3
lines changed

6 files changed

+60
-3
lines changed

src/main/java/com/arpnetworking/metrics/mad/Aggregator.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.arpnetworking.commons.observer.Observable;
2727
import com.arpnetworking.commons.observer.Observer;
2828
import com.arpnetworking.logback.annotations.LogValue;
29+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
2930
import com.arpnetworking.metrics.mad.model.Record;
3031
import com.arpnetworking.metrics.mad.model.statistics.Statistic;
3132
import com.arpnetworking.steno.LogValueMapFactory;
@@ -167,7 +168,7 @@ private List<ActorRef> createActors(final Key key) {
167168
.setDependentStatistics(_cachedDependentStatistics)
168169
.setPeriod(period)
169170
.setSink(_sink);
170-
periodWorkerList.add(_actorSystem.actorOf(Props.create(PeriodWorker.class, period, bucketBuilder)));
171+
periodWorkerList.add(_actorSystem.actorOf(Props.create(PeriodWorker.class, period, bucketBuilder, _periodicMetrics)));
171172
}
172173
LOGGER.debug()
173174
.setMessage("Created period worker actors")
@@ -187,6 +188,7 @@ private ImmutableSet<Statistic> computeDependentStatistics(final ImmutableSet<St
187188

188189
private Aggregator(final Builder builder) {
189190
_actorSystem = builder._actorSystem;
191+
_periodicMetrics = builder._periodicMetrics;
190192
_periods = ImmutableSet.copyOf(builder._periods);
191193
_sink = builder._sink;
192194
_specifiedCounterStatistics = ImmutableSet.copyOf(builder._counterStatistics);
@@ -235,6 +237,7 @@ public Optional<ImmutableSet<Statistic>> load(final String metric) throws Except
235237
}
236238

237239
private final ActorSystem _actorSystem;
240+
private final PeriodicMetrics _periodicMetrics;
238241
private final ImmutableSet<Duration> _periods;
239242
private final Sink _sink;
240243
private final ImmutableSet<Statistic> _specifiedTimerStatistics;
@@ -275,6 +278,17 @@ public Builder setActorSystem(final ActorSystem value) {
275278
return this;
276279
}
277280

281+
/**
282+
* Set the {@link PeriodicMetrics}. Cannot be null.
283+
*
284+
* @param value The {@link PeriodicMetrics}.
285+
* @return This {@link Builder} instance.
286+
*/
287+
public Builder setPeriodicMetrics(final PeriodicMetrics value) {
288+
_periodicMetrics = value;
289+
return this;
290+
}
291+
278292
/**
279293
* Set the sink. Cannot be null or empty.
280294
*
@@ -345,6 +359,8 @@ public Builder setStatistics(final Map<String, Set<Statistic>> value) {
345359
@NotNull
346360
private ActorSystem _actorSystem;
347361
@NotNull
362+
private PeriodicMetrics _periodicMetrics;
363+
@NotNull
348364
private Sink _sink;
349365
@NotNull
350366
private Set<Duration> _periods;

src/main/java/com/arpnetworking/metrics/mad/Main.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,8 @@ private PeriodicMetrics providePeriodicMetrics(
630630
.build();
631631
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
632632
r -> new Thread(r, "PeriodicMetricsCloser"));
633-
executor.scheduleAtFixedRate(periodicMetrics, 500, 500, TimeUnit.MILLISECONDS);
633+
final long offsetMillis = 1250 - (System.currentTimeMillis() % 1000);;
634+
executor.scheduleAtFixedRate(periodicMetrics, offsetMillis, 1000, TimeUnit.MILLISECONDS);
634635

635636
// Register to shutdown the executor when the Guice stack is shutdown.
636637
lifecycle.registerShutdown(() -> {

src/main/java/com/arpnetworking/metrics/mad/PeriodWorker.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import akka.actor.AbstractActor;
1919
import akka.actor.Cancellable;
2020
import com.arpnetworking.logback.annotations.LogValue;
21+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
2122
import com.arpnetworking.metrics.mad.model.Record;
2223
import com.arpnetworking.steno.LogValueMapFactory;
2324
import com.arpnetworking.steno.Logger;
@@ -46,9 +47,13 @@
4647
* Public constructor. Since this is an {@code Actor} this method should not be
4748
* called directly, but instead you should use {@code Props}.
4849
*/
49-
PeriodWorker(final Duration period, final Bucket.Builder bucketBuilder) {
50+
PeriodWorker(
51+
final Duration period,
52+
final Bucket.Builder bucketBuilder,
53+
final PeriodicMetrics periodicMetrics) {
5054
_period = period;
5155
_bucketBuilder = bucketBuilder;
56+
_periodicMetrics = periodicMetrics;
5257
_hasRotateScheduled = false;
5358
}
5459

@@ -60,6 +65,7 @@ public void preStart() {
6065
// creating a race condition. Note that period worker clean-up did not exist
6166
// in the previous implementation either (not that it shouldn't; just one
6267
// problem at a time).
68+
_periodicMetrics.recordCounter("actors/period_worker/started", 1);
6369
}
6470

6571
@Override
@@ -73,6 +79,12 @@ public void postStop() {
7379
.addData("cancelResult", cancelResult)
7480
.log();
7581
}
82+
_periodicMetrics.recordCounter("actors/period_worker/stopped", 1);
83+
}
84+
85+
@Override
86+
public void preRestart(final Throwable reason, final Optional<Object> message) throws Exception {
87+
_periodicMetrics.recordCounter("actors/period_worker/restarted", 1);
7688
}
7789

7890
@Override
@@ -259,6 +271,7 @@ private void processRecord(final Record record) {
259271
private final Bucket.Builder _bucketBuilder;
260272
private final NavigableMap<ZonedDateTime, Bucket> _bucketsByStart = new TreeMap<>();
261273
private final NavigableMap<ZonedDateTime, List<Bucket>> _bucketsByExpiration = new TreeMap<>();
274+
private final PeriodicMetrics _periodicMetrics;
262275

263276
private static final Logger LOGGER = LoggerFactory.getLogger(PeriodWorker.class);
264277
private static final Duration MINIMUM_ROTATION_CHECK_INTERVAL = Duration.ofMillis(100);

src/main/java/com/arpnetworking/metrics/mad/Pipeline.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public synchronized void launch() {
6666

6767
final Aggregator aggregator = new Aggregator.Builder()
6868
.setActorSystem(_pipelineConfiguration.getActorSystem())
69+
.setPeriodicMetrics(_pipelineConfiguration.getPeriodicMetrics())
6970
.setPeriods(_pipelineConfiguration.getPeriods())
7071
.setTimerStatistics(_pipelineConfiguration.getTimerStatistics())
7172
.setCounterStatistics(_pipelineConfiguration.getCounterStatistics())

src/main/java/com/arpnetworking/metrics/mad/configuration/PipelineConfiguration.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.arpnetworking.logback.annotations.Loggable;
2222
import com.arpnetworking.metrics.common.kafka.ConsumerDeserializer;
2323
import com.arpnetworking.metrics.common.sources.Source;
24+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
2425
import com.arpnetworking.metrics.mad.model.statistics.Statistic;
2526
import com.arpnetworking.metrics.mad.model.statistics.StatisticDeserializer;
2627
import com.arpnetworking.metrics.mad.model.statistics.StatisticFactory;
@@ -86,6 +87,10 @@ public ActorSystem getActorSystem() {
8687
return _actorSystem;
8788
}
8889

90+
public PeriodicMetrics getPeriodicMetrics() {
91+
return _periodicMetrics;
92+
}
93+
8994
public String getName() {
9095
return _name;
9196
}
@@ -123,6 +128,7 @@ public String toString() {
123128
return MoreObjects.toStringHelper(this)
124129
.add("id", Integer.toHexString(System.identityHashCode(this)))
125130
.add("ActorSystem", _actorSystem)
131+
.add("PeriodicMetrics", _periodicMetrics)
126132
.add("Name", _name)
127133
.add("Sources", _sources)
128134
.add("Sinks", _sinks)
@@ -135,6 +141,7 @@ public String toString() {
135141

136142
private PipelineConfiguration(final Builder builder) {
137143
_actorSystem = builder._actorSystem;
144+
_periodicMetrics = builder._periodicMetrics;
138145
_name = builder._name;
139146
_sources = ImmutableList.copyOf(builder._sources);
140147
_sinks = ImmutableList.copyOf(builder._sinks);
@@ -146,6 +153,7 @@ private PipelineConfiguration(final Builder builder) {
146153
}
147154

148155
private final ActorSystem _actorSystem;
156+
private final PeriodicMetrics _periodicMetrics;
149157
private final String _name;
150158
private final ImmutableList<Source> _sources;
151159
private final ImmutableList<Sink> _sinks;
@@ -183,6 +191,17 @@ public Builder setActorSystem(final ActorSystem value) {
183191
return this;
184192
}
185193

194+
/**
195+
* The {@link PeriodicMetrics}. Cannot be null.
196+
*
197+
* @param value The {@link PeriodicMetrics}.
198+
* @return This instance of {@link Builder}.
199+
*/
200+
public Builder setPeriodicMetrics(final PeriodicMetrics value) {
201+
_periodicMetrics = value;
202+
return this;
203+
}
204+
186205
/**
187206
* The name of the pipeline. Cannot be null or empty.
188207
*
@@ -280,6 +299,9 @@ public Builder setStatistics(final Map<String, Set<Statistic>> value) {
280299
@JacksonInject
281300
private ActorSystem _actorSystem;
282301
@NotNull
302+
@JacksonInject
303+
private PeriodicMetrics _periodicMetrics;
304+
@NotNull
283305
@NotEmpty
284306
private String _name;
285307
@NotNull

src/test/java/com/arpnetworking/metrics/mad/AggregatorTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import akka.actor.ActorSystem;
1919
import com.arpnetworking.commons.observer.Observable;
2020
import com.arpnetworking.commons.observer.Observer;
21+
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
2122
import com.arpnetworking.metrics.mad.model.AggregatedData;
2223
import com.arpnetworking.metrics.mad.model.DefaultMetric;
2324
import com.arpnetworking.metrics.mad.model.DefaultQuantity;
@@ -68,6 +69,7 @@ public void setUp() {
6869
_actorSystem = ActorSystem.create();
6970
_aggregator = new Aggregator.Builder()
7071
.setActorSystem(_actorSystem)
72+
.setPeriodicMetrics(_periodicMetrics)
7173
.setSink(_sink)
7274
.setCounterStatistics(Collections.singleton(MAX_STATISTIC))
7375
.setTimerStatistics(Collections.singleton(MAX_STATISTIC))
@@ -404,6 +406,8 @@ private List<AggregatedData> getCapturedData(
404406
private ArgumentCaptor<PeriodicData> _periodicDataCaptor;
405407
@Mock
406408
private Sink _sink;
409+
@Mock
410+
private PeriodicMetrics _periodicMetrics;
407411

408412
private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory();
409413
private static final Statistic MAX_STATISTIC = STATISTIC_FACTORY.getStatistic("max");

0 commit comments

Comments
 (0)