Skip to content

Commit 3d92c99

Browse files
committed
fixes #191: The latest version of plugin fails to start polling messages
1 parent c91d5c5 commit 3d92c99

File tree

3 files changed

+15
-25
lines changed

3 files changed

+15
-25
lines changed

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,10 @@ object Neo4jUtils {
4141
.resolveDependency(LogService::class.java)
4242
}
4343

44-
fun isEnterpriseEdition(db: GraphDatabaseAPI): Boolean {
44+
fun isCluster(db: GraphDatabaseAPI): Boolean {
4545
try {
46-
return db.execute("""
47-
CALL dbms.components() YIELD edition
48-
RETURN edition = "enterprise" AS isEnterprise
49-
""".trimIndent()).columnAs<Boolean>("isEnterprise").next()
46+
db.execute("CALL dbms.cluster.role()").columnAs<String>("role").next()
47+
return true
5048
} catch (e: QueryExecutionException) {
5149
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
5250
return false

common/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package streams.utils
22

3-
import org.junit.AfterClass
4-
import org.junit.BeforeClass
5-
import org.junit.Test
3+
import org.junit.*
64
import org.neo4j.kernel.internal.GraphDatabaseAPI
75
import org.neo4j.test.TestGraphDatabaseFactory
86
import kotlin.test.assertFalse
@@ -34,8 +32,8 @@ class Neo4jUtilsTest {
3432
}
3533

3634
@Test
37-
fun shouldCheckIfIsEnterpriseEdition() {
38-
val isEnterprise = Neo4jUtils.isEnterpriseEdition(db)
35+
fun shouldCheckIfIsACluster() {
36+
val isEnterprise = Neo4jUtils.isCluster(db)
3937
assertFalse { isEnterprise }
4038
}
4139

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
3030

3131
class StreamsEventLifecycle(private val dependencies: StreamsEventSinkExtensionFactory.Dependencies): LifecycleAdapter() {
3232
private val db = dependencies.graphdatabaseAPI()
33-
private val log = dependencies.log()
33+
private val logService = dependencies.log()
3434
private val configuration = dependencies.config()
35-
private var streamsLog = log.getUserLog(StreamsEventLifecycle::class.java)
35+
private var streamsLog = logService.getUserLog(StreamsEventLifecycle::class.java)
3636

3737
private lateinit var eventSink: StreamsEventSink
3838

@@ -42,42 +42,36 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
4242
override fun unavailable() {}
4343

4444
override fun available() {
45+
streamsLog.info("Initialising the Streams Sink module")
4546
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration)
46-
4747
val streamsTopicService = StreamsTopicService(db)
4848
streamsTopicService.clearAll()
4949
streamsTopicService.setAll(streamsSinkConfiguration.topics)
5050
val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
5151
streamsSinkConfiguration.sourceIdStrategyConfig)
5252
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db,
53-
log.getUserLog(StreamsEventSinkQueryExecution::class.java),
53+
logService.getUserLog(StreamsEventSinkQueryExecution::class.java),
5454
strategyMap)
5555

5656
// Create the Sink
57-
val sinkLog = log.getUserLog(StreamsEventSinkFactory::class.java)
57+
val log = logService.getUserLog(StreamsEventSinkFactory::class.java)
5858
eventSink = StreamsEventSinkFactory
5959
.getStreamsEventSink(configuration,
6060
streamsQueryExecution,
6161
streamsTopicService,
62-
63-
sinkLog,
62+
log,
6463
db)
6564
// start the Sink
66-
if (Neo4jUtils.isEnterpriseEdition(db)) {
67-
sinkLog.info("The Sink module is running in an enterprise edition, checking for the ${Neo4jUtils.LEADER}")
68-
Neo4jUtils.executeInLeader(db, sinkLog) { initSinkModule() }
65+
if (Neo4jUtils.isCluster(db)) {
66+
log.info("The Sink module is running in a cluster, checking for the ${Neo4jUtils.LEADER}")
67+
Neo4jUtils.executeInLeader(db, log) { initSinkModule() }
6968
} else {
7069
// check if is writeable instance
7170
Neo4jUtils.executeInWriteableInstance(db) { initSinkModule() }
7271
}
7372

7473
// Register required services for the Procedures
7574
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
76-
eventSink = StreamsEventSinkFactory.getStreamsEventSink(configuration,
77-
streamsQueryExecution,
78-
streamsTopicService,
79-
log.getUserLog(StreamsEventSinkFactory::class.java), db)
80-
eventSink.start()
8175
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
8276
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
8377
}

0 commit comments

Comments
 (0)