2121import akka .actor .UntypedActor ;
2222import akka .cluster .Cluster ;
2323import akka .cluster .MemberStatus ;
24- import akka .dispatch .OnComplete ;
25- import akka .dispatch .Recover ;
26- import akka .pattern .Patterns ;
24+ import akka .pattern .PatternsCS ;
2725import akka .remote .AssociationErrorEvent ;
2826import akka .util .Timeout ;
2927import com .arpnetworking .clusteraggregator .models .BookkeeperData ;
3028import com .arpnetworking .clusteraggregator .models .MetricsRequest ;
3129import com .arpnetworking .clusteraggregator .models .PeriodMetrics ;
3230import com .arpnetworking .clusteraggregator .models .StatusResponse ;
3331import com .arpnetworking .utility .CastMapper ;
34- import com .arpnetworking .utility .CollectFutureBuilder ;
3532import org .joda .time .Period ;
36- import scala .concurrent .ExecutionContextExecutor ;
37- import scala .concurrent .Future ;
38- import scala .runtime .AbstractFunction0 ;
3933import scala .util .Failure ;
4034
4135import java .util .Map ;
36+ import java .util .concurrent .CompletableFuture ;
37+ import java .util .concurrent .CompletionStage ;
4238import java .util .concurrent .TimeUnit ;
39+ import java .util .function .Function ;
4340
4441/**
4542 * Periodically polls the cluster status and caches the result.
@@ -108,80 +105,69 @@ public void onReceive(final Object message) throws Exception {
108105 _quarantined = true ;
109106 }
110107 } else if (message instanceof HealthRequest ) {
111- final ExecutionContextExecutor executor = getContext ().dispatcher ();
112- final Future <ClusterStatusCache .StatusResponse > stateFuture = Patterns
108+ final CompletionStage <ClusterStatusCache .StatusResponse > stateFuture = PatternsCS
113109 .ask (
114110 _clusterStatusCache ,
115111 new ClusterStatusCache .GetRequest (),
116112 Timeout .apply (3 , TimeUnit .SECONDS ))
117- .map (CAST_MAPPER , executor );
118- stateFuture .onComplete (
119- new OnComplete <ClusterStatusCache .StatusResponse >() {
120- @ Override
121- public void onComplete (final Throwable failure , final ClusterStatusCache .StatusResponse success ) {
122- final boolean healthy = _cluster .readView ().self ().status () == MemberStatus .up () && !_quarantined ;
123- sender .tell (healthy , getSelf ());
124- }
125- },
126- executor );
113+ .thenApply (CAST_MAPPER );
114+ stateFuture .whenComplete (
115+ (statusResponse , throwable ) -> {
116+ final boolean healthy = _cluster .readView ().self ().status () == MemberStatus .up () && !_quarantined ;
117+ sender .tell (healthy , getSelf ());
118+ });
127119 } else {
128120 unhandled (message );
129121 }
130122 }
131123
132124 private void processStatusRequest (final ActorRef sender ) {
133- final ExecutionContextExecutor executor = getContext ().dispatcher ();
134125 // Call the bookkeeper
135- final Future <BookkeeperData > bookkeeperFuture = Patterns .ask (
126+ final CompletableFuture <BookkeeperData > bookkeeperFuture = PatternsCS .ask (
136127 _metricsBookkeeper ,
137128 new MetricsRequest (),
138129 Timeout .apply (3 , TimeUnit .SECONDS ))
139- .map (new CastMapper <>(), executor )
140- .recover (new AsNullRecovery <>(), executor );
141- final Future <ClusterStatusCache .StatusResponse > clusterStateFuture =
142- Patterns .ask (
130+ .thenApply (new CastMapper <BookkeeperData >())
131+ .exceptionally (new AsNullRecovery <>())
132+ .toCompletableFuture ();
133+
134+ final CompletableFuture <ClusterStatusCache .StatusResponse > clusterStateFuture =
135+ PatternsCS .ask (
143136 _clusterStatusCache ,
144137 new ClusterStatusCache .GetRequest (),
145138 Timeout .apply (3 , TimeUnit .SECONDS ))
146- .map (CAST_MAPPER , executor )
147- .recover (new AsNullRecovery <>(), executor );
139+ .thenApply (CAST_MAPPER )
140+ .exceptionally (new AsNullRecovery <>())
141+ .toCompletableFuture ();
148142
149- final Future <Map <Period , PeriodMetrics >> localMetricsFuture =
150- Patterns .ask (
143+ final CompletableFuture <Map <Period , PeriodMetrics >> localMetricsFuture =
144+ PatternsCS .ask (
151145 _localMetrics ,
152146 new MetricsRequest (),
153147 Timeout .apply (3 , TimeUnit .SECONDS ))
154- .map (new CastMapper <>(), executor )
155- .recover (new AsNullRecovery <>(), executor );
156-
157- final Future <StatusResponse > future = new CollectFutureBuilder <StatusResponse >()
158- .addFuture (bookkeeperFuture )
159- .addFuture (clusterStateFuture )
160- .addFuture (localMetricsFuture )
161- .map (new AbstractFunction0 <StatusResponse >() {
162- @ Override
163- public StatusResponse apply () {
164- return new StatusResponse .Builder ()
165- .setClusterMetrics (bookkeeperFuture .value ().get ().get ())
166- .setClusterState (clusterStateFuture .value ().get ().get ())
167- .setLocalMetrics (localMetricsFuture .value ().get ().get ())
148+ .thenApply (new CastMapper <Map <Period , PeriodMetrics >>())
149+ .exceptionally (new AsNullRecovery <>())
150+ .toCompletableFuture ();
151+
152+ CompletableFuture .allOf (
153+ bookkeeperFuture ,
154+ clusterStateFuture ,
155+ localMetricsFuture )
156+ .thenApply (
157+ (v ) -> new StatusResponse .Builder ()
158+ .setClusterMetrics (bookkeeperFuture .getNow (null ))
159+ .setClusterState (clusterStateFuture .getNow (null ))
160+ .setLocalMetrics (localMetricsFuture .getNow (null ))
168161 .setLocalAddress (_cluster .selfAddress ())
169- .build ();
170- }
171- })
172- .build (executor );
173- future .onComplete (
174- new OnComplete <StatusResponse >() {
175- @ Override
176- public void onComplete (final Throwable failure , final StatusResponse success ) {
177- if (failure != null ) {
178- sender .tell (new Failure <StatusResponse >(failure ), getSelf ());
179- } else {
180- sender .tell (success , getSelf ());
181- }
182- }
183- },
184- executor );
162+ .build ())
163+ .whenComplete (
164+ (result , failure ) -> {
165+ if (failure != null ) {
166+ sender .tell (new Failure <StatusResponse >(failure ), getSelf ());
167+ } else {
168+ sender .tell (result , getSelf ());
169+ }
170+ });
185171 }
186172
187173 private boolean _quarantined = false ;
@@ -191,11 +177,11 @@ public void onComplete(final Throwable failure, final StatusResponse success) {
191177 private final ActorRef _clusterStatusCache ;
192178 private final ActorRef _localMetrics ;
193179
194- private static final CastMapper <Object , ClusterStatusCache .StatusResponse > CAST_MAPPER = new CastMapper <>();
180+ private static final CastMapper <ClusterStatusCache .StatusResponse > CAST_MAPPER = new CastMapper <>();
195181
196- private static class AsNullRecovery <T > extends Recover < T > {
182+ private static class AsNullRecovery <T > implements Function < Throwable , T > {
197183 @ Override
198- public T recover (final Throwable failure ) {
184+ public T apply (final Throwable failure ) {
199185 return null ;
200186 }
201187 }
0 commit comments