@@ -164,54 +164,39 @@ private ActorSystem provideActorSystem(@Named("akka-config") final Config akkaCo
164164 @ Named ("cluster-emitter" )
165165 @ SuppressFBWarnings ("UPM_UNCALLED_PRIVATE_METHOD" ) // Invoked reflectively by Guice
166166 private ActorRef provideClusterEmitter (final Injector injector , final ActorSystem system ) {
167+ return launchEmitter (injector , system , _configuration .getClusterPipelineConfiguration (), "cluster-emitter-configurator" );
168+ }
169+
170+ @ Provides
171+ @ Singleton
172+ @ Named ("host-emitter" )
173+ @ SuppressFBWarnings ("UPM_UNCALLED_PRIVATE_METHOD" ) // Invoked reflectively by Guice
174+ private ActorRef provideHostEmitter (final Injector injector , final ActorSystem system ) {
175+ return launchEmitter (injector , system , _configuration .getHostPipelineConfiguration (), "host-emitter-configurator" );
176+ }
177+
178+ private ActorRef launchEmitter (final Injector injector , final ActorSystem system , final File pipelineFile , final String name ) {
167179 final ActorRef emitterConfigurationProxy = system .actorOf (
168180 ConfigurableActorProxy .props (new RoundRobinEmitterFactory ()),
169- "cluster-emitter-configurator" );
181+ name );
170182 final ActorConfigurator <EmitterConfiguration > configurator =
171183 new ActorConfigurator <>(emitterConfigurationProxy , EmitterConfiguration .class );
172184 final ObjectMapper objectMapper = EmitterConfiguration .createObjectMapper (injector );
173- final File configurationFile = _configuration .getClusterPipelineConfiguration ();
174185 final Builder <? extends JsonNodeSource > sourceBuilder ;
175- if (configurationFile .getName ().toLowerCase (Locale .getDefault ()).endsWith (HOCON_FILE_EXTENSION )) {
186+ if (pipelineFile .getName ().toLowerCase (Locale .getDefault ()).endsWith (HOCON_FILE_EXTENSION )) {
176187 sourceBuilder = new HoconFileSource .Builder ()
177188 .setObjectMapper (objectMapper )
178- .setFile (configurationFile );
189+ .setFile (pipelineFile );
179190 } else {
180191 sourceBuilder = new JsonNodeFileSource .Builder ()
181192 .setObjectMapper (objectMapper )
182- .setFile (configurationFile );
193+ .setFile (pipelineFile );
183194 }
184195
185196 final DynamicConfiguration configuration = new DynamicConfiguration .Builder ()
186197 .setObjectMapper (objectMapper )
187198 .addSourceBuilder (sourceBuilder )
188- .addTrigger (new FileTrigger .Builder ().setFile (_configuration .getClusterPipelineConfiguration ()).build ())
189- .addListener (configurator )
190- .build ();
191-
192- configuration .launch ();
193-
194- return emitterConfigurationProxy ;
195- }
196-
197- @ Provides
198- @ Singleton
199- @ Named ("host-emitter" )
200- @ SuppressFBWarnings ("UPM_UNCALLED_PRIVATE_METHOD" ) // Invoked reflectively by Guice
201- private ActorRef provideHostEmitter (final Injector injector , final ActorSystem system ) {
202- final ActorRef emitterConfigurationProxy = system .actorOf (
203- ConfigurableActorProxy .props (new RoundRobinEmitterFactory ()),
204- "host-emitter-configurator" );
205- final ActorConfigurator <EmitterConfiguration > configurator =
206- new ActorConfigurator <>(emitterConfigurationProxy , EmitterConfiguration .class );
207- final ObjectMapper objectMapper = EmitterConfiguration .createObjectMapper (injector );
208- final DynamicConfiguration configuration = new DynamicConfiguration .Builder ()
209- .setObjectMapper (objectMapper )
210- .addSourceBuilder (
211- new JsonNodeFileSource .Builder ()
212- .setObjectMapper (objectMapper )
213- .setFile (_configuration .getHostPipelineConfiguration ()))
214- .addTrigger (new FileTrigger .Builder ().setFile (_configuration .getHostPipelineConfiguration ()).build ())
199+ .addTrigger (new FileTrigger .Builder ().setFile (pipelineFile ).build ())
215200 .addListener (configurator )
216201 .build ();
217202
@@ -259,6 +244,14 @@ private ActorRef provideTcpServer(final Injector injector, final ActorSystem sys
259244 return system .actorOf (GuiceActorCreator .props (injector , AggClientServer .class ), "tcp-server" );
260245 }
261246
247+ @ Provides
248+ @ Singleton
249+ @ Named ("cluster-joiner" )
250+ @ SuppressFBWarnings ("UPM_UNCALLED_PRIVATE_METHOD" ) // Invoked reflectively by Guice
251+ private ActorRef provideClusterJoiner (final ActorSystem system , final ClusterAggregatorConfiguration config ) {
252+ return system .actorOf (config .getClusterJoinActor (), "cluster-joiner" );
253+ }
254+
262255 @ Provides
263256 @ Singleton
264257 @ Named ("aggregator-lifecycle" )
0 commit comments