@@ -45,8 +45,6 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
4545 streamsLog.info(" Initialising the Streams Sink module" )
4646 val streamsSinkConfiguration = StreamsSinkConfiguration .from(configuration)
4747 val streamsTopicService = StreamsTopicService (db)
48- streamsTopicService.clearAll()
49- streamsTopicService.setAll(streamsSinkConfiguration.topics)
5048 val strategyMap = TopicUtils .toStrategyMap(streamsSinkConfiguration.topics,
5149 streamsSinkConfiguration.sourceIdStrategyConfig)
5250 val streamsQueryExecution = StreamsEventSinkQueryExecution (streamsTopicService, db,
@@ -64,10 +62,10 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
6462 // start the Sink
6563 if (Neo4jUtils .isCluster(db)) {
6664 log.info(" The Sink module is running in a cluster, checking for the ${Neo4jUtils .LEADER } " )
67- Neo4jUtils .executeInLeader (db, log) { initSinkModule() }
65+ Neo4jUtils .waitForTheLeader (db, log) { initSinkModule(streamsTopicService, streamsSinkConfiguration ) }
6866 } else {
6967 // check if is writeable instance
70- Neo4jUtils .executeInWriteableInstance(db) { initSinkModule() }
68+ Neo4jUtils .executeInWriteableInstance(db) { initSinkModule(streamsTopicService, streamsSinkConfiguration ) }
7169 }
7270
7371 // Register required services for the Procedures
@@ -82,7 +80,9 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
8280 }
8381 }
8482
85- private fun initSinkModule () {
83+ private fun initSinkModule (streamsTopicService : StreamsTopicService , streamsSinkConfiguration : StreamsSinkConfiguration ) {
84+ streamsTopicService.clearAll()
85+ streamsTopicService.setAll(streamsSinkConfiguration.topics)
8686 eventSink.start()
8787 streamsLog.info(" Streams Sink module initialised" )
8888 }
0 commit comments