Skip to content

Commit 8e67bce

Browse files
authored
fixes #407: Move the internal procedures calls to direct API calls (#415)
1 parent 437e5f2 commit 8e67bce

File tree

9 files changed

+105
-11268
lines changed

9 files changed

+105
-11268
lines changed

common/src/main/kotlin/streams/config/StreamsConfig.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import streams.extensions.databaseManagementService
1515
import streams.extensions.getDefaultDbName
1616
import streams.extensions.isAvailable
1717
import streams.utils.Neo4jUtils
18+
import streams.utils.ProcedureUtils
1819
import streams.utils.StreamsUtils
1920
import java.io.File
2021
import java.util.concurrent.ConcurrentHashMap
@@ -117,7 +118,7 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
117118
log.warn("${getInstanceWaitTimeout()} ms have passed and the instance is not online, the Streams plugin will not started")
118119
return@runBlocking
119120
}
120-
if (Neo4jUtils.isCluster(dbms, log)) {
121+
if (ProcedureUtils.isCluster(dbms)) {
121122
log.info("We're in cluster instance waiting for the ${StreamsUtils.LEADER}s to be elected in each database")
122123
// in case is a cluster we wait for the correct cluster formation => LEADER elected
123124
Neo4jUtils.waitForTheLeaders(dbms, log) { configStart() }

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 2 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import org.neo4j.dbms.api.DatabaseManagementService
88
import org.neo4j.graphdb.QueryExecutionException
99
import org.neo4j.kernel.internal.GraphDatabaseAPI
1010
import org.neo4j.logging.Log
11-
import org.neo4j.logging.internal.LogService
1211
import streams.extensions.execute
1312
import java.lang.reflect.InvocationTargetException
1413
import kotlin.streams.toList
@@ -26,7 +25,7 @@ object Neo4jUtils {
2625
return false
2726
}
2827

29-
return availableAction() && getMemberRole(db).equals(StreamsUtils.LEADER, ignoreCase = true)
28+
return availableAction() && ProcedureUtils.clusterMemberRole(db).equals(StreamsUtils.LEADER, ignoreCase = true)
3029
} catch (e: QueryExecutionException) {
3130
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
3231
return availableAction()
@@ -44,36 +43,7 @@ object Neo4jUtils {
4443
false
4544
}
4645

47-
fun getLogService(db: GraphDatabaseAPI): LogService = db.dependencyResolver
48-
.resolveDependency(LogService::class.java)
49-
50-
fun isCluster(db: GraphDatabaseAPI, log: Log? = null): Boolean {
51-
try {
52-
return db.execute("CALL dbms.cluster.overview()") {
53-
if (it.hasNext()) {
54-
if (log?.isDebugEnabled == true) {
55-
log?.debug(it.resultAsString())
56-
}
57-
}
58-
true
59-
}
60-
} catch (e: QueryExecutionException) {
61-
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
62-
return false
63-
}
64-
throw e
65-
}
66-
}
67-
68-
fun isCluster(dbms: DatabaseManagementService, log: Log? = null): Boolean = dbms.listDatabases()
69-
.firstOrNull { it != StreamsUtils.SYSTEM_DATABASE_NAME }
70-
?.let { dbms.database(it) as GraphDatabaseAPI }
71-
?.let { isCluster(it, log) } ?: false
72-
73-
private fun getMemberRole(db: GraphDatabaseAPI) = db.execute("CALL dbms.cluster.role(\$database)",
74-
mapOf("database" to db.databaseName())) { it.columnAs<String>("role").next() }
75-
76-
fun clusterHasLeader(db: GraphDatabaseAPI): Boolean = try {
46+
private fun clusterHasLeader(db: GraphDatabaseAPI): Boolean = try {
7747
db.execute("""
7848
|CALL dbms.cluster.overview() YIELD databases
7949
|RETURN databases[${'$'}database] AS role
@@ -98,34 +68,6 @@ object Neo4jUtils {
9868
null
9969
}
10070

101-
fun executeInLeader(db: GraphDatabaseAPI,
102-
log: Log,
103-
timeout: Long = 120000,
104-
availableAction: () -> Boolean,
105-
action: () -> Unit) {
106-
GlobalScope.launch(Dispatchers.IO) {
107-
val start = System.currentTimeMillis()
108-
val delay: Long = 2000
109-
while (!clusterHasLeader(db) && System.currentTimeMillis() - start < timeout) {
110-
log.info("${StreamsUtils.LEADER} not found, new check comes in $delay milliseconds...")
111-
delay(delay)
112-
}
113-
executeInWriteableInstance(db, availableAction, action)
114-
}
115-
}
116-
117-
fun waitForTheLeader(db: GraphDatabaseAPI, log: Log, timeout: Long = 120000, action: () -> Unit) {
118-
GlobalScope.launch(Dispatchers.IO) {
119-
val start = System.currentTimeMillis()
120-
val delay: Long = 2000
121-
while (!clusterHasLeader(db) && System.currentTimeMillis() - start < timeout) {
122-
log.info("${StreamsUtils.LEADER} not found, new check comes in $delay milliseconds...")
123-
delay(delay)
124-
}
125-
action()
126-
}
127-
}
128-
12971
fun isClusterCorrectlyFormed(dbms: DatabaseManagementService) = dbms.listDatabases()
13072
.filterNot { it == StreamsUtils.SYSTEM_DATABASE_NAME }
13173
.map { dbms.database(it) as GraphDatabaseAPI }
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package streams.utils
2+
3+
import org.neo4j.dbms.api.DatabaseManagementService
4+
import org.neo4j.exceptions.UnsatisfiedDependencyException
5+
import org.neo4j.kernel.impl.factory.DbmsInfo
6+
import org.neo4j.kernel.internal.GraphDatabaseAPI
7+
import org.neo4j.logging.internal.LogService
8+
import streams.extensions.execute
9+
import java.lang.invoke.MethodHandles
10+
import java.lang.invoke.MethodType
11+
12+
13+
object ProcedureUtils {
14+
@JvmStatic
15+
private val coreMetadata: Class<*>? = try {
16+
Class.forName("com.neo4j.causalclustering.core.consensus.CoreMetaData")
17+
} catch (e: ClassNotFoundException) {
18+
null
19+
}
20+
21+
@JvmStatic
22+
private val isLeaderMethodHandle = coreMetadata?.let {
23+
val lookup = MethodHandles.lookup()
24+
lookup.findVirtual(it, "isLeader", MethodType.methodType(Boolean::class.java))
25+
.asType(MethodType.methodType(Boolean::class.java, Any::class.java))
26+
}
27+
28+
fun clusterMemberRole(db: GraphDatabaseAPI): String {
29+
val fallback: (Exception?) -> String = { e: Exception? ->
30+
val userLog = db.dependencyResolver
31+
.resolveDependency(LogService::class.java)
32+
.getUserLog(ProcedureUtils::class.java)
33+
e?.let { userLog.warn("Cannot call the APIs, trying with the cypher query", e) }
34+
?: userLog.warn("Cannot call the APIs, trying with the cypher query")
35+
db.execute("CALL dbms.cluster.role(\$database)",
36+
mapOf("database" to db.databaseName())
37+
) { it.columnAs<String>("role").next() }
38+
}
39+
val execute = {
40+
coreMetadata?.let {
41+
try {
42+
val raftMachine: Any = db.dependencyResolver.resolveDependency(coreMetadata)
43+
val isLeader = isLeaderMethodHandle!!.invokeExact(raftMachine) as Boolean
44+
if (isLeader) "LEADER" else "FOLLOWER"
45+
} catch (e: UnsatisfiedDependencyException) {
46+
"LEADER"
47+
}
48+
} ?: "LEADER"
49+
}
50+
return executeOrFallback(execute, fallback)
51+
}
52+
53+
fun isCluster(db: GraphDatabaseAPI): Boolean = db.dbmsInfo() == DbmsInfo.CORE || db.dbmsInfo() == DbmsInfo.READ_REPLICA
54+
55+
fun isCluster(dbms: DatabaseManagementService): Boolean = dbms.listDatabases()
56+
.firstOrNull { it != StreamsUtils.SYSTEM_DATABASE_NAME }
57+
?.let { dbms.database(it) as GraphDatabaseAPI }
58+
?.let { isCluster(it) } ?: false
59+
60+
private fun <T> executeOrFallback(execute: () -> T, fallback: (Exception?) -> T): T = try {
61+
execute()
62+
} catch (e: Exception) {
63+
fallback(e)
64+
}
65+
}

common/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,6 @@ class Neo4jUtilsTest {
1919
assertTrue { isWriteableInstance }
2020
}
2121

22-
@Test
23-
fun shouldCheckIfIsACluster() {
24-
val isEnterprise = Neo4jUtils.isCluster(db)
25-
assertFalse { isEnterprise }
26-
}
27-
2822
@Test
2923
fun `should not have APOC`() {
3024
assertFalse { Neo4jUtils.hasApoc(db) }
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package streams.utils
2+
3+
import org.junit.ClassRule
4+
import org.junit.Test
5+
import org.neo4j.test.rule.ImpermanentDbmsRule
6+
import kotlin.test.assertFalse
7+
8+
class ProcedureUtilsTest {
9+
10+
companion object {
11+
@ClassRule @JvmField
12+
val db = ImpermanentDbmsRule()
13+
}
14+
15+
@Test
16+
fun shouldCheckIfIsACluster() {
17+
val isCluster = ProcedureUtils.isCluster(db)
18+
assertFalse { isCluster }
19+
}
20+
21+
}

consumer/src/main/kotlin/streams/StreamsSinkConfigurationListener.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import streams.procedures.StreamsSinkProcedures
1919
import streams.utils.ConsumerUtils
2020
import streams.utils.KafkaValidationUtils
2121
import streams.utils.Neo4jUtils
22+
import streams.utils.ProcedureUtils
2223
import streams.utils.StreamsUtils
2324

2425
class StreamsSinkConfigurationListener(private val db: GraphDatabaseAPI,
@@ -119,7 +120,7 @@ class StreamsSinkConfigurationListener(private val db: GraphDatabaseAPI,
119120
try {
120121
if (streamsSinkConfiguration.enabled) {
121122
log.info("[Sink] The Streams Sink module is starting")
122-
if (Neo4jUtils.isCluster(db, log)) {
123+
if (ProcedureUtils.isCluster(db)) {
123124
initSinkModule(streamsSinkConfiguration)
124125
} else {
125126
runInASingleInstance(streamsSinkConfiguration)

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import streams.extensions.isDefaultDb
2727
import streams.utils.ConsumerUtils
2828
import streams.utils.KafkaValidationUtils.getInvalidTopicsError
2929
import streams.utils.Neo4jUtils
30+
import streams.utils.ProcedureUtils
3031
import streams.utils.StreamsUtils
3132

3233

@@ -72,7 +73,7 @@ class KafkaEventSink(private val config: Map<String, String>,
7273
}
7374

7475
override fun start() = runBlocking { // TODO move to the abstract class
75-
if (streamsConfig.clusterOnly && !Neo4jUtils.isCluster(db)) {
76+
if (streamsConfig.clusterOnly && !ProcedureUtils.isCluster(db)) {
7677
if (log.isDebugEnabled) {
7778
log.info("""
7879
|Cannot init the Kafka Sink module as is forced to work only in a cluster env,

0 commit comments

Comments
 (0)