Skip to content

Commit 60d5a9e

Browse files
author
Matthew Hayter
committed
Add a metric for the Akka cluster members count and is_leader.
1 parent c53b7e4 commit 60d5a9e

File tree

2 files changed

+23
-6
lines changed

2 files changed

+23
-6
lines changed

src/main/java/com/arpnetworking/clusteraggregator/ClusterStatusCache.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import akka.cluster.Cluster;
2525
import akka.cluster.ClusterEvent;
2626
import com.arpnetworking.clusteraggregator.models.ShardAllocation;
27+
import com.arpnetworking.metrics.Metrics;
28+
import com.arpnetworking.metrics.MetricsFactory;
2729
import com.arpnetworking.utility.ParallelLeastShardAllocationStrategy;
2830
import com.google.common.base.Optional;
2931
import com.google.common.collect.ArrayListMultimap;
@@ -48,23 +50,27 @@
4850
* @author Brandon Arp (barp at groupon dot com)
4951
*/
5052
public class ClusterStatusCache extends UntypedActor {
53+
5154
/**
5255
* Creates a {@link akka.actor.Props} for use in Akka.
5356
*
5457
* @param cluster The cluster to reference.
58+
* @param metricsFactory A <code>MetricsFactory</code> to use for metrics creation.
5559
* @return A new {@link akka.actor.Props}
5660
*/
57-
public static Props props(final Cluster cluster) {
58-
return Props.create(ClusterStatusCache.class, cluster);
61+
public static Props props(final Cluster cluster, final MetricsFactory metricsFactory) {
62+
return Props.create(ClusterStatusCache.class, cluster, metricsFactory);
5963
}
6064

6165
/**
6266
* Public constructor.
6367
*
6468
* @param cluster {@link akka.cluster.Cluster} whose state is cached
69+
* @param metricsFactory A <code>MetricsFactory</code> to use for metrics creation.
6570
*/
66-
public ClusterStatusCache(final Cluster cluster) {
71+
public ClusterStatusCache(final Cluster cluster, final MetricsFactory metricsFactory) {
6772
_cluster = cluster;
73+
_metricsFactory = metricsFactory;
6874
}
6975

7076
/**
@@ -102,7 +108,16 @@ public void onReceive(final Object message) throws Exception {
102108
if (!getSender().equals(getSelf())) {
103109
// Public messages
104110
if (message instanceof ClusterEvent.CurrentClusterState) {
105-
_clusterState = Optional.of((ClusterEvent.CurrentClusterState) message);
111+
final ClusterEvent.CurrentClusterState clusterState = (ClusterEvent.CurrentClusterState) message;
112+
_clusterState = Optional.of(clusterState);
113+
114+
final Metrics metrics = _metricsFactory.create();
115+
metrics.setGauge("akka/members_count", clusterState.members().size());
116+
if (_cluster.selfAddress().equals(clusterState.getLeader())) {
117+
metrics.setGauge("akka/is_leader", 1);
118+
} else {
119+
metrics.setGauge("akka/is_leader", 0);
120+
}
106121
} else if (message instanceof GetRequest) {
107122
sendResponse(getSender());
108123
} else if (message instanceof ParallelLeastShardAllocationStrategy.RebalanceNotification) {
@@ -137,6 +152,7 @@ private static String hostFromActorRef(final ActorRef shardRegion) {
137152
}
138153

139154
private final Cluster _cluster;
155+
private final MetricsFactory _metricsFactory;
140156
private Optional<ClusterEvent.CurrentClusterState> _clusterState = Optional.absent();
141157
@Nullable
142158
private Cancellable _pollTimer;

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,10 @@ private ActorRef provideBookkeeperProxy(final ActorSystem system) {
211211
private ActorRef provideStatusCache(
212212
final ActorSystem system,
213213
@Named("bookkeeper-proxy") final ActorRef bookkeeperProxy,
214-
@Named("periodic-statistics") final ActorRef periodicStats) {
214+
@Named("periodic-statistics") final ActorRef periodicStats,
215+
final MetricsFactory metricsFactory) {
215216
final Cluster cluster = Cluster.get(system);
216-
final ActorRef clusterStatusCache = system.actorOf(ClusterStatusCache.props(cluster), "cluster-status");
217+
final ActorRef clusterStatusCache = system.actorOf(ClusterStatusCache.props(cluster, metricsFactory), "cluster-status");
217218
return system.actorOf(Status.props(bookkeeperProxy, cluster, clusterStatusCache, periodicStats), "status");
218219
}
219220

0 commit comments

Comments
 (0)