21
21
import akka .actor .UntypedActor ;
22
22
import akka .cluster .Cluster ;
23
23
import akka .cluster .MemberStatus ;
24
- import akka .dispatch .OnComplete ;
25
- import akka .dispatch .Recover ;
26
- import akka .pattern .Patterns ;
24
+ import akka .pattern .PatternsCS ;
27
25
import akka .remote .AssociationErrorEvent ;
28
26
import akka .util .Timeout ;
29
27
import com .arpnetworking .clusteraggregator .models .BookkeeperData ;
30
28
import com .arpnetworking .clusteraggregator .models .MetricsRequest ;
31
29
import com .arpnetworking .clusteraggregator .models .PeriodMetrics ;
32
30
import com .arpnetworking .clusteraggregator .models .StatusResponse ;
33
31
import com .arpnetworking .utility .CastMapper ;
34
- import com .arpnetworking .utility .CollectFutureBuilder ;
35
32
import org .joda .time .Period ;
36
- import scala .concurrent .ExecutionContextExecutor ;
37
- import scala .concurrent .Future ;
38
- import scala .runtime .AbstractFunction0 ;
39
33
import scala .util .Failure ;
40
34
41
35
import java .util .Map ;
36
+ import java .util .concurrent .CompletableFuture ;
37
+ import java .util .concurrent .CompletionStage ;
42
38
import java .util .concurrent .TimeUnit ;
39
+ import java .util .function .Function ;
43
40
44
41
/**
45
42
* Periodically polls the cluster status and caches the result.
@@ -108,80 +105,69 @@ public void onReceive(final Object message) throws Exception {
108
105
_quarantined = true ;
109
106
}
110
107
} else if (message instanceof HealthRequest ) {
111
- final ExecutionContextExecutor executor = getContext ().dispatcher ();
112
- final Future <ClusterStatusCache .StatusResponse > stateFuture = Patterns
108
+ final CompletionStage <ClusterStatusCache .StatusResponse > stateFuture = PatternsCS
113
109
.ask (
114
110
_clusterStatusCache ,
115
111
new ClusterStatusCache .GetRequest (),
116
112
Timeout .apply (3 , TimeUnit .SECONDS ))
117
- .map (CAST_MAPPER , executor );
118
- stateFuture .onComplete (
119
- new OnComplete <ClusterStatusCache .StatusResponse >() {
120
- @ Override
121
- public void onComplete (final Throwable failure , final ClusterStatusCache .StatusResponse success ) {
122
- final boolean healthy = _cluster .readView ().self ().status () == MemberStatus .up () && !_quarantined ;
123
- sender .tell (healthy , getSelf ());
124
- }
125
- },
126
- executor );
113
+ .thenApply (CAST_MAPPER );
114
+ stateFuture .whenComplete (
115
+ (statusResponse , throwable ) -> {
116
+ final boolean healthy = _cluster .readView ().self ().status () == MemberStatus .up () && !_quarantined ;
117
+ sender .tell (healthy , getSelf ());
118
+ });
127
119
} else {
128
120
unhandled (message );
129
121
}
130
122
}
131
123
132
124
private void processStatusRequest (final ActorRef sender ) {
133
- final ExecutionContextExecutor executor = getContext ().dispatcher ();
134
125
// Call the bookkeeper
135
- final Future <BookkeeperData > bookkeeperFuture = Patterns .ask (
126
+ final CompletableFuture <BookkeeperData > bookkeeperFuture = PatternsCS .ask (
136
127
_metricsBookkeeper ,
137
128
new MetricsRequest (),
138
129
Timeout .apply (3 , TimeUnit .SECONDS ))
139
- .map (new CastMapper <>(), executor )
140
- .recover (new AsNullRecovery <>(), executor );
141
- final Future <ClusterStatusCache .StatusResponse > clusterStateFuture =
142
- Patterns .ask (
130
+ .thenApply (new CastMapper <BookkeeperData >())
131
+ .exceptionally (new AsNullRecovery <>())
132
+ .toCompletableFuture ();
133
+
134
+ final CompletableFuture <ClusterStatusCache .StatusResponse > clusterStateFuture =
135
+ PatternsCS .ask (
143
136
_clusterStatusCache ,
144
137
new ClusterStatusCache .GetRequest (),
145
138
Timeout .apply (3 , TimeUnit .SECONDS ))
146
- .map (CAST_MAPPER , executor )
147
- .recover (new AsNullRecovery <>(), executor );
139
+ .thenApply (CAST_MAPPER )
140
+ .exceptionally (new AsNullRecovery <>())
141
+ .toCompletableFuture ();
148
142
149
- final Future <Map <Period , PeriodMetrics >> localMetricsFuture =
150
- Patterns .ask (
143
+ final CompletableFuture <Map <Period , PeriodMetrics >> localMetricsFuture =
144
+ PatternsCS .ask (
151
145
_localMetrics ,
152
146
new MetricsRequest (),
153
147
Timeout .apply (3 , TimeUnit .SECONDS ))
154
- .map (new CastMapper <>(), executor )
155
- .recover (new AsNullRecovery <>(), executor );
156
-
157
- final Future <StatusResponse > future = new CollectFutureBuilder <StatusResponse >()
158
- .addFuture (bookkeeperFuture )
159
- .addFuture (clusterStateFuture )
160
- .addFuture (localMetricsFuture )
161
- .map (new AbstractFunction0 <StatusResponse >() {
162
- @ Override
163
- public StatusResponse apply () {
164
- return new StatusResponse .Builder ()
165
- .setClusterMetrics (bookkeeperFuture .value ().get ().get ())
166
- .setClusterState (clusterStateFuture .value ().get ().get ())
167
- .setLocalMetrics (localMetricsFuture .value ().get ().get ())
148
+ .thenApply (new CastMapper <Map <Period , PeriodMetrics >>())
149
+ .exceptionally (new AsNullRecovery <>())
150
+ .toCompletableFuture ();
151
+
152
+ CompletableFuture .allOf (
153
+ bookkeeperFuture ,
154
+ clusterStateFuture ,
155
+ localMetricsFuture )
156
+ .thenApply (
157
+ (v ) -> new StatusResponse .Builder ()
158
+ .setClusterMetrics (bookkeeperFuture .getNow (null ))
159
+ .setClusterState (clusterStateFuture .getNow (null ))
160
+ .setLocalMetrics (localMetricsFuture .getNow (null ))
168
161
.setLocalAddress (_cluster .selfAddress ())
169
- .build ();
170
- }
171
- })
172
- .build (executor );
173
- future .onComplete (
174
- new OnComplete <StatusResponse >() {
175
- @ Override
176
- public void onComplete (final Throwable failure , final StatusResponse success ) {
177
- if (failure != null ) {
178
- sender .tell (new Failure <StatusResponse >(failure ), getSelf ());
179
- } else {
180
- sender .tell (success , getSelf ());
181
- }
182
- }
183
- },
184
- executor );
162
+ .build ())
163
+ .whenComplete (
164
+ (result , failure ) -> {
165
+ if (failure != null ) {
166
+ sender .tell (new Failure <StatusResponse >(failure ), getSelf ());
167
+ } else {
168
+ sender .tell (result , getSelf ());
169
+ }
170
+ });
185
171
}
186
172
187
173
private boolean _quarantined = false ;
@@ -191,11 +177,11 @@ public void onComplete(final Throwable failure, final StatusResponse success) {
191
177
private final ActorRef _clusterStatusCache ;
192
178
private final ActorRef _localMetrics ;
193
179
194
- private static final CastMapper <Object , ClusterStatusCache .StatusResponse > CAST_MAPPER = new CastMapper <>();
180
+ private static final CastMapper <ClusterStatusCache .StatusResponse > CAST_MAPPER = new CastMapper <>();
195
181
196
- private static class AsNullRecovery <T > extends Recover < T > {
182
+ private static class AsNullRecovery <T > implements Function < Throwable , T > {
197
183
@ Override
198
- public T recover (final Throwable failure ) {
184
+ public T apply (final Throwable failure ) {
199
185
return null ;
200
186
}
201
187
}
0 commit comments