@@ -43,8 +43,6 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
4343 streamsLog.info(" Initialising the Streams Sink module" )
4444 val streamsSinkConfiguration = StreamsSinkConfiguration .from(configuration)
4545 val streamsTopicService = StreamsTopicService (db)
46- streamsTopicService.clearAll()
47- streamsTopicService.setAll(streamsSinkConfiguration.topics)
4846 val strategyMap = TopicUtils .toStrategyMap(streamsSinkConfiguration.topics,
4947 streamsSinkConfiguration.sourceIdStrategyConfig)
5048 val streamsQueryExecution = StreamsEventSinkQueryExecution (streamsTopicService, db,
@@ -62,10 +60,10 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
6260 // start the Sink
6361 if (Neo4jUtils .isCluster(db)) {
6462 log.info(" The Sink module is running in a cluster, checking for the ${Neo4jUtils .LEADER } " )
65- Neo4jUtils .executeInLeader (db, log) { initSinkModule() }
63+ Neo4jUtils .waitForTheLeader (db, log) { initSinkModule(streamsTopicService, streamsSinkConfiguration ) }
6664 } else {
6765 // check if is writeable instance
68- Neo4jUtils .executeInWriteableInstance(db) { initSinkModule() }
66+ Neo4jUtils .executeInWriteableInstance(db) { initSinkModule(streamsTopicService, streamsSinkConfiguration ) }
6967 }
7068
7169 // Register required services for the Procedures
@@ -80,7 +78,9 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
8078 }
8179 }
8280
83- private fun initSinkModule () {
81+ private fun initSinkModule (streamsTopicService : StreamsTopicService , streamsSinkConfiguration : StreamsSinkConfiguration ) {
82+ streamsTopicService.clearAll()
83+ streamsTopicService.setAll(streamsSinkConfiguration.topics)
8484 eventSink.start()
8585 streamsLog.info(" Streams Sink module initialised" )
8686 }
0 commit comments