From 5a50c8f75f4c07a91c6bcd2f213059a9fc85e015 Mon Sep 17 00:00:00 2001 From: Brandon Arp Date: Sat, 26 Nov 2016 21:44:50 -0800 Subject: [PATCH 1/2] create actor builder, add cluster join actor --- config/config.conf | 1 + .../com/arpnetworking/akka/ActorBuilder.java | 46 +++++++ .../akka/NonJoiningClusterJoiner.java | 90 ++++++++++++++ .../clusteraggregator/GuiceModule.java | 57 ++++----- .../arpnetworking/clusteraggregator/Main.java | 5 + .../clusteraggregator/Status.java | 110 ++++++++--------- .../ClusterAggregatorConfiguration.java | 30 ++++- .../akka/ActorBuilderDeserializer.java | 69 +++++++++++ .../com/arpnetworking/utility/CastMapper.java | 7 +- .../utility/CollectFutureBuilder.java | 112 ------------------ .../configuration/ActorBuilderTest.java | 52 ++++++++ 11 files changed, 368 insertions(+), 211 deletions(-) create mode 100644 src/main/java/com/arpnetworking/akka/ActorBuilder.java create mode 100644 src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java create mode 100644 src/main/java/com/arpnetworking/configuration/jackson/akka/ActorBuilderDeserializer.java delete mode 100644 src/main/java/com/arpnetworking/utility/CollectFutureBuilder.java create mode 100644 src/test/java/com/arpnetworking/clusteraggregator/configuration/ActorBuilderTest.java diff --git a/config/config.conf b/config/config.conf index 1464ea1c..b0070a25 100644 --- a/config/config.conf +++ b/config/config.conf @@ -12,6 +12,7 @@ jvmMetricsCollectionInterval="PT.5S" maxConnectionTimeout="PT2M" minConnectionTimeout="PT1M" clusterHostSuffix=".cluster" +clusterJoinActor.type="com.arpnetworking.akka.NonJoiningClusterJoiner" rebalanceConfiguration { maxParallel=100 threshold=500 diff --git a/src/main/java/com/arpnetworking/akka/ActorBuilder.java b/src/main/java/com/arpnetworking/akka/ActorBuilder.java new file mode 100644 index 00000000..71b6ca76 --- /dev/null +++ b/src/main/java/com/arpnetworking/akka/ActorBuilder.java @@ -0,0 +1,46 @@ +/** + * Copyright 2016 InscopeMetrics, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arpnetworking.akka; + +import akka.actor.Props; +import com.arpnetworking.commons.builder.OvalBuilder; + +import java.util.function.Function; + +/** + * Builder for actors. + * + * @param The type of the builder + * @author Brandon Arp (brandon dot arp at inscopemetrics dot com) + */ +public abstract class ActorBuilder> extends OvalBuilder { + /** + * Protected constructor. + * + * @param createProps method to create a {@link Props} from the {@link ActorBuilder} + */ + protected ActorBuilder(final Function createProps) { + super(createProps); + } + + /** + * Called by setters to always return appropriate subclass of + * {@link ActorBuilder}, even from setters of base class. + * + * @return instance with correct {@link ActorBuilder} class type. + */ + protected abstract B self(); +} diff --git a/src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java b/src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java new file mode 100644 index 00000000..87168f9e --- /dev/null +++ b/src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java @@ -0,0 +1,90 @@ +/** + * Copyright 2016 Inscope Metrics, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arpnetworking.akka; + +import akka.actor.Props; +import akka.actor.UntypedActor; +import com.arpnetworking.steno.Logger; +import com.arpnetworking.steno.LoggerFactory; + +/** + * Actor that does not attempt to join a cluster. + * + * @author Brandon Arp (brandon dot arp at inscopemetrics dot com) + */ +public final class NonJoiningClusterJoiner extends UntypedActor { + /** + * Static factory method for creating a {@link Props} to create a {@link NonJoiningClusterJoiner} actor. + * + * @return a new {@link Props} + */ + public static Props props() { + return Props.create(NonJoiningClusterJoiner.class); + } + + /** + * Static factory method for creating a {@link Props} to create a {@link NonJoiningClusterJoiner} actor from a + * {@link Builder}. + * + * @param builder Builder to create the Props from + * @return a new {@link Props} + */ + private static Props props(final Builder builder) { + return props(); + } + + /** + * Public constructor. + */ + public NonJoiningClusterJoiner() { + LOGGER.info() + .setMessage("NonJoiningClusterJoiner starting up") + .log(); + } + + + /** + * {@inheritDoc} + */ + @Override + public void onReceive(final Object message) throws Exception { + unhandled(message); + } + + private static final Logger LOGGER = LoggerFactory.getLogger(NonJoiningClusterJoiner.class); + + /** + * Implementation of the {@link com.arpnetworking.commons.builder.Builder} pattern for a {@link NonJoiningClusterJoiner}. + * + * @author Brandon Arp (brandon dot arp at inscopemetrics dot com) + */ + public static class Builder extends ActorBuilder { + /** + * Public constructor. + */ + public Builder() { + super(NonJoiningClusterJoiner::props); + } + + /** + * {@inheritDoc} + */ + @Override + public Builder self() { + return this; + } + } +} diff --git a/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java b/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java index cf1439fb..425f5526 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java @@ -164,54 +164,39 @@ private ActorSystem provideActorSystem(@Named("akka-config") final Config akkaCo @Named("cluster-emitter") @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice private ActorRef provideClusterEmitter(final Injector injector, final ActorSystem system) { + return launchEmitter(injector, system, _configuration.getClusterPipelineConfiguration(), "cluster-emitter-configurator"); + } + + @Provides + @Singleton + @Named("host-emitter") + @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice + private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) { + return launchEmitter(injector, system, _configuration.getHostPipelineConfiguration(), "host-emitter-configurator"); + } + + private ActorRef launchEmitter(final Injector injector, final ActorSystem system, final File pipelineFile, final String name) { final ActorRef emitterConfigurationProxy = system.actorOf( ConfigurableActorProxy.props(new RoundRobinEmitterFactory()), - "cluster-emitter-configurator"); + name); final ActorConfigurator configurator = new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class); final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector); - final File configurationFile = _configuration.getClusterPipelineConfiguration(); final Builder sourceBuilder; - if (configurationFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) { + if (pipelineFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) { sourceBuilder = new HoconFileSource.Builder() .setObjectMapper(objectMapper) - .setFile(configurationFile); + .setFile(pipelineFile); } else { sourceBuilder = new JsonNodeFileSource.Builder() .setObjectMapper(objectMapper) - .setFile(configurationFile); + .setFile(pipelineFile); } final DynamicConfiguration configuration = new DynamicConfiguration.Builder() .setObjectMapper(objectMapper) .addSourceBuilder(sourceBuilder) - .addTrigger(new FileTrigger.Builder().setFile(_configuration.getClusterPipelineConfiguration()).build()) - .addListener(configurator) - .build(); - - configuration.launch(); - - return emitterConfigurationProxy; - } - - @Provides - @Singleton - @Named("host-emitter") - @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice - private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) { - final ActorRef emitterConfigurationProxy = system.actorOf( - ConfigurableActorProxy.props(new RoundRobinEmitterFactory()), - "host-emitter-configurator"); - final ActorConfigurator configurator = - new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class); - final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector); - final DynamicConfiguration configuration = new DynamicConfiguration.Builder() - .setObjectMapper(objectMapper) - .addSourceBuilder( - new JsonNodeFileSource.Builder() - .setObjectMapper(objectMapper) - .setFile(_configuration.getHostPipelineConfiguration())) - .addTrigger(new FileTrigger.Builder().setFile(_configuration.getHostPipelineConfiguration()).build()) + .addTrigger(new FileTrigger.Builder().setFile(pipelineFile).build()) .addListener(configurator) .build(); @@ -259,6 +244,14 @@ private ActorRef provideTcpServer(final Injector injector, final ActorSystem sys return system.actorOf(GuiceActorCreator.props(injector, AggClientServer.class), "tcp-server"); } + @Provides + @Singleton + @Named("cluster-joiner") + @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice + private ActorRef provideClusterJoiner(final ActorSystem system, final ClusterAggregatorConfiguration config) { + return system.actorOf(config.getClusterJoinActor(), "cluster-joiner"); + } + @Provides @Singleton @Named("aggregator-lifecycle") diff --git a/src/main/java/com/arpnetworking/clusteraggregator/Main.java b/src/main/java/com/arpnetworking/clusteraggregator/Main.java index b2280a67..f9647198 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/Main.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/Main.java @@ -196,6 +196,11 @@ private void launchActors(final Injector injector) { .log(); injector.getInstance(Key.get(ActorRef.class, Names.named("jvm-metrics-collector"))); + LOGGER.info() + .setMessage("Launching cluster joiner") + .log(); + injector.getInstance(Key.get(ActorRef.class, Names.named("cluster-joiner"))); + LOGGER.info() .setMessage("Launching http server") .log(); diff --git a/src/main/java/com/arpnetworking/clusteraggregator/Status.java b/src/main/java/com/arpnetworking/clusteraggregator/Status.java index 3685be58..ca5c41a8 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/Status.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/Status.java @@ -21,9 +21,7 @@ import akka.actor.UntypedActor; import akka.cluster.Cluster; import akka.cluster.MemberStatus; -import akka.dispatch.OnComplete; -import akka.dispatch.Recover; -import akka.pattern.Patterns; +import akka.pattern.PatternsCS; import akka.remote.AssociationErrorEvent; import akka.util.Timeout; import com.arpnetworking.clusteraggregator.models.BookkeeperData; @@ -31,15 +29,14 @@ import com.arpnetworking.clusteraggregator.models.PeriodMetrics; import com.arpnetworking.clusteraggregator.models.StatusResponse; import com.arpnetworking.utility.CastMapper; -import com.arpnetworking.utility.CollectFutureBuilder; import org.joda.time.Period; -import scala.concurrent.ExecutionContextExecutor; -import scala.concurrent.Future; -import scala.runtime.AbstractFunction0; import scala.util.Failure; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.function.Function; /** * Periodically polls the cluster status and caches the result. @@ -108,80 +105,69 @@ public void onReceive(final Object message) throws Exception { _quarantined = true; } } else if (message instanceof HealthRequest) { - final ExecutionContextExecutor executor = getContext().dispatcher(); - final Future stateFuture = Patterns + final CompletionStage stateFuture = PatternsCS .ask( _clusterStatusCache, new ClusterStatusCache.GetRequest(), Timeout.apply(3, TimeUnit.SECONDS)) - .map(CAST_MAPPER, executor); - stateFuture.onComplete( - new OnComplete() { - @Override - public void onComplete(final Throwable failure, final ClusterStatusCache.StatusResponse success) { - final boolean healthy = _cluster.readView().self().status() == MemberStatus.up() && !_quarantined; - sender.tell(healthy, getSelf()); - } - }, - executor); + .thenApply(CAST_MAPPER); + stateFuture.whenComplete( + (statusResponse, throwable) -> { + final boolean healthy = MemberStatus.up().equals(_cluster.readView().self().status()) && !_quarantined; + sender.tell(healthy, getSelf()); + }); } else { unhandled(message); } } private void processStatusRequest(final ActorRef sender) { - final ExecutionContextExecutor executor = getContext().dispatcher(); // Call the bookkeeper - final Future bookkeeperFuture = Patterns.ask( + final CompletableFuture bookkeeperFuture = PatternsCS.ask( _metricsBookkeeper, new MetricsRequest(), Timeout.apply(3, TimeUnit.SECONDS)) - .map(new CastMapper<>(), executor) - .recover(new AsNullRecovery<>(), executor); - final Future clusterStateFuture = - Patterns.ask( + .thenApply(new CastMapper()) + .exceptionally(new AsNullRecovery<>()) + .toCompletableFuture(); + + final CompletableFuture clusterStateFuture = + PatternsCS.ask( _clusterStatusCache, new ClusterStatusCache.GetRequest(), Timeout.apply(3, TimeUnit.SECONDS)) - .map(CAST_MAPPER, executor) - .recover(new AsNullRecovery<>(), executor); + .thenApply(CAST_MAPPER) + .exceptionally(new AsNullRecovery<>()) + .toCompletableFuture(); - final Future> localMetricsFuture = - Patterns.ask( + final CompletableFuture> localMetricsFuture = + PatternsCS.ask( _localMetrics, new MetricsRequest(), Timeout.apply(3, TimeUnit.SECONDS)) - .map(new CastMapper<>(), executor) - .recover(new AsNullRecovery<>(), executor); - - final Future future = new CollectFutureBuilder() - .addFuture(bookkeeperFuture) - .addFuture(clusterStateFuture) - .addFuture(localMetricsFuture) - .map(new AbstractFunction0() { - @Override - public StatusResponse apply() { - return new StatusResponse.Builder() - .setClusterMetrics(bookkeeperFuture.value().get().get()) - .setClusterState(clusterStateFuture.value().get().get()) - .setLocalMetrics(localMetricsFuture.value().get().get()) + .thenApply(new CastMapper>()) + .exceptionally(new AsNullRecovery<>()) + .toCompletableFuture(); + + CompletableFuture.allOf( + bookkeeperFuture, + clusterStateFuture, + localMetricsFuture) + .thenApply( + (v) -> new StatusResponse.Builder() + .setClusterMetrics(bookkeeperFuture.getNow(null)) + .setClusterState(clusterStateFuture.getNow(null)) + .setLocalMetrics(localMetricsFuture.getNow(null)) .setLocalAddress(_cluster.selfAddress()) - .build(); - } - }) - .build(executor); - future.onComplete( - new OnComplete() { - @Override - public void onComplete(final Throwable failure, final StatusResponse success) { - if (failure != null) { - sender.tell(new Failure(failure), getSelf()); - } else { - sender.tell(success, getSelf()); - } - } - }, - executor); + .build()) + .whenComplete( + (result, failure) -> { + if (failure != null) { + sender.tell(new Failure(failure), getSelf()); + } else { + sender.tell(result, getSelf()); + } + }); } private boolean _quarantined = false; @@ -191,11 +177,11 @@ public void onComplete(final Throwable failure, final StatusResponse success) { private final ActorRef _clusterStatusCache; private final ActorRef _localMetrics; - private static final CastMapper CAST_MAPPER = new CastMapper<>(); + private static final CastMapper CAST_MAPPER = new CastMapper<>(); - private static class AsNullRecovery extends Recover { + private static class AsNullRecovery implements Function { @Override - public T recover(final Throwable failure) { + public T apply(final Throwable failure) { return null; } } diff --git a/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java b/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java index 729223d2..3cc93516 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java @@ -15,11 +15,15 @@ */ package com.arpnetworking.clusteraggregator.configuration; +import akka.actor.Props; +import com.arpnetworking.akka.NonJoiningClusterJoiner; import com.arpnetworking.commons.builder.OvalBuilder; import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory; +import com.arpnetworking.configuration.jackson.akka.ActorBuilderDeserializer; import com.arpnetworking.utility.InterfaceDatabase; import com.arpnetworking.utility.ReflectionsDatabase; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.MoreObjects; import com.google.common.collect.Maps; import net.sf.oval.constraint.NotEmpty; @@ -43,7 +47,11 @@ public final class ClusterAggregatorConfiguration { * @return An ObjectMapper for TsdAggregator configuration. */ public static ObjectMapper createObjectMapper() { - return ObjectMapperFactory.getInstance(); + final ObjectMapper objectMapper = ObjectMapperFactory.createInstance(); + final SimpleModule module = new SimpleModule(); + module.addDeserializer(Props.class, new ActorBuilderDeserializer(objectMapper)); + objectMapper.registerModule(module); + return objectMapper; } public String getMonitoringCluster() { @@ -114,6 +122,10 @@ public String getClusterHostSuffix() { return _clusterHostSuffix; } + public Props getClusterJoinActor() { + return _clusterJoinActor; + } + /** * {@inheritDoc} */ @@ -138,6 +150,7 @@ public String toString() { .add("RebalanceConfiguration", _rebalanceConfiguration) .add("ClusterHostSuffix", _clusterHostSuffix) .add("DatabaseConfigurations", _databaseConfigurations) + .add("ClusterJoinActor", _clusterJoinActor) .toString(); } @@ -159,6 +172,7 @@ private ClusterAggregatorConfiguration(final Builder builder) { _rebalanceConfiguration = builder._rebalanceConfiguration; _clusterHostSuffix = builder._clusterHostSuffix; _databaseConfigurations = Maps.newHashMap(builder._databaseConfigurations); + _clusterJoinActor = builder._clusterJoinActor; } private final String _monitoringCluster; @@ -177,6 +191,7 @@ private ClusterAggregatorConfiguration(final Builder builder) { private final Period _jvmMetricsCollectionInterval; private final RebalanceConfiguration _rebalanceConfiguration; private final String _clusterHostSuffix; + private final Props _clusterJoinActor; private final Map _databaseConfigurations; private static final InterfaceDatabase INTERFACE_DATABASE = ReflectionsDatabase.newInstance(); @@ -390,6 +405,17 @@ public Builder setDatabaseConfigurations(final MapBuilder. + */ + public Builder setClusterJoinActor(final Props value) { + _clusterJoinActor = value; + return this; + } + @NotNull @NotEmpty private String _monitoringCluster; @@ -429,6 +455,8 @@ public Builder setDatabaseConfigurations(final Map _databaseConfigurations; } } diff --git a/src/main/java/com/arpnetworking/configuration/jackson/akka/ActorBuilderDeserializer.java b/src/main/java/com/arpnetworking/configuration/jackson/akka/ActorBuilderDeserializer.java new file mode 100644 index 00000000..0f3f96ba --- /dev/null +++ b/src/main/java/com/arpnetworking/configuration/jackson/akka/ActorBuilderDeserializer.java @@ -0,0 +1,69 @@ +/** + * Copyright 2016 InscopeMetrics, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arpnetworking.configuration.jackson.akka; + +import akka.actor.Props; +import com.arpnetworking.commons.builder.Builder; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.TextNode; + +import java.io.IOException; + +/** + * Deserializer that will create an ActorBuilder for the given actor, then create a Props from Guice. + * + * @author Brandon Arp (brandon dot arp at inscopemetrics dot com) + */ +public class ActorBuilderDeserializer extends JsonDeserializer { + /** + * Public constructor. + * + * @param mapper the {@link ObjectMapper} to use to deserialize the {@link Builder} + */ + public ActorBuilderDeserializer(final ObjectMapper mapper) { + _mapper = mapper; + } + + @Override + public Props deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + final TreeNode treeNode = p.readValueAsTree(); + final String type = ((TextNode) treeNode.get("type")).textValue(); + try { + final Class clazz = Class.forName(type); + final Class> builder = getBuilderForClass(clazz); + final Builder value = _mapper.readValue(treeNode.traverse(), builder); + return value.build(); + } catch (final ClassNotFoundException e) { + throw new JsonMappingException(p, String.format("Unable to find class %s referenced by Props type", type)); + } + } + + @SuppressWarnings("unchecked") + private static Class> getBuilderForClass(final Class clazz) + throws ClassNotFoundException { + return (Class>) (Class.forName( + clazz.getName() + "$Builder", + true, // initialize + clazz.getClassLoader())); + } + + private final ObjectMapper _mapper; +} diff --git a/src/main/java/com/arpnetworking/utility/CastMapper.java b/src/main/java/com/arpnetworking/utility/CastMapper.java index 9bf2036e..cf61379e 100644 --- a/src/main/java/com/arpnetworking/utility/CastMapper.java +++ b/src/main/java/com/arpnetworking/utility/CastMapper.java @@ -16,22 +16,21 @@ package com.arpnetworking.utility; -import akka.dispatch.Mapper; +import java.util.function.Function; /** * Map method that just casts to another class. * - * @param Input type * @param Output type * @author Brandon Arp (brandonarp at gmail dot com) */ -public class CastMapper extends Mapper { +public class CastMapper implements Function { /** * {@inheritDoc} */ @Override @SuppressWarnings("unchecked") - public R apply(final T parameter) { + public R apply(final Object parameter) { return (R) parameter; } } diff --git a/src/main/java/com/arpnetworking/utility/CollectFutureBuilder.java b/src/main/java/com/arpnetworking/utility/CollectFutureBuilder.java deleted file mode 100644 index 42379e68..00000000 --- a/src/main/java/com/arpnetworking/utility/CollectFutureBuilder.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Copyright 2014 Groupon.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.arpnetworking.utility; - -import akka.dispatch.Futures; -import akka.dispatch.OnComplete; -import com.google.common.collect.Lists; -import scala.Function0; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.runtime.AbstractFunction0; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Collects futures and provides them in a combined promise. - * - * @param Return future type - * @author Brandon Arp (brandonarp at gmail dot com) - */ -public final class CollectFutureBuilder { - /** - * Method to create a {@code } from the completed {@link scala.concurrent.Future}s. - * - * @param callback Callback function - * @return this builder - */ - public CollectFutureBuilder map(final Function0 callback) { - _callback = callback; - return this; - } - - /** - * Registers a {@link scala.concurrent.Future} in the collection. A future must be registered in order - * to be waited on. - * - * @param future Future to register - * @return this builder - */ - public CollectFutureBuilder addFuture(final Future future) { - _futures.add(future); - return this; - } - - /** - * Sets the list of {@link scala.concurrent.Future}s to wait on. - * - * @param futures The list of futures - * @return this builder - */ - public CollectFutureBuilder setFutures(final List> futures) { - _futures.clear(); - _futures.addAll(futures); - return this; - } - - /** - * Builds the final future. - * - * @param context context to execute the futures on - * @return the new future - */ - @SuppressWarnings("unchecked") - public Future build(final ExecutionContext context) { - final Promise result = Futures.promise(); - final AtomicInteger latch = new AtomicInteger(_futures.size()); - - final OnComplete onComplete = new OnComplete() { - @Override - public void onComplete(final Throwable failure, final Object success) { - if (failure != null) { - result.failure(failure); - } - final int count = latch.decrementAndGet(); - if (count == 0) { - result.success(_callback.apply()); - } - } - }; - - for (final Future future : _futures) { - ((Future) future).onComplete(onComplete, context); - } - - return result.future(); - } - - private Function0 _callback = new AbstractFunction0() { - @Override - public T apply() { - return null; - } - }; - - private final List> _futures = Lists.newArrayList(); -} - diff --git a/src/test/java/com/arpnetworking/clusteraggregator/configuration/ActorBuilderTest.java b/src/test/java/com/arpnetworking/clusteraggregator/configuration/ActorBuilderTest.java new file mode 100644 index 00000000..10b0a32f --- /dev/null +++ b/src/test/java/com/arpnetworking/clusteraggregator/configuration/ActorBuilderTest.java @@ -0,0 +1,52 @@ +/** + * Copyright 2016 InscopeMetrics, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.arpnetworking.clusteraggregator.configuration; + +import akka.actor.Props; +import com.arpnetworking.akka.NonJoiningClusterJoiner; +import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory; +import com.arpnetworking.configuration.jackson.akka.ActorBuilderDeserializer; +import com.arpnetworking.utility.BaseActorTest; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.intellij.lang.annotations.Language; +import org.junit.Test; + +import java.io.IOException; + +/** + * @author Brandon Arp (brandon dot arp at inscopemetrics dot com) + */ +public class ActorBuilderTest extends BaseActorTest { + @Test + public void testBuild() { + final NonJoiningClusterJoiner.Builder builder = new NonJoiningClusterJoiner.Builder(); + builder.build(); + } + + @Test + public void testPolyDeserialize() throws IOException { + final ObjectMapper mapper = ObjectMapperFactory.createInstance(); + final SimpleModule module = new SimpleModule(); + module.addDeserializer(Props.class, new ActorBuilderDeserializer(mapper)); + mapper.registerModule(module); + + @Language("JSON") final String data = "{\n" + + " \"type\": \"com.arpnetworking.akka.NonJoiningClusterJoiner\"\n" + + "}"; + final Props props = mapper.readValue(data, Props.class); + } +} From 290c558e6716a1b0256e3425f97a82feaecbdf01 Mon Sep 17 00:00:00 2001 From: Brandon Arp Date: Tue, 31 Jan 2017 22:54:07 -0800 Subject: [PATCH 2/2] PR feedback, move the actor builder closer to desired --- .../com/arpnetworking/akka/ActorBuilder.java | 26 +++++++++--- .../akka/NonJoiningClusterJoiner.java | 41 ++++--------------- .../clusteraggregator/GuiceModule.java | 2 +- .../ClusterAggregatorConfiguration.java | 13 +++--- .../akka/ActorBuilderDeserializer.java | 17 ++++---- .../configuration/ActorBuilderTest.java | 13 ++++-- 6 files changed, 56 insertions(+), 56 deletions(-) diff --git a/src/main/java/com/arpnetworking/akka/ActorBuilder.java b/src/main/java/com/arpnetworking/akka/ActorBuilder.java index 71b6ca76..58af2cb6 100644 --- a/src/main/java/com/arpnetworking/akka/ActorBuilder.java +++ b/src/main/java/com/arpnetworking/akka/ActorBuilder.java @@ -15,8 +15,10 @@ */ package com.arpnetworking.akka; -import akka.actor.Props; +import akka.actor.Actor; +import akka.japi.Creator; import com.arpnetworking.commons.builder.OvalBuilder; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.function.Function; @@ -24,16 +26,18 @@ * Builder for actors. * * @param The type of the builder + * @param type of the object to be built * @author Brandon Arp (brandon dot arp at inscopemetrics dot com) */ -public abstract class ActorBuilder> extends OvalBuilder { +@SuppressFBWarnings("SE_NO_SUITABLE_CONSTRUCTOR") +public abstract class ActorBuilder, S extends Actor> extends OvalBuilder implements Creator { /** - * Protected constructor. + * Protected constructor for subclasses. * - * @param createProps method to create a {@link Props} from the {@link ActorBuilder} + * @param targetConstructor The constructor for the concrete type to be created by this builder. */ - protected ActorBuilder(final Function createProps) { - super(createProps); + protected ActorBuilder(final Function targetConstructor) { + super(targetConstructor); } /** @@ -43,4 +47,14 @@ protected ActorBuilder(final Function createProps) { * @return instance with correct {@link ActorBuilder} class type. */ protected abstract B self(); + + /** + * {@inheritDoc} + */ + @Override + public S create() throws Exception { + return build(); + } + + private static final long serialVersionUID = 1L; } diff --git a/src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java b/src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java index 87168f9e..d409548f 100644 --- a/src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java +++ b/src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java @@ -15,7 +15,6 @@ */ package com.arpnetworking.akka; -import akka.actor.Props; import akka.actor.UntypedActor; import com.arpnetworking.steno.Logger; import com.arpnetworking.steno.LoggerFactory; @@ -27,43 +26,19 @@ */ public final class NonJoiningClusterJoiner extends UntypedActor { /** - * Static factory method for creating a {@link Props} to create a {@link NonJoiningClusterJoiner} actor. - * - * @return a new {@link Props} - */ - public static Props props() { - return Props.create(NonJoiningClusterJoiner.class); - } - - /** - * Static factory method for creating a {@link Props} to create a {@link NonJoiningClusterJoiner} actor from a - * {@link Builder}. - * - * @param builder Builder to create the Props from - * @return a new {@link Props} + * {@inheritDoc} */ - private static Props props(final Builder builder) { - return props(); + @Override + public void onReceive(final Object message) throws Exception { + unhandled(message); } - /** - * Public constructor. - */ - public NonJoiningClusterJoiner() { + private NonJoiningClusterJoiner(final Builder builder) { LOGGER.info() .setMessage("NonJoiningClusterJoiner starting up") .log(); } - - /** - * {@inheritDoc} - */ - @Override - public void onReceive(final Object message) throws Exception { - unhandled(message); - } - private static final Logger LOGGER = LoggerFactory.getLogger(NonJoiningClusterJoiner.class); /** @@ -71,12 +46,12 @@ public void onReceive(final Object message) throws Exception { * * @author Brandon Arp (brandon dot arp at inscopemetrics dot com) */ - public static class Builder extends ActorBuilder { + public static class Builder extends ActorBuilder { /** * Public constructor. */ public Builder() { - super(NonJoiningClusterJoiner::props); + super(NonJoiningClusterJoiner::new); } /** @@ -86,5 +61,7 @@ public Builder() { public Builder self() { return this; } + + private static final long serialVersionUID = 1L; } } diff --git a/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java b/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java index 425f5526..6f674371 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java @@ -249,7 +249,7 @@ private ActorRef provideTcpServer(final Injector injector, final ActorSystem sys @Named("cluster-joiner") @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice private ActorRef provideClusterJoiner(final ActorSystem system, final ClusterAggregatorConfiguration config) { - return system.actorOf(config.getClusterJoinActor(), "cluster-joiner"); + return system.actorOf(Props.create(config.getClusterJoinActor()), "cluster-joiner"); } @Provides diff --git a/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java b/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java index 3cc93516..cc32b741 100644 --- a/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java +++ b/src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java @@ -15,7 +15,8 @@ */ package com.arpnetworking.clusteraggregator.configuration; -import akka.actor.Props; +import akka.actor.Actor; +import com.arpnetworking.akka.ActorBuilder; import com.arpnetworking.akka.NonJoiningClusterJoiner; import com.arpnetworking.commons.builder.OvalBuilder; import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory; @@ -49,7 +50,7 @@ public final class ClusterAggregatorConfiguration { public static ObjectMapper createObjectMapper() { final ObjectMapper objectMapper = ObjectMapperFactory.createInstance(); final SimpleModule module = new SimpleModule(); - module.addDeserializer(Props.class, new ActorBuilderDeserializer(objectMapper)); + module.addDeserializer(ActorBuilder.class, new ActorBuilderDeserializer(objectMapper)); objectMapper.registerModule(module); return objectMapper; } @@ -122,7 +123,7 @@ public String getClusterHostSuffix() { return _clusterHostSuffix; } - public Props getClusterJoinActor() { + public ActorBuilder getClusterJoinActor() { return _clusterJoinActor; } @@ -191,7 +192,7 @@ private ClusterAggregatorConfiguration(final Builder builder) { private final Period _jvmMetricsCollectionInterval; private final RebalanceConfiguration _rebalanceConfiguration; private final String _clusterHostSuffix; - private final Props _clusterJoinActor; + private final ActorBuilder _clusterJoinActor; private final Map _databaseConfigurations; private static final InterfaceDatabase INTERFACE_DATABASE = ReflectionsDatabase.newInstance(); @@ -411,7 +412,7 @@ public Builder setDatabaseConfigurations(final MapBuilder. */ - public Builder setClusterJoinActor(final Props value) { + public Builder setClusterJoinActor(final ActorBuilder value) { _clusterJoinActor = value; return this; } @@ -456,7 +457,7 @@ public Builder setClusterJoinActor(final Props value) { @NotNull private String _clusterHostSuffix = ""; @NotNull - private Props _clusterJoinActor = new NonJoiningClusterJoiner.Builder().build(); + private ActorBuilder _clusterJoinActor = new NonJoiningClusterJoiner.Builder(); private Map _databaseConfigurations; } } diff --git a/src/main/java/com/arpnetworking/configuration/jackson/akka/ActorBuilderDeserializer.java b/src/main/java/com/arpnetworking/configuration/jackson/akka/ActorBuilderDeserializer.java index 0f3f96ba..dfdd8392 100644 --- a/src/main/java/com/arpnetworking/configuration/jackson/akka/ActorBuilderDeserializer.java +++ b/src/main/java/com/arpnetworking/configuration/jackson/akka/ActorBuilderDeserializer.java @@ -15,7 +15,8 @@ */ package com.arpnetworking.configuration.jackson.akka; -import akka.actor.Props; +import akka.actor.Actor; +import com.arpnetworking.akka.ActorBuilder; import com.arpnetworking.commons.builder.Builder; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.TreeNode; @@ -32,7 +33,7 @@ * * @author Brandon Arp (brandon dot arp at inscopemetrics dot com) */ -public class ActorBuilderDeserializer extends JsonDeserializer { +public class ActorBuilderDeserializer extends JsonDeserializer> { /** * Public constructor. * @@ -43,23 +44,23 @@ public ActorBuilderDeserializer(final ObjectMapper mapper) { } @Override - public Props deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { + public ActorBuilder, ? extends Actor> deserialize(final JsonParser p, final DeserializationContext ctxt) + throws IOException { final TreeNode treeNode = p.readValueAsTree(); final String type = ((TextNode) treeNode.get("type")).textValue(); try { final Class clazz = Class.forName(type); - final Class> builder = getBuilderForClass(clazz); - final Builder value = _mapper.readValue(treeNode.traverse(), builder); - return value.build(); + final Class, ? extends Actor>> builder = getBuilderForClass(clazz); + return _mapper.readValue(treeNode.traverse(), builder); } catch (final ClassNotFoundException e) { throw new JsonMappingException(p, String.format("Unable to find class %s referenced by Props type", type)); } } @SuppressWarnings("unchecked") - private static Class> getBuilderForClass(final Class clazz) + private static Class, ? extends Actor>> getBuilderForClass(final Class clazz) throws ClassNotFoundException { - return (Class>) (Class.forName( + return (Class, ? extends Actor>>) (Class.forName( clazz.getName() + "$Builder", true, // initialize clazz.getClassLoader())); diff --git a/src/test/java/com/arpnetworking/clusteraggregator/configuration/ActorBuilderTest.java b/src/test/java/com/arpnetworking/clusteraggregator/configuration/ActorBuilderTest.java index 10b0a32f..76a0b4fe 100644 --- a/src/test/java/com/arpnetworking/clusteraggregator/configuration/ActorBuilderTest.java +++ b/src/test/java/com/arpnetworking/clusteraggregator/configuration/ActorBuilderTest.java @@ -15,7 +15,10 @@ */ package com.arpnetworking.clusteraggregator.configuration; +import akka.actor.ActorRef; +import akka.actor.PoisonPill; import akka.actor.Props; +import com.arpnetworking.akka.ActorBuilder; import com.arpnetworking.akka.NonJoiningClusterJoiner; import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory; import com.arpnetworking.configuration.jackson.akka.ActorBuilderDeserializer; @@ -34,19 +37,23 @@ public class ActorBuilderTest extends BaseActorTest { @Test public void testBuild() { final NonJoiningClusterJoiner.Builder builder = new NonJoiningClusterJoiner.Builder(); - builder.build(); + + final ActorRef ref = getSystem().actorOf(Props.create(builder)); + ref.tell(PoisonPill.getInstance(), ActorRef.noSender()); } @Test public void testPolyDeserialize() throws IOException { final ObjectMapper mapper = ObjectMapperFactory.createInstance(); final SimpleModule module = new SimpleModule(); - module.addDeserializer(Props.class, new ActorBuilderDeserializer(mapper)); + module.addDeserializer(ActorBuilder.class, new ActorBuilderDeserializer(mapper)); mapper.registerModule(module); @Language("JSON") final String data = "{\n" + " \"type\": \"com.arpnetworking.akka.NonJoiningClusterJoiner\"\n" + "}"; - final Props props = mapper.readValue(data, Props.class); + final ActorBuilder builder = mapper.readValue(data, ActorBuilder.class); + final ActorRef ref = getSystem().actorOf(Props.create(builder)); + ref.tell(PoisonPill.getInstance(), ActorRef.noSender()); } }