Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ jvmMetricsCollectionInterval="PT.5S"
maxConnectionTimeout="PT2M"
minConnectionTimeout="PT1M"
clusterHostSuffix=".cluster"
clusterJoinActor.type="com.arpnetworking.akka.NonJoiningClusterJoiner"
rebalanceConfiguration {
maxParallel=100
threshold=500
Expand Down
60 changes: 60 additions & 0 deletions src/main/java/com/arpnetworking/akka/ActorBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Copyright 2016 InscopeMetrics, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arpnetworking.akka;

import akka.actor.Actor;
import akka.japi.Creator;
import com.arpnetworking.commons.builder.OvalBuilder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.function.Function;

/**
* Builder for actors.
*
* @param <B> The type of the builder
* @param <S> type of the object to be built
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
*/
@SuppressFBWarnings("SE_NO_SUITABLE_CONSTRUCTOR")
public abstract class ActorBuilder<B extends ActorBuilder<B, S>, S extends Actor> extends OvalBuilder<S> implements Creator<S> {
/**
* Protected constructor for subclasses.
*
* @param targetConstructor The constructor for the concrete type to be created by this builder.
*/
protected ActorBuilder(final Function<B, S> targetConstructor) {
super(targetConstructor);
}

/**
* Called by setters to always return appropriate subclass of
* {@link ActorBuilder}, even from setters of base class.
*
* @return instance with correct {@link ActorBuilder} class type.
*/
protected abstract B self();

/**
* {@inheritDoc}
*/
@Override
public S create() throws Exception {
return build();
}

private static final long serialVersionUID = 1L;
}
67 changes: 67 additions & 0 deletions src/main/java/com/arpnetworking/akka/NonJoiningClusterJoiner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Copyright 2016 Inscope Metrics, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.arpnetworking.akka;

import akka.actor.UntypedActor;
import com.arpnetworking.steno.Logger;
import com.arpnetworking.steno.LoggerFactory;

/**
* Actor that does not attempt to join a cluster.
*
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
*/
public final class NonJoiningClusterJoiner extends UntypedActor {
/**
* {@inheritDoc}
*/
@Override
public void onReceive(final Object message) throws Exception {
unhandled(message);
}

private NonJoiningClusterJoiner(final Builder builder) {
LOGGER.info()
.setMessage("NonJoiningClusterJoiner starting up")
.log();
}

private static final Logger LOGGER = LoggerFactory.getLogger(NonJoiningClusterJoiner.class);

/**
* Implementation of the {@link com.arpnetworking.commons.builder.Builder} pattern for a {@link NonJoiningClusterJoiner}.
*
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
*/
public static class Builder extends ActorBuilder<Builder, NonJoiningClusterJoiner> {
/**
* Public constructor.
*/
public Builder() {
super(NonJoiningClusterJoiner::new);
}

/**
* {@inheritDoc}
*/
@Override
public Builder self() {
return this;
}

private static final long serialVersionUID = 1L;
}
}
57 changes: 25 additions & 32 deletions src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,54 +164,39 @@ private ActorSystem provideActorSystem(@Named("akka-config") final Config akkaCo
@Named("cluster-emitter")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
private ActorRef provideClusterEmitter(final Injector injector, final ActorSystem system) {
return launchEmitter(injector, system, _configuration.getClusterPipelineConfiguration(), "cluster-emitter-configurator");
}

@Provides
@Singleton
@Named("host-emitter")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) {
return launchEmitter(injector, system, _configuration.getHostPipelineConfiguration(), "host-emitter-configurator");
}

private ActorRef launchEmitter(final Injector injector, final ActorSystem system, final File pipelineFile, final String name) {
final ActorRef emitterConfigurationProxy = system.actorOf(
ConfigurableActorProxy.props(new RoundRobinEmitterFactory()),
"cluster-emitter-configurator");
name);
final ActorConfigurator<EmitterConfiguration> configurator =
new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class);
final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector);
final File configurationFile = _configuration.getClusterPipelineConfiguration();
final Builder<? extends JsonNodeSource> sourceBuilder;
if (configurationFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) {
if (pipelineFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) {
sourceBuilder = new HoconFileSource.Builder()
.setObjectMapper(objectMapper)
.setFile(configurationFile);
.setFile(pipelineFile);
} else {
sourceBuilder = new JsonNodeFileSource.Builder()
.setObjectMapper(objectMapper)
.setFile(configurationFile);
.setFile(pipelineFile);
}

final DynamicConfiguration configuration = new DynamicConfiguration.Builder()
.setObjectMapper(objectMapper)
.addSourceBuilder(sourceBuilder)
.addTrigger(new FileTrigger.Builder().setFile(_configuration.getClusterPipelineConfiguration()).build())
.addListener(configurator)
.build();

configuration.launch();

return emitterConfigurationProxy;
}

@Provides
@Singleton
@Named("host-emitter")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) {
final ActorRef emitterConfigurationProxy = system.actorOf(
ConfigurableActorProxy.props(new RoundRobinEmitterFactory()),
"host-emitter-configurator");
final ActorConfigurator<EmitterConfiguration> configurator =
new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class);
final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector);
final DynamicConfiguration configuration = new DynamicConfiguration.Builder()
.setObjectMapper(objectMapper)
.addSourceBuilder(
new JsonNodeFileSource.Builder()
.setObjectMapper(objectMapper)
.setFile(_configuration.getHostPipelineConfiguration()))
.addTrigger(new FileTrigger.Builder().setFile(_configuration.getHostPipelineConfiguration()).build())
.addTrigger(new FileTrigger.Builder().setFile(pipelineFile).build())
.addListener(configurator)
.build();

Expand Down Expand Up @@ -259,6 +244,14 @@ private ActorRef provideTcpServer(final Injector injector, final ActorSystem sys
return system.actorOf(GuiceActorCreator.props(injector, AggClientServer.class), "tcp-server");
}

@Provides
@Singleton
@Named("cluster-joiner")
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
private ActorRef provideClusterJoiner(final ActorSystem system, final ClusterAggregatorConfiguration config) {
return system.actorOf(Props.create(config.getClusterJoinActor()), "cluster-joiner");
}

@Provides
@Singleton
@Named("aggregator-lifecycle")
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/arpnetworking/clusteraggregator/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ private void launchActors(final Injector injector) {
.log();
injector.getInstance(Key.get(ActorRef.class, Names.named("jvm-metrics-collector")));

LOGGER.info()
.setMessage("Launching cluster joiner")
.log();
injector.getInstance(Key.get(ActorRef.class, Names.named("cluster-joiner")));

LOGGER.info()
.setMessage("Launching http server")
.log();
Expand Down
Loading