Skip to content

Commit 13466ed

Browse files
committed
refactor emitter initialization
1 parent d2ce331 commit 13466ed

File tree

1 file changed

+17
-32
lines changed

1 file changed

+17
-32
lines changed

src/main/java/com/arpnetworking/clusteraggregator/GuiceModule.java

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -164,54 +164,39 @@ private ActorSystem provideActorSystem(@Named("akka-config") final Config akkaCo
164164
@Named("cluster-emitter")
165165
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
166166
private ActorRef provideClusterEmitter(final Injector injector, final ActorSystem system) {
167+
return launchEmitter(injector, system, _configuration.getClusterPipelineConfiguration(), "cluster-emitter-configurator");
168+
}
169+
170+
@Provides
171+
@Singleton
172+
@Named("host-emitter")
173+
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
174+
private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) {
175+
return launchEmitter(injector, system, _configuration.getHostPipelineConfiguration(), "host-emitter-configurator");
176+
}
177+
178+
private ActorRef launchEmitter(final Injector injector, final ActorSystem system, final File pipelineFile, final String name) {
167179
final ActorRef emitterConfigurationProxy = system.actorOf(
168180
ConfigurableActorProxy.props(new RoundRobinEmitterFactory()),
169-
"cluster-emitter-configurator");
181+
name);
170182
final ActorConfigurator<EmitterConfiguration> configurator =
171183
new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class);
172184
final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector);
173-
final File configurationFile = _configuration.getClusterPipelineConfiguration();
174185
final Builder<? extends JsonNodeSource> sourceBuilder;
175-
if (configurationFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) {
186+
if (pipelineFile.getName().toLowerCase(Locale.getDefault()).endsWith(HOCON_FILE_EXTENSION)) {
176187
sourceBuilder = new HoconFileSource.Builder()
177188
.setObjectMapper(objectMapper)
178-
.setFile(configurationFile);
189+
.setFile(pipelineFile);
179190
} else {
180191
sourceBuilder = new JsonNodeFileSource.Builder()
181192
.setObjectMapper(objectMapper)
182-
.setFile(configurationFile);
193+
.setFile(pipelineFile);
183194
}
184195

185196
final DynamicConfiguration configuration = new DynamicConfiguration.Builder()
186197
.setObjectMapper(objectMapper)
187198
.addSourceBuilder(sourceBuilder)
188-
.addTrigger(new FileTrigger.Builder().setFile(_configuration.getClusterPipelineConfiguration()).build())
189-
.addListener(configurator)
190-
.build();
191-
192-
configuration.launch();
193-
194-
return emitterConfigurationProxy;
195-
}
196-
197-
@Provides
198-
@Singleton
199-
@Named("host-emitter")
200-
@SuppressFBWarnings("UPM_UNCALLED_PRIVATE_METHOD") // Invoked reflectively by Guice
201-
private ActorRef provideHostEmitter(final Injector injector, final ActorSystem system) {
202-
final ActorRef emitterConfigurationProxy = system.actorOf(
203-
ConfigurableActorProxy.props(new RoundRobinEmitterFactory()),
204-
"host-emitter-configurator");
205-
final ActorConfigurator<EmitterConfiguration> configurator =
206-
new ActorConfigurator<>(emitterConfigurationProxy, EmitterConfiguration.class);
207-
final ObjectMapper objectMapper = EmitterConfiguration.createObjectMapper(injector);
208-
final DynamicConfiguration configuration = new DynamicConfiguration.Builder()
209-
.setObjectMapper(objectMapper)
210-
.addSourceBuilder(
211-
new JsonNodeFileSource.Builder()
212-
.setObjectMapper(objectMapper)
213-
.setFile(_configuration.getHostPipelineConfiguration()))
214-
.addTrigger(new FileTrigger.Builder().setFile(_configuration.getHostPipelineConfiguration()).build())
199+
.addTrigger(new FileTrigger.Builder().setFile(pipelineFile).build())
215200
.addListener(configurator)
216201
.build();
217202

0 commit comments

Comments
 (0)