Skip to content

Commit 8f47278

Browse files
Emit a metric of samples received by the aggregator. (#236)
Co-authored-by: Brandon Arp <[email protected]>
1 parent d7e6b7a commit 8f47278

File tree

1 file changed

+31
-0
lines changed

1 file changed

+31
-0
lines changed

src/main/java/com/arpnetworking/metrics/mad/Aggregator.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,19 @@
2929
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
3030
import com.arpnetworking.metrics.mad.model.Record;
3131
import com.arpnetworking.metrics.mad.model.statistics.Statistic;
32+
import com.arpnetworking.metrics.mad.model.statistics.StatisticFactory;
3233
import com.arpnetworking.steno.LogValueMapFactory;
3334
import com.arpnetworking.steno.Logger;
3435
import com.arpnetworking.steno.LoggerFactory;
36+
import com.arpnetworking.tsdcore.model.CalculatedValue;
3537
import com.arpnetworking.tsdcore.model.DefaultKey;
3638
import com.arpnetworking.tsdcore.model.Key;
3739
import com.arpnetworking.tsdcore.sinks.Sink;
3840
import com.arpnetworking.utility.Launchable;
3941
import com.google.common.cache.CacheBuilder;
4042
import com.google.common.cache.CacheLoader;
4143
import com.google.common.cache.LoadingCache;
44+
import com.google.common.collect.ImmutableList;
4245
import com.google.common.collect.ImmutableMap;
4346
import com.google.common.collect.ImmutableSet;
4447
import com.google.common.collect.Lists;
@@ -122,6 +125,33 @@ public void notify(final Observable observable, final Object event) {
122125

123126
final Record record = (Record) event;
124127
final Key key = new DefaultKey(record.getDimensions());
128+
129+
_periodicMetrics.recordGauge(
130+
"aggregator/samples",
131+
record.getMetrics().values().stream()
132+
.map(m -> {
133+
final ImmutableList<CalculatedValue<?>> countStatistic = m.getStatistics().get(
134+
STATISTIC_FACTORY.getStatistic("count"));
135+
if (countStatistic != null) {
136+
return countStatistic.stream()
137+
.map(s -> s.getValue().getValue())
138+
.reduce(Double::sum)
139+
.orElse(0.0d)
140+
+
141+
m.getValues().size();
142+
}
143+
if (!m.getStatistics().isEmpty()) {
144+
LOGGER.warn()
145+
.setMessage("Received record with statistics but with no count")
146+
.addData("record", record)
147+
.addData("key", key)
148+
.log();
149+
}
150+
return (double) m.getValues().size();
151+
})
152+
.reduce(Double::sum)
153+
.orElse(0.0d));
154+
125155
LOGGER.trace()
126156
.setMessage("Sending record to aggregation actor")
127157
.addData("record", record)
@@ -251,6 +281,7 @@ public Optional<ImmutableSet<Statistic>> load(final String metric) throws Except
251281
private final LoadingCache<String, Optional<ImmutableSet<Statistic>>> _cachedDependentStatistics;
252282
private final Map<Key, List<ActorRef>> _periodWorkerActors = Maps.newConcurrentMap();
253283

284+
private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory();
254285
private static final Timeout SHUTDOWN_TIMEOUT = Timeout.apply(1, TimeUnit.SECONDS);
255286
private static final Logger LOGGER = LoggerFactory.getLogger(Aggregator.class);
256287

0 commit comments

Comments
 (0)