Skip to content

Commit 3f927d8

Browse files
authored
Add akka shard metrics. (#142)
* Add akka shard metrics.
* Addressed code review feedback.
1 parent 745dbfc commit 3f927d8

File tree

4 files changed

+92
-10
lines changed

4 files changed

+92
-10
lines changed

src/main/java/com/arpnetworking/clusteraggregator/ClusterStatusCache.java

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,29 @@
1818

1919
import akka.actor.AbstractActor;
2020
import akka.actor.ActorRef;
21+
import akka.actor.ActorSystem;
22+
import akka.actor.Address;
2123
import akka.actor.Cancellable;
2224
import akka.actor.Props;
2325
import akka.actor.Scheduler;
2426
import akka.cluster.Cluster;
2527
import akka.cluster.ClusterEvent;
28+
import akka.cluster.sharding.ClusterSharding;
29+
import akka.cluster.sharding.ShardRegion;
2630
import com.arpnetworking.clusteraggregator.models.ShardAllocation;
2731
import com.arpnetworking.metrics.Metrics;
2832
import com.arpnetworking.metrics.MetricsFactory;
33+
import com.arpnetworking.steno.Logger;
34+
import com.arpnetworking.steno.LoggerFactory;
2935
import com.arpnetworking.utility.ParallelLeastShardAllocationStrategy;
3036
import com.google.common.collect.ArrayListMultimap;
37+
import com.google.common.collect.Maps;
3138
import com.google.common.collect.Multimaps;
3239
import com.google.common.collect.Sets;
3340
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
3441
import scala.compat.java8.OptionConverters;
3542
import scala.concurrent.duration.Duration;
43+
import scala.concurrent.duration.FiniteDuration;
3644

3745
import java.io.Serializable;
3846
import java.util.Collection;
@@ -55,22 +63,33 @@ public class ClusterStatusCache extends AbstractActor {
5563
/**
5664
* Creates a {@link akka.actor.Props} for use in Akka.
5765
*
58-
* @param cluster The cluster to reference.
66+
*
67+
* @param system The Akka {@link ActorSystem}.
68+
* @param pollInterval The {@link java.time.Duration} for polling state.
5969
* @param metricsFactory A {@link MetricsFactory} to use for metrics creation.
6070
* @return A new {@link akka.actor.Props}
6171
*/
62-
public static Props props(final Cluster cluster, final MetricsFactory metricsFactory) {
63-
return Props.create(ClusterStatusCache.class, cluster, metricsFactory);
72+
public static Props props(
73+
final ActorSystem system,
74+
final java.time.Duration pollInterval,
75+
final MetricsFactory metricsFactory) {
76+
return Props.create(ClusterStatusCache.class, system, pollInterval, metricsFactory);
6477
}
6578

6679
/**
6780
* Public constructor.
6881
*
69-
* @param cluster {@link akka.cluster.Cluster} whose state is cached
82+
* @param system The Akka {@link ActorSystem}.
83+
* @param pollInterval The {@link java.time.Duration} for polling state.
7084
* @param metricsFactory A {@link MetricsFactory} to use for metrics creation.
7185
*/
72-
public ClusterStatusCache(final Cluster cluster, final MetricsFactory metricsFactory) {
73-
_cluster = cluster;
86+
public ClusterStatusCache(
87+
final ActorSystem system,
88+
final java.time.Duration pollInterval,
89+
final MetricsFactory metricsFactory) {
90+
_cluster = Cluster.get(system);
91+
_sharding = ClusterSharding.get(system);
92+
_pollInterval = pollInterval;
7493
_metricsFactory = metricsFactory;
7594
}
7695

@@ -81,7 +100,7 @@ public void preStart() {
81100
.scheduler();
82101
_pollTimer = scheduler.schedule(
83102
Duration.apply(0, TimeUnit.SECONDS),
84-
Duration.apply(10, TimeUnit.SECONDS),
103+
Duration.apply(_pollInterval.toMillis(), TimeUnit.MILLISECONDS),
85104
getSelf(),
86105
POLL,
87106
getContext().system().dispatcher(),
@@ -109,13 +128,50 @@ public Receive createReceive() {
109128
}
110129
}
111130
})
131+
.match(ShardRegion.ClusterShardingStats.class, shardingStats -> {
    // Handles shard statistics (presumably the replies to the GetClusterShardingStats
    // requests sent on POLL -- confirm) and publishes, per region address, the number
    // of shards and the total of the per-shard counts as gauges.
    LOGGER.debug()
            .setMessage("Received shard statistics")
            .addData("regionCount", shardingStats.getRegions().size())
            .log();

    // Aggregate by host:port first so that each address produces exactly one Metrics instance.
    final Map<String, Integer> shardsPerAddress = Maps.newHashMap();
    final Map<String, Long> actorsPerAddress = Maps.newHashMap();
    for (final Map.Entry<Address, ShardRegion.ShardRegionStats> entry : shardingStats.getRegions().entrySet()) {
        final String address = entry.getKey().hostPort();
        shardsPerAddress.put(address, entry.getValue().getStats().size());
        for (final Object stat : entry.getValue().getStats().values()) {
            // Values are handled as Object here; only numeric ones contribute to the total.
            if (stat instanceof Number) {
                actorsPerAddress.merge(address, ((Number) stat).longValue(), Long::sum);
            }
        }
    }

    for (final Map.Entry<String, Integer> entry : shardsPerAddress.entrySet()) {
        try (Metrics metrics = _metricsFactory.create()) {
            // A region that reported no shards (or no numeric counts) has no entry in
            // actorsPerAddress. Default to zero instead of unboxing a null Long, which
            // previously threw a NullPointerException from inside the actor.
            final long actorCount = actorsPerAddress.getOrDefault(entry.getKey(), 0L);
            metrics.addAnnotation("address", entry.getKey());
            metrics.setGauge("akka/cluster/shards", entry.getValue());
            metrics.setGauge("akka/cluster/actors", actorCount);
        }
    }
})
112159
.match(GetRequest.class, message -> sendResponse(getSender()))
113160
.match(ParallelLeastShardAllocationStrategy.RebalanceNotification.class, rebalanceNotification -> {
114161
_rebalanceState = Optional.of(rebalanceNotification);
115162
})
116163
.matchEquals(POLL, message -> {
117164
if (self().equals(sender())) {
118165
_cluster.sendCurrentClusterState(getSelf());
166+
for (final String shardTypeName : _sharding.getShardTypeNames()) {
167+
LOGGER.debug()
168+
.setMessage("Requesting shard statistics")
169+
.addData("shardType", shardTypeName)
170+
.log();
171+
_sharding.shardRegion(shardTypeName).tell(
172+
new ShardRegion.GetClusterShardingStats(FiniteDuration.fromNanos(_pollInterval.toNanos())),
173+
self());
174+
}
119175
} else {
120176
unhandled(message);
121177
}
@@ -131,7 +187,6 @@ private void sendResponse(final ActorRef sender) {
131187
}
132188

133189
private static String hostFromActorRef(final ActorRef shardRegion) {
134-
135190
return OptionConverters.toJava(
136191
shardRegion.path()
137192
.address()
@@ -140,13 +195,16 @@ private static String hostFromActorRef(final ActorRef shardRegion) {
140195
}
141196

142197
private final Cluster _cluster;
198+
private final ClusterSharding _sharding;
199+
private final java.time.Duration _pollInterval;
143200
private final MetricsFactory _metricsFactory;
144201
private Optional<ClusterEvent.CurrentClusterState> _clusterState = Optional.empty();
145202
@Nullable
146203
private Cancellable _pollTimer;
147204
private Optional<ParallelLeastShardAllocationStrategy.RebalanceNotification> _rebalanceState = Optional.empty();
148205

149206
private static final String POLL = "poll";
207+
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterStatusCache.class);
150208

151209
/**
152210
* Request to get a cluster status.

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,12 @@ private ActorRef provideStatusCache(
245245
@Named("periodic-statistics") final ActorRef periodicStats,
246246
final MetricsFactory metricsFactory) {
247247
final Cluster cluster = Cluster.get(system);
248-
final ActorRef clusterStatusCache = system.actorOf(ClusterStatusCache.props(cluster, metricsFactory), "cluster-status");
248+
final ActorRef clusterStatusCache = system.actorOf(
249+
ClusterStatusCache.props(
250+
system,
251+
_configuration.getClusterStatusInterval(),
252+
metricsFactory),
253+
"cluster-status");
249254
return system.actorOf(Status.props(cluster, clusterStatusCache, periodicStats), "status");
250255
}
251256

src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ public boolean getCalculateClusterAggregations() {
162162
return _calculateClusterAggregations;
163163
}
164164

165+
/**
 * Gets the interval for polling cluster status.
 *
 * @return The configured cluster status polling interval.
 */
public Duration getClusterStatusInterval() {
166+
return _clusterStatusInterval;
167+
}
168+
165169
@Override
166170
public String toString() {
167171
return MoreObjects.toStringHelper(this)
@@ -223,6 +227,7 @@ private ClusterAggregatorConfiguration(final Builder builder) {
223227
_clusterHostSuffix = builder._clusterHostSuffix;
224228
_calculateClusterAggregations = builder._calculateClusterAggregations;
225229
_databaseConfigurations = Maps.newHashMap(builder._databaseConfigurations);
230+
_clusterStatusInterval = builder._clusterStatusInterval;
226231
}
227232

228233
private final String _monitoringCluster;
@@ -252,6 +257,7 @@ private ClusterAggregatorConfiguration(final Builder builder) {
252257
private final String _clusterHostSuffix;
253258
private final boolean _calculateClusterAggregations;
254259
private final Map<String, DatabaseConfiguration> _databaseConfigurations;
260+
private final Duration _clusterStatusInterval;
255261

256262
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
257263

@@ -614,6 +620,17 @@ public boolean validateAggregatorLivelinessTimeout(final Duration aggregatorLive
614620
return aggregatorLivelinessTimeout.compareTo(_reaggregationTimeout) > 0;
615621
}
616622

623+
/**
624+
* Interval for polling cluster status. Optional. Defaults to 10 seconds.
625+
*
626+
* @param value interval for polling cluster status.
627+
* @return This instance of {@link Builder}.
628+
*/
629+
public Builder setClusterStatusInterval(final Duration value) {
630+
_clusterStatusInterval = value;
631+
return this;
632+
}
633+
617634
@NotNull
618635
@NotEmpty
619636
private String _monitoringCluster;
@@ -679,5 +696,7 @@ public boolean validateAggregatorLivelinessTimeout(final Duration aggregatorLive
679696
private Boolean _calculateClusterAggregations = true;
680697
@NotNull
681698
private Map<String, DatabaseConfiguration> _databaseConfigurations = Maps.newHashMap();
699+
@NotNull
700+
private Duration _clusterStatusInterval = Duration.ofSeconds(10);
682701
}
683702
}

src/test/java/com/arpnetworking/tsdcore/sinks/KairosDbSinkTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public void testPostFailure() throws InterruptedException, IOException {
133133
_kairosDbSinkBuilder.setMaximumAttempts(2).setBaseBackoff(Duration.ofMillis(1)).build()
134134
.recordAggregateData(createPeriodicData(10L));
135135

136-
Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(
136+
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAsserted(
137137
() -> _wireMock.verifyThat(2, WireMock.postRequestedFor(WireMock.urlEqualTo(PATH)))
138138
);
139139

0 commit comments

Comments
 (0)