Skip to content

Commit 6bd66a6

Browse files
conker84mneedham
authored andcommitted
fixes #185: Consumer not working in Cluster environment
1 parent 986be24 commit 6bd66a6

File tree

7 files changed

+124
-24
lines changed

7 files changed

+124
-24
lines changed

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
11
package streams.utils
22

3+
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.GlobalScope
5+
import kotlinx.coroutines.delay
6+
import kotlinx.coroutines.launch
37
import org.neo4j.graphdb.QueryExecutionException
48
import org.neo4j.kernel.internal.GraphDatabaseAPI
59
import org.neo4j.logging.internal.LogService
10+
import org.neo4j.logging.Log
11+
import org.neo4j.logging.NullLog
612
import java.lang.reflect.InvocationTargetException
13+
import kotlin.streams.toList
714

815
object Neo4jUtils {
16+
@JvmStatic val LEADER = "LEADER"
917
fun isWriteableInstance(db: GraphDatabaseAPI): Boolean {
1018
try {
1119
val isSlave = StreamsUtils.ignoreExceptions(
@@ -19,7 +27,7 @@ object Neo4jUtils {
1927
}
2028

2129
val role = db.execute("CALL dbms.cluster.role()").columnAs<String>("role").next()
22-
return role.equals("LEADER", ignoreCase = true)
30+
return role.equals(LEADER, ignoreCase = true)
2331
} catch (e: QueryExecutionException) {
2432
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
2533
return true
@@ -32,4 +40,55 @@ object Neo4jUtils {
3240
return db.dependencyResolver
3341
.resolveDependency(LogService::class.java)
3442
}
43+
44+
fun isEnterpriseEdition(db: GraphDatabaseAPI): Boolean {
45+
try {
46+
return db.execute("""
47+
CALL dbms.components() YIELD edition
48+
RETURN edition = "enterprise" AS isEnterprise
49+
""".trimIndent()).columnAs<Boolean>("isEnterprise").next()
50+
} catch (e: QueryExecutionException) {
51+
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
52+
return false
53+
}
54+
throw e
55+
}
56+
}
57+
58+
fun clusterHasLeader(db: GraphDatabaseAPI): Boolean {
59+
try {
60+
return db.execute("""
61+
CALL dbms.cluster.overview() YIELD role
62+
RETURN role
63+
""".trimIndent())
64+
.columnAs<String>("role")
65+
.stream()
66+
.toList()
67+
.contains(LEADER)
68+
} catch (e: QueryExecutionException) {
69+
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
70+
return false
71+
}
72+
throw e
73+
}
74+
}
75+
76+
fun <T> executeInWriteableInstance(db: GraphDatabaseAPI, action: () -> T) {
77+
if (isWriteableInstance(db)) {
78+
action()
79+
}
80+
}
81+
82+
fun executeInLeader(db: GraphDatabaseAPI, log: Log, timeout: Long = 120000, action: () -> Unit) {
83+
GlobalScope.launch(Dispatchers.IO) {
84+
val start = System.currentTimeMillis()
85+
val delay: Long = 2000
86+
while (!clusterHasLeader(db) && System.currentTimeMillis() - start < timeout) {
87+
log.info("$LEADER not found, new check comes in $delay milliseconds...")
88+
delay(delay)
89+
}
90+
executeInWriteableInstance(db, action)
91+
}
92+
}
93+
3594
}

common/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import org.junit.BeforeClass
55
import org.junit.Test
66
import org.neo4j.kernel.internal.GraphDatabaseAPI
77
import org.neo4j.test.TestGraphDatabaseFactory
8+
import kotlin.test.assertFalse
89
import kotlin.test.assertTrue
910

1011
class Neo4jUtilsTest {
@@ -32,4 +33,10 @@ class Neo4jUtilsTest {
3233
assertTrue { isWriteableInstance }
3334
}
3435

36+
@Test
37+
fun shouldCheckIfIsEnterpriseEdition() {
38+
val isEnterprise = Neo4jUtils.isEnterpriseEdition(db)
39+
assertFalse { isEnterprise }
40+
}
41+
3542
}

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package streams
22

33
import org.neo4j.kernel.configuration.Config
4+
import org.neo4j.kernel.internal.GraphDatabaseAPI
45
import org.neo4j.logging.Log
56

67
abstract class StreamsEventSink(private val config: Config,
78
private val queryExecution: StreamsEventSinkQueryExecution,
89
private val streamsTopicService: StreamsTopicService,
9-
private val log: Log) {
10+
private val log: Log,
11+
private val db: GraphDatabaseAPI) {
1012

1113
abstract fun stop()
1214

@@ -36,13 +38,14 @@ abstract class StreamsEventConsumerFactory {
3638

3739
object StreamsEventSinkFactory {
3840
fun getStreamsEventSink(config: Config, streamsQueryExecution: StreamsEventSinkQueryExecution,
39-
streamsTopicService: StreamsTopicService, log: Log): StreamsEventSink {
41+
streamsTopicService: StreamsTopicService, log: Log, db: GraphDatabaseAPI): StreamsEventSink {
4042
return Class.forName(config.raw.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
4143
.getConstructor(Config::class.java,
4244
StreamsEventSinkQueryExecution::class.java,
4345
StreamsTopicService::class.java,
44-
Log::class.java)
45-
.newInstance(config, streamsQueryExecution, streamsTopicService, log) as StreamsEventSink
46+
Log::class.java,
47+
GraphDatabaseAPI::class.java)
48+
.newInstance(config, streamsQueryExecution, streamsTopicService, log, db) as StreamsEventSink
4649
}
4750
}
4851

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,28 +53,30 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
5353
log.getUserLog(StreamsEventSinkQueryExecution::class.java),
5454
strategyMap)
5555

56-
// Create and start the Sink
56+
// Create the Sink
57+
val sinkLog = log.getUserLog(StreamsEventSinkFactory::class.java)
5758
eventSink = StreamsEventSinkFactory
5859
.getStreamsEventSink(configuration,
5960
streamsQueryExecution,
6061
streamsTopicService,
61-
log.getUserLog(StreamsEventSinkFactory::class.java))
62-
eventSink.start()
63-
if (Neo4jUtils.isWriteableInstance(db)) {
64-
if (streamsLog.isDebugEnabled) {
65-
streamsLog.debug("Subscribed topics with Cypher queries: ${streamsTopicService.getAllCypherTemplates()}")
66-
streamsLog.debug("Subscribed topics with CDC configuration: ${streamsTopicService.getAllCDCTopics()}")
67-
} else {
68-
streamsLog.info("Subscribed topics: ${streamsTopicService.getTopics()}")
69-
}
62+
63+
sinkLog,
64+
db)
65+
// start the Sink
66+
if (Neo4jUtils.isEnterpriseEdition(db)) {
67+
sinkLog.info("The Sink module is running in an enterprise edition, checking for the ${Neo4jUtils.LEADER}")
68+
Neo4jUtils.executeInLeader(db, sinkLog) { initSinkModule() }
69+
} else {
70+
// check if is writeable instance
71+
Neo4jUtils.executeInWriteableInstance(db) { initSinkModule() }
7072
}
7173

7274
// Register required services for the Procedures
7375
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
7476
eventSink = StreamsEventSinkFactory.getStreamsEventSink(configuration,
7577
streamsQueryExecution,
7678
streamsTopicService,
77-
log.getUserLog(StreamsEventSinkFactory::class.java))
79+
log.getUserLog(StreamsEventSinkFactory::class.java), db)
7880
eventSink.start()
7981
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
8082
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
@@ -87,6 +89,11 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
8789
}
8890
}
8991

92+
private fun initSinkModule() {
93+
eventSink.start()
94+
streamsLog.info("Streams Sink module initialised")
95+
}
96+
9097
override fun stop() {
9198
try {
9299
StreamsUtils.ignoreExceptions({ eventSink.stop() }, UninitializedPropertyAccessException::class.java)

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
66
import streams.service.Topics
77

88

9-
private object StreamsSinkConfigurationConstants {
9+
object StreamsSinkConfigurationConstants {
1010
const val STREAMS_CONFIG_PREFIX: String = "streams."
1111
const val ENABLED = "sink.enabled"
1212
const val PROCEDURES_ENABLED = "procedures.enabled"

consumer/src/main/kotlin/streams/StreamsTopicService.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
1818
private val properties: GraphProperties = db.dependencyResolver.resolveDependency(EmbeddedProxySPI::class.java).newGraphPropertiesProxy()
1919
private val log = Neo4jUtils.getLogService(db).getUserLog(StreamsTopicService::class.java)
2020

21-
fun clearAll() {
21+
fun clearAll() { // TODO move to Neo4jUtils#executeInWriteableInstance
2222
if (!Neo4jUtils.isWriteableInstance(db)) {
2323
return
2424
}

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,21 @@ import kotlinx.coroutines.*
44
import org.apache.kafka.clients.consumer.*
55
import org.apache.kafka.common.TopicPartition
66
import org.neo4j.kernel.configuration.Config
7+
import org.neo4j.kernel.internal.GraphDatabaseAPI
78
import org.neo4j.logging.Log
89
import streams.*
9-
import streams.kafka.KafkaTopicConfig.Companion.toTopicPartitionMap
1010
import streams.serialization.JSONUtils
11+
import streams.utils.Neo4jUtils
1112
import streams.utils.StreamsUtils
1213
import java.util.concurrent.ConcurrentHashMap
14+
import java.util.concurrent.TimeUnit
1315

1416

1517
class KafkaEventSink(private val config: Config,
1618
private val queryExecution: StreamsEventSinkQueryExecution,
1719
private val streamsTopicService: StreamsTopicService,
18-
private val log: Log): StreamsEventSink(config, queryExecution, streamsTopicService, log) {
20+
private val log: Log,
21+
private val db: GraphDatabaseAPI): StreamsEventSink(config, queryExecution, streamsTopicService, log, db) {
1922

2023
private lateinit var eventConsumer: StreamsEventConsumer
2124
private lateinit var job: Job
@@ -46,15 +49,28 @@ class KafkaEventSink(private val config: Config,
4649

4750
override fun start() {
4851
val streamsConfig = StreamsSinkConfiguration.from(config)
52+
val topics = streamsTopicService.getTopics()
53+
val isWriteableInstance = Neo4jUtils.isWriteableInstance(db)
4954
if (!streamsConfig.enabled) {
55+
if (topics.isNotEmpty() && isWriteableInstance) {
56+
log.warn("You configured the following topics: $topics, in order to make the Sink work please set the `${StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX}${StreamsSinkConfigurationConstants.ENABLED}` to `true`")
57+
}
5058
return
5159
}
5260
log.info("Starting the Kafka Sink")
5361
this.eventConsumer = getEventConsumerFactory()
5462
.createStreamsEventConsumer(config.raw, log)
55-
.withTopics(streamsTopicService.getTopics())
63+
.withTopics(topics)
5664
this.eventConsumer.start()
5765
this.job = createJob()
66+
if (isWriteableInstance) {
67+
if (log.isDebugEnabled) {
68+
log.debug("Subscribed topics with Cypher queries: ${streamsTopicService.getAllCypherTemplates()}")
69+
log.debug("Subscribed topics with CDC configuration: ${streamsTopicService.getAllCDCTopics()}")
70+
} else {
71+
log.info("Subscribed topics: $topics")
72+
}
73+
}
5874
log.info("Kafka Sink started")
5975
}
6076

@@ -82,11 +98,19 @@ class KafkaEventSink(private val config: Config,
8298
return GlobalScope.launch(Dispatchers.IO) { // TODO improve exception management
8399
try {
84100
while (isActive) {
85-
eventConsumer.read { topic, data ->
101+
if (Neo4jUtils.isWriteableInstance(db)) {
102+
eventConsumer.read { topic, data ->
103+
if (log.isDebugEnabled) {
104+
log.debug("Reading data from topic $topic")
105+
}
106+
queryExecution.writeForTopic(topic, data)
107+
}
108+
} else {
109+
val timeMillis = TimeUnit.MILLISECONDS.toMinutes(5)
86110
if (log.isDebugEnabled) {
87-
log.debug("Reading data from topic $topic")
111+
log.debug("Not in a writeable instance, new check in $timeMillis millis")
88112
}
89-
queryExecution.writeForTopic(topic, data)
113+
delay(timeMillis)
90114
}
91115
}
92116
eventConsumer.stop()

0 commit comments

Comments
 (0)