Skip to content

Commit bbcee86

Browse files
committed
clean up status actor
1 parent 1db7922 commit bbcee86

File tree

6 files changed

+58
-193
lines changed

6 files changed

+58
-193
lines changed

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -351,14 +351,6 @@ private ActorRef provideGracefulShutdownActor(final ActorSystem system, final In
351351
return system.actorOf(GuiceActorCreator.props(injector, GracefulShutdownActor.class), "graceful-shutdown");
352352
}
353353

354-
// @Provides
355-
// @Singleton
356-
// @Named("cluster-joiner")
357-
// @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
358-
// private ActorRef provideClusterJoiner(final ActorSystem system, final Injector injector) {
359-
// return system.actorOf(GuiceActorCreator.props(injector, NonJoiningClusterJoiner.class), "cluster-joiner");
360-
// }
361-
362354
@Provides
363355
@Named("cluster-host-suffix")
364356
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice

src/main/java/com/arpnetworking/clusteraggregator/Main.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,19 @@
1717

1818
import akka.actor.ActorRef;
1919
import akka.actor.ActorSystem;
20-
import akka.actor.Props;
2120
import ch.qos.logback.classic.LoggerContext;
2221
import com.arpnetworking.clusteraggregator.configuration.ClusterAggregatorConfiguration;
2322
import com.arpnetworking.commons.builder.Builder;
2423
import com.arpnetworking.configuration.jackson.DynamicConfiguration;
2524
import com.arpnetworking.configuration.jackson.HoconFileSource;
2625
import com.arpnetworking.configuration.jackson.JsonNodeFileSource;
2726
import com.arpnetworking.configuration.jackson.JsonNodeSource;
28-
import com.arpnetworking.configuration.jackson.akka.ActorBuilderDeserializer;
2927
import com.arpnetworking.configuration.triggers.FileTrigger;
3028
import com.arpnetworking.steno.Logger;
3129
import com.arpnetworking.utility.Configurator;
3230
import com.arpnetworking.utility.Database;
3331
import com.arpnetworking.utility.Launchable;
3432
import com.fasterxml.jackson.databind.ObjectMapper;
35-
import com.fasterxml.jackson.databind.module.SimpleModule;
3633
import com.google.common.collect.Lists;
3734
import com.google.inject.Guice;
3835
import com.google.inject.Injector;
@@ -100,9 +97,6 @@ public static void main(final String[] args) {
10097
final File configurationFile = new File(args[0]);
10198
configurator = Optional.of(new Configurator<>(Main::new, ClusterAggregatorConfiguration.class));
10299
final ObjectMapper objectMapper = ClusterAggregatorConfiguration.createObjectMapper();
103-
final SimpleModule module = new SimpleModule();
104-
module.addDeserializer(Props.class, new ActorBuilderDeserializer(objectMapper));
105-
objectMapper.registerModule(module);
106100
configuration = Optional.of(new DynamicConfiguration.Builder()
107101
.setObjectMapper(objectMapper)
108102
.addSourceBuilder(getFileSourceBuilder(configurationFile, objectMapper))

src/main/java/com/arpnetworking/clusteraggregator/Status.java

Lines changed: 48 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,22 @@
2121
import akka.actor.UntypedActor;
2222
import akka.cluster.Cluster;
2323
import akka.cluster.MemberStatus;
24-
import akka.dispatch.OnComplete;
25-
import akka.dispatch.Recover;
26-
import akka.pattern.Patterns;
24+
import akka.pattern.PatternsCS;
2725
import akka.remote.AssociationErrorEvent;
2826
import akka.util.Timeout;
2927
import com.arpnetworking.clusteraggregator.models.BookkeeperData;
3028
import com.arpnetworking.clusteraggregator.models.MetricsRequest;
3129
import com.arpnetworking.clusteraggregator.models.PeriodMetrics;
3230
import com.arpnetworking.clusteraggregator.models.StatusResponse;
3331
import com.arpnetworking.utility.CastMapper;
34-
import com.arpnetworking.utility.CollectFutureBuilder;
3532
import org.joda.time.Period;
36-
import scala.concurrent.ExecutionContextExecutor;
37-
import scala.concurrent.Future;
38-
import scala.runtime.AbstractFunction0;
3933
import scala.util.Failure;
4034

4135
import java.util.Map;
36+
import java.util.concurrent.CompletableFuture;
37+
import java.util.concurrent.CompletionStage;
4238
import java.util.concurrent.TimeUnit;
39+
import java.util.function.Function;
4340

4441
/**
4542
* Periodically polls the cluster status and caches the result.
@@ -108,80 +105,69 @@ public void onReceive(final Object message) throws Exception {
108105
_quarantined = true;
109106
}
110107
} else if (message instanceof HealthRequest) {
111-
final ExecutionContextExecutor executor = getContext().dispatcher();
112-
final Future<ClusterStatusCache.StatusResponse> stateFuture = Patterns
108+
final CompletionStage<ClusterStatusCache.StatusResponse> stateFuture = PatternsCS
113109
.ask(
114110
_clusterStatusCache,
115111
new ClusterStatusCache.GetRequest(),
116112
Timeout.apply(3, TimeUnit.SECONDS))
117-
.map(CAST_MAPPER, executor);
118-
stateFuture.onComplete(
119-
new OnComplete<ClusterStatusCache.StatusResponse>() {
120-
@Override
121-
public void onComplete(final Throwable failure, final ClusterStatusCache.StatusResponse success) {
122-
final boolean healthy = _cluster.readView().self().status() == MemberStatus.up() && !_quarantined;
123-
sender.tell(healthy, getSelf());
124-
}
125-
},
126-
executor);
113+
.thenApply(CAST_MAPPER);
114+
stateFuture.whenComplete(
115+
(statusResponse, throwable) -> {
116+
final boolean healthy = _cluster.readView().self().status() == MemberStatus.up() && !_quarantined;
117+
sender.tell(healthy, getSelf());
118+
});
127119
} else {
128120
unhandled(message);
129121
}
130122
}
131123

132124
private void processStatusRequest(final ActorRef sender) {
133-
final ExecutionContextExecutor executor = getContext().dispatcher();
134125
// Call the bookkeeper
135-
final Future<BookkeeperData> bookkeeperFuture = Patterns.ask(
126+
final CompletableFuture<BookkeeperData> bookkeeperFuture = PatternsCS.ask(
136127
_metricsBookkeeper,
137128
new MetricsRequest(),
138129
Timeout.apply(3, TimeUnit.SECONDS))
139-
.map(new CastMapper<>(), executor)
140-
.recover(new AsNullRecovery<>(), executor);
141-
final Future<ClusterStatusCache.StatusResponse> clusterStateFuture =
142-
Patterns.ask(
130+
.thenApply(new CastMapper<BookkeeperData>())
131+
.exceptionally(new AsNullRecovery<>())
132+
.toCompletableFuture();
133+
134+
final CompletableFuture<ClusterStatusCache.StatusResponse> clusterStateFuture =
135+
PatternsCS.ask(
143136
_clusterStatusCache,
144137
new ClusterStatusCache.GetRequest(),
145138
Timeout.apply(3, TimeUnit.SECONDS))
146-
.map(CAST_MAPPER, executor)
147-
.recover(new AsNullRecovery<>(), executor);
139+
.thenApply(CAST_MAPPER)
140+
.exceptionally(new AsNullRecovery<>())
141+
.toCompletableFuture();
148142

149-
final Future<Map<Period, PeriodMetrics>> localMetricsFuture =
150-
Patterns.ask(
143+
final CompletableFuture<Map<Period, PeriodMetrics>> localMetricsFuture =
144+
PatternsCS.ask(
151145
_localMetrics,
152146
new MetricsRequest(),
153147
Timeout.apply(3, TimeUnit.SECONDS))
154-
.map(new CastMapper<>(), executor)
155-
.recover(new AsNullRecovery<>(), executor);
156-
157-
final Future<StatusResponse> future = new CollectFutureBuilder<StatusResponse>()
158-
.addFuture(bookkeeperFuture)
159-
.addFuture(clusterStateFuture)
160-
.addFuture(localMetricsFuture)
161-
.map(new AbstractFunction0<StatusResponse>() {
162-
@Override
163-
public StatusResponse apply() {
164-
return new StatusResponse.Builder()
165-
.setClusterMetrics(bookkeeperFuture.value().get().get())
166-
.setClusterState(clusterStateFuture.value().get().get())
167-
.setLocalMetrics(localMetricsFuture.value().get().get())
148+
.thenApply(new CastMapper<Map<Period, PeriodMetrics>>())
149+
.exceptionally(new AsNullRecovery<>())
150+
.toCompletableFuture();
151+
152+
CompletableFuture.allOf(
153+
bookkeeperFuture,
154+
clusterStateFuture,
155+
localMetricsFuture)
156+
.thenApply(
157+
(v) -> new StatusResponse.Builder()
158+
.setClusterMetrics(bookkeeperFuture.getNow(null))
159+
.setClusterState(clusterStateFuture.getNow(null))
160+
.setLocalMetrics(localMetricsFuture.getNow(null))
168161
.setLocalAddress(_cluster.selfAddress())
169-
.build();
170-
}
171-
})
172-
.build(executor);
173-
future.onComplete(
174-
new OnComplete<StatusResponse>() {
175-
@Override
176-
public void onComplete(final Throwable failure, final StatusResponse success) {
177-
if (failure != null) {
178-
sender.tell(new Failure<StatusResponse>(failure), getSelf());
179-
} else {
180-
sender.tell(success, getSelf());
181-
}
182-
}
183-
},
184-
executor);
162+
.build())
163+
.whenComplete(
164+
(result, failure) -> {
165+
if (failure != null) {
166+
sender.tell(new Failure<StatusResponse>(failure), getSelf());
167+
} else {
168+
sender.tell(result, getSelf());
169+
}
170+
});
185171
}
186172

187173
private boolean _quarantined = false;
@@ -191,11 +177,11 @@ public void onComplete(final Throwable failure, final StatusResponse success) {
191177
private final ActorRef _clusterStatusCache;
192178
private final ActorRef _localMetrics;
193179

194-
private static final CastMapper<Object, ClusterStatusCache.StatusResponse> CAST_MAPPER = new CastMapper<>();
180+
private static final CastMapper<ClusterStatusCache.StatusResponse> CAST_MAPPER = new CastMapper<>();
195181

196-
private static class AsNullRecovery<T> extends Recover<T> {
182+
private static class AsNullRecovery<T> implements Function<Throwable, T> {
197183
@Override
198-
public T recover(final Throwable failure) {
184+
public T apply(final Throwable failure) {
199185
return null;
200186
}
201187
}

src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import com.arpnetworking.akka.NonJoiningClusterJoiner;
2020
import com.arpnetworking.commons.builder.OvalBuilder;
2121
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
22+
import com.arpnetworking.configuration.jackson.akka.ActorBuilderDeserializer;
2223
import com.arpnetworking.utility.InterfaceDatabase;
2324
import com.arpnetworking.utility.ReflectionsDatabase;
2425
import com.fasterxml.jackson.databind.ObjectMapper;
26+
import com.fasterxml.jackson.databind.module.SimpleModule;
2527
import com.google.common.base.MoreObjects;
2628
import com.google.common.collect.Maps;
2729
import net.sf.oval.constraint.NotEmpty;
@@ -45,7 +47,11 @@ public final class ClusterAggregatorConfiguration {
4547
* @return An <code>ObjectMapper</code> for TsdAggregator configuration.
4648
*/
4749
public static ObjectMapper createObjectMapper() {
48-
return ObjectMapperFactory.createInstance();
50+
final ObjectMapper objectMapper = ObjectMapperFactory.createInstance();
51+
final SimpleModule module = new SimpleModule();
52+
module.addDeserializer(Props.class, new ActorBuilderDeserializer(objectMapper));
53+
objectMapper.registerModule(module);
54+
return objectMapper;
4955
}
5056

5157
public String getMonitoringCluster() {

src/main/java/com/arpnetworking/utility/CastMapper.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,21 @@
1616

1717
package com.arpnetworking.utility;
1818

19-
import akka.dispatch.Mapper;
19+
import java.util.function.Function;
2020

2121
/**
2222
* Map method that just casts to another class.
2323
*
24-
* @param <T> Input type
2524
* @param <R> Output type
2625
* @author Brandon Arp (brandonarp at gmail dot com)
2726
*/
28-
public class CastMapper<T, R> extends Mapper<T, R> {
27+
public class CastMapper<R> implements Function<Object, R> {
2928
/**
3029
* {@inheritDoc}
3130
*/
3231
@Override
3332
@SuppressWarnings("unchecked")
34-
public R apply(final T parameter) {
33+
public R apply(final Object parameter) {
3534
return (R) parameter;
3635
}
3736
}

src/main/java/com/arpnetworking/utility/CollectFutureBuilder.java

Lines changed: 0 additions & 112 deletions
This file was deleted.

0 commit comments

Comments
 (0)