|
24 | 24 | import akka.cluster.Cluster;
|
25 | 25 | import akka.cluster.ClusterEvent;
|
26 | 26 | import com.arpnetworking.clusteraggregator.models.ShardAllocation;
|
| 27 | +import com.arpnetworking.metrics.Metrics; |
| 28 | +import com.arpnetworking.metrics.MetricsFactory; |
27 | 29 | import com.arpnetworking.utility.ParallelLeastShardAllocationStrategy;
|
28 | 30 | import com.google.common.base.Optional;
|
29 | 31 | import com.google.common.collect.ArrayListMultimap;
|
|
48 | 50 | * @author Brandon Arp (barp at groupon dot com)
|
49 | 51 | */
|
50 | 52 | public class ClusterStatusCache extends UntypedActor {
|
| 53 | + |
51 | 54 | /**
|
52 | 55 | * Creates a {@link akka.actor.Props} for use in Akka.
|
53 | 56 | *
|
54 | 57 | * @param cluster The cluster to reference.
|
| 58 | + * @param metricsFactory A <code>MetricsFactory</code> to use for metrics creation. |
55 | 59 | * @return A new {@link akka.actor.Props}
|
56 | 60 | */
|
57 |
| - public static Props props(final Cluster cluster) { |
58 |
| - return Props.create(ClusterStatusCache.class, cluster); |
| 61 | + public static Props props(final Cluster cluster, final MetricsFactory metricsFactory) { |
| 62 | + return Props.create(ClusterStatusCache.class, cluster, metricsFactory); |
59 | 63 | }
|
60 | 64 |
|
61 | 65 | /**
|
62 | 66 | * Public constructor.
|
63 | 67 | *
|
64 | 68 | * @param cluster {@link akka.cluster.Cluster} whose state is cached
|
| 69 | + * @param metricsFactory A <code>MetricsFactory</code> to use for metrics creation. |
65 | 70 | */
|
66 |
| - public ClusterStatusCache(final Cluster cluster) { |
| 71 | + public ClusterStatusCache(final Cluster cluster, final MetricsFactory metricsFactory) { |
67 | 72 | _cluster = cluster;
|
| 73 | + _metricsFactory = metricsFactory; |
68 | 74 | }
|
69 | 75 |
|
70 | 76 | /**
|
@@ -102,7 +108,17 @@ public void onReceive(final Object message) throws Exception {
|
102 | 108 | if (!getSender().equals(getSelf())) {
|
103 | 109 | // Public messages
|
104 | 110 | if (message instanceof ClusterEvent.CurrentClusterState) {
|
105 |
| - _clusterState = Optional.of((ClusterEvent.CurrentClusterState) message); |
| 111 | + final ClusterEvent.CurrentClusterState clusterState = (ClusterEvent.CurrentClusterState) message; |
| 112 | + _clusterState = Optional.of(clusterState); |
| 113 | + |
| 114 | + try (final Metrics metrics = _metricsFactory.create()) { |
| 115 | + metrics.setGauge("akka/members_count", clusterState.members().size()); |
| 116 | + if (_cluster.selfAddress().equals(clusterState.getLeader())) { |
| 117 | + metrics.setGauge("akka/is_leader", 1); |
| 118 | + } else { |
| 119 | + metrics.setGauge("akka/is_leader", 0); |
| 120 | + } |
| 121 | + } |
106 | 122 | } else if (message instanceof GetRequest) {
|
107 | 123 | sendResponse(getSender());
|
108 | 124 | } else if (message instanceof ParallelLeastShardAllocationStrategy.RebalanceNotification) {
|
@@ -137,6 +153,7 @@ private static String hostFromActorRef(final ActorRef shardRegion) {
|
137 | 153 | }
|
138 | 154 |
|
139 | 155 | private final Cluster _cluster;
|
| 156 | + private final MetricsFactory _metricsFactory; |
140 | 157 | private Optional<ClusterEvent.CurrentClusterState> _clusterState = Optional.absent();
|
141 | 158 | @Nullable
|
142 | 159 | private Cancellable _pollTimer;
|
|
0 commit comments