Skip to content

Commit 1db7922

Browse files
committed
create actor builder, add cluster join actor
1 parent 9bae531 commit 1db7922

File tree

8 files changed

+318
-33
lines changed

8 files changed

+318
-33
lines changed

config/config.hocon

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ jvmMetricsCollectionInterval="PT.5S"
1212
maxConnectionTimeout="PT2M"
1313
minConnectionTimeout="PT1M"
1414
clusterHostSuffix=".cluster"
15+
clusterJoinActor.type="com.arpnetworking.akka.NonJoiningClusterJoiner"
1516
rebalanceConfiguration {
1617
maxParallel=100
1718
threshold=500
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright 2016 InscopeMetrics, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.akka;
17+
18+
import akka.actor.Props;
19+
import com.arpnetworking.commons.builder.OvalBuilder;
20+
21+
import java.util.function.Function;
22+
23+
/**
24+
* Builder for actors.
25+
*
26+
* @param <B> The type of the builder
27+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
28+
*/
29+
public abstract class ActorBuilder<B extends ActorBuilder<B>> extends OvalBuilder<Props> {
30+
/**
31+
* Protected constructor.
32+
*
33+
* @param createProps method to create a {@link Props} from the {@link ActorBuilder}
34+
*/
35+
protected ActorBuilder(final Function<B, Props> createProps) {
36+
super(createProps);
37+
}
38+
39+
/**
40+
* Called by setters to always return appropriate subclass of
41+
* {@link ActorBuilder}, even from setters of base class.
42+
*
43+
* @return instance with correct {@link ActorBuilder} class type.
44+
*/
45+
protected abstract B self();
46+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Copyright 2016 Inscope Metrics, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.akka;
17+
18+
import akka.actor.Props;
19+
import akka.actor.UntypedActor;
20+
import com.arpnetworking.steno.Logger;
21+
import com.arpnetworking.steno.LoggerFactory;
22+
23+
/**
24+
* Actor that does not attempt to join a cluster.
25+
*
26+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
27+
*/
28+
public class NonJoiningClusterJoiner extends UntypedActor {
29+
/**
30+
* Static factory method for creating a {@link Props} to create a {@link NonJoiningClusterJoiner} actor.
31+
*
32+
* @return a new {@link Props}
33+
*/
34+
public static Props props() {
35+
return Props.create(NonJoiningClusterJoiner.class);
36+
}
37+
38+
/**
39+
* Static factory method for creating a {@link Props} to create a {@link NonJoiningClusterJoiner} actor from a
40+
* {@link Builder}.
41+
*
42+
* @param builder Builder to create the Props from
43+
* @return a new {@link Props}
44+
*/
45+
private static Props props(final Builder builder) {
46+
return props();
47+
}
48+
49+
/**
50+
* Public constructor.
51+
*/
52+
public NonJoiningClusterJoiner() {
53+
LOGGER.info()
54+
.setMessage("NonJoiningClusterJoiner starting up")
55+
.log();
56+
}
57+
58+
@Override
59+
public void onReceive(final Object message) throws Exception {
60+
unhandled(message);
61+
}
62+
63+
private static final Logger LOGGER = LoggerFactory.getLogger(NonJoiningClusterJoiner.class);
64+
65+
/**
66+
* Implementation of the {@link com.arpnetworking.commons.builder.Builder} pattern for a {@link NonJoiningClusterJoiner}.
67+
*
68+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
69+
*/
70+
public static class Builder extends ActorBuilder<Builder> {
71+
/**
72+
* Public constructor.
73+
*/
74+
public Builder() {
75+
super(NonJoiningClusterJoiner::props);
76+
}
77+
78+
@Override
79+
public Builder self() {
80+
return this;
81+
}
82+
}
83+
}

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -164,54 +164,39 @@ private ActorSystem provideActorSystem(@Named("akka-config") final Config akkaCo
164164
@Named("cluster-emitter")
165165
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
166166
private ActorRef provideClusterEmitter(final Injector injector, final ActorSystem system) {
167+
return launchEmitter(injector, system, _configuration.getClusterPipelineConfiguration(), "cluster-emitter-configurator");
168+
}
169+
170+
@Provides
171+
@Singleton
172+
@Named("host-emitter")
173+
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
174+
private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) {
175+
return launchEmitter(injector, system, _configuration.getHostPipelineConfiguration(), "host-emitter-configurator");
176+
}
177+
178+
private ActorRef launchEmitter(final Injector injector, final ActorSystem system, final File pipelineFile, final String name) {
167179
final ActorRef emitterConfigurationProxy = system.actorOf(
168180
ConfigurableActorProxy.props(new RoundRobinEmitterFactory()),
169-
"cluster-emitter-configurator");
181+
name);
170182
final ActorConfigurator<EmitterConfiguration> configurator =
171183
new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class);
172184
final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector);
173-
final File configurationFile = _configuration.getClusterPipelineConfiguration();
174185
final Builder<? extends JsonNodeSource> sourceBuilder;
175-
if (configurationFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) {
186+
if (pipelineFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) {
176187
sourceBuilder = new HoconFileSource.Builder()
177188
.setObjectMapper(objectMapper)
178-
.setFile(configurationFile);
189+
.setFile(pipelineFile);
179190
} else {
180191
sourceBuilder = new JsonNodeFileSource.Builder()
181192
.setObjectMapper(objectMapper)
182-
.setFile(configurationFile);
193+
.setFile(pipelineFile);
183194
}
184195

185196
final DynamicConfiguration configuration = new DynamicConfiguration.Builder()
186197
.setObjectMapper(objectMapper)
187198
.addSourceBuilder(sourceBuilder)
188-
.addTrigger(new FileTrigger.Builder().setFile(_configuration.getClusterPipelineConfiguration()).build())
189-
.addListener(configurator)
190-
.build();
191-
192-
configuration.launch();
193-
194-
return emitterConfigurationProxy;
195-
}
196-
197-
@Provides
198-
@Singleton
199-
@Named("host-emitter")
200-
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
201-
private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) {
202-
final ActorRef emitterConfigurationProxy = system.actorOf(
203-
ConfigurableActorProxy.props(new RoundRobinEmitterFactory()),
204-
"host-emitter-configurator");
205-
final ActorConfigurator<EmitterConfiguration> configurator =
206-
new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class);
207-
final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector);
208-
final DynamicConfiguration configuration = new DynamicConfiguration.Builder()
209-
.setObjectMapper(objectMapper)
210-
.addSourceBuilder(
211-
new JsonNodeFileSource.Builder()
212-
.setObjectMapper(objectMapper)
213-
.setFile(_configuration.getHostPipelineConfiguration()))
214-
.addTrigger(new FileTrigger.Builder().setFile(_configuration.getHostPipelineConfiguration()).build())
199+
.addTrigger(new FileTrigger.Builder().setFile(pipelineFile).build())
215200
.addListener(configurator)
216201
.build();
217202

@@ -259,6 +244,14 @@ private ActorRef provideTcpServer(final Injector injector, final ActorSystem sys
259244
return system.actorOf(GuiceActorCreator.props(injector, AggClientServer.class), "tcp-server");
260245
}
261246

247+
@Provides
248+
@Singleton
249+
@Named("cluster-joiner")
250+
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
251+
private ActorRef provideClusterJoiner(final ActorSystem system, final ClusterAggregatorConfiguration config) {
252+
return system.actorOf(config.getClusterJoinActor(), "cluster-joiner");
253+
}
254+
262255
@Provides
263256
@Singleton
264257
@Named("aggregator-lifecycle")
@@ -358,6 +351,14 @@ private ActorRef provideGracefulShutdownActor(final ActorSystem system, final In
358351
return system.actorOf(GuiceActorCreator.props(injector, GracefulShutdownActor.class), "graceful-shutdown");
359352
}
360353

354+
// @Provides
355+
// @Singleton
356+
// @Named("cluster-joiner")
357+
// @SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
358+
// private ActorRef provideClusterJoiner(final ActorSystem system, final Injector injector) {
359+
// return system.actorOf(GuiceActorCreator.props(injector, NonJoiningClusterJoiner.class), "cluster-joiner");
360+
// }
361+
361362
@Provides
362363
@Named("cluster-host-suffix")
363364
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice

src/main/java/com/arpnetworking/clusteraggregator/Main.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,22 @@
1717

1818
import akka.actor.ActorRef;
1919
import akka.actor.ActorSystem;
20+
import akka.actor.Props;
2021
import ch.qos.logback.classic.LoggerContext;
2122
import com.arpnetworking.clusteraggregator.configuration.ClusterAggregatorConfiguration;
2223
import com.arpnetworking.commons.builder.Builder;
2324
import com.arpnetworking.configuration.jackson.DynamicConfiguration;
2425
import com.arpnetworking.configuration.jackson.HoconFileSource;
2526
import com.arpnetworking.configuration.jackson.JsonNodeFileSource;
2627
import com.arpnetworking.configuration.jackson.JsonNodeSource;
28+
import com.arpnetworking.configuration.jackson.akka.ActorBuilderDeserializer;
2729
import com.arpnetworking.configuration.triggers.FileTrigger;
2830
import com.arpnetworking.steno.Logger;
2931
import com.arpnetworking.utility.Configurator;
3032
import com.arpnetworking.utility.Database;
3133
import com.arpnetworking.utility.Launchable;
3234
import com.fasterxml.jackson.databind.ObjectMapper;
35+
import com.fasterxml.jackson.databind.module.SimpleModule;
3336
import com.google.common.collect.Lists;
3437
import com.google.inject.Guice;
3538
import com.google.inject.Injector;
@@ -97,6 +100,9 @@ public static void main(final String[] args) {
97100
final File configurationFile = new File(args[0]);
98101
configurator = Optional.of(new Configurator<>(Main::new, ClusterAggregatorConfiguration.class));
99102
final ObjectMapper objectMapper = ClusterAggregatorConfiguration.createObjectMapper();
103+
final SimpleModule module = new SimpleModule();
104+
module.addDeserializer(Props.class, new ActorBuilderDeserializer(objectMapper));
105+
objectMapper.registerModule(module);
100106
configuration = Optional.of(new DynamicConfiguration.Builder()
101107
.setObjectMapper(objectMapper)
102108
.addSourceBuilder(getFileSourceBuilder(configurationFile, objectMapper))
@@ -196,6 +202,11 @@ private void launchActors(final Injector injector) {
196202
.log();
197203
injector.getInstance(Key.get(ActorRef.class, Names.named("jvm-metrics-collector")));
198204

205+
LOGGER.info()
206+
.setMessage("Launching cluster joiner")
207+
.log();
208+
injector.getInstance(Key.get(ActorRef.class, Names.named("cluster-joiner")));
209+
199210
LOGGER.info()
200211
.setMessage("Launching http server")
201212
.log();

src/main/java/com/arpnetworking/clusteraggregator/configuration/ClusterAggregatorConfiguration.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.arpnetworking.clusteraggregator.configuration;
1717

18+
import akka.actor.Props;
19+
import com.arpnetworking.akka.NonJoiningClusterJoiner;
1820
import com.arpnetworking.commons.builder.OvalBuilder;
1921
import com.arpnetworking.commons.jackson.databind.ObjectMapperFactory;
2022
import com.arpnetworking.utility.InterfaceDatabase;
@@ -43,7 +45,7 @@ public final class ClusterAggregatorConfiguration {
4345
* @return An <code>ObjectMapper</code> for TsdAggregator configuration.
4446
*/
4547
public static ObjectMapper createObjectMapper() {
46-
return ObjectMapperFactory.getInstance();
48+
return ObjectMapperFactory.createInstance();
4749
}
4850

4951
public String getMonitoringCluster() {
@@ -114,6 +116,10 @@ public String getClusterHostSuffix() {
114116
return _clusterHostSuffix;
115117
}
116118

119+
public Props getClusterJoinActor() {
120+
return _clusterJoinActor;
121+
}
122+
117123
/**
118124
* {@inheritDoc}
119125
*/
@@ -138,6 +144,7 @@ public String toString() {
138144
.add("RebalanceConfiguration", _rebalanceConfiguration)
139145
.add("ClusterHostSuffix", _clusterHostSuffix)
140146
.add("DatabaseConfigurations", _databaseConfigurations)
147+
.add("ClusterJoinActor", _clusterJoinActor)
141148
.toString();
142149
}
143150

@@ -159,6 +166,7 @@ private ClusterAggregatorConfiguration(final Builder builder) {
159166
_rebalanceConfiguration = builder._rebalanceConfiguration;
160167
_clusterHostSuffix = builder._clusterHostSuffix;
161168
_databaseConfigurations = Maps.newHashMap(builder._databaseConfigurations);
169+
_clusterJoinActor = builder._clusterJoinActor;
162170
}
163171

164172
private final String _monitoringCluster;
@@ -177,6 +185,7 @@ private ClusterAggregatorConfiguration(final Builder builder) {
177185
private final Period _jvmMetricsCollectionInterval;
178186
private final RebalanceConfiguration _rebalanceConfiguration;
179187
private final String _clusterHostSuffix;
188+
private final Props _clusterJoinActor;
180189
private final Map<String, DatabaseConfiguration> _databaseConfigurations;
181190

182191
private static final InterfaceDatabase INTERFACE_DATABASE = ReflectionsDatabase.newInstance();
@@ -390,6 +399,17 @@ public Builder setDatabaseConfigurations(final Map<String, DatabaseConfiguration
390399
return this;
391400
}
392401

402+
/**
403+
* Configuration a cluster join actor.
404+
*
405+
* @param value The cluster join actor configuration.
406+
* @return This instance of <code>Builder</code>.
407+
*/
408+
public Builder setClusterJoinActor(final Props value) {
409+
_clusterJoinActor = value;
410+
return this;
411+
}
412+
393413
@NotNull
394414
@NotEmpty
395415
private String _monitoringCluster;
@@ -429,6 +449,8 @@ public Builder setDatabaseConfigurations(final Map<String, DatabaseConfiguration
429449
private RebalanceConfiguration _rebalanceConfiguration;
430450
@NotNull
431451
private String _clusterHostSuffix = "";
452+
@NotNull
453+
private Props _clusterJoinActor = new NonJoiningClusterJoiner.Builder().build();
432454
private Map<String, DatabaseConfiguration> _databaseConfigurations;
433455
}
434456
}

0 commit comments

Comments
 (0)