Skip to content

Commit b040aed

Browse files
authored
add the sink lifecycle procedures (#320)
* add the sink lifecycle procedures * remove call to dbms.procedures * added apoc check
1 parent 4442020 commit b040aed

File tree

18 files changed

+442
-127
lines changed

18 files changed

+442
-127
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package streams.events
2+
3+
enum class StreamsPluginStatus { RUNNING, STOPPED, UNKNOWN }

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import kotlin.streams.toList
1414

1515
object Neo4jUtils {
1616
@JvmStatic val LEADER = "LEADER"
17-
fun isWriteableInstance(db: GraphDatabaseAPI): Boolean {
17+
fun isWriteableInstance(db: GraphDatabaseAPI, isCluster: Boolean = false): Boolean {
1818
try {
1919
val isSlave = StreamsUtils.ignoreExceptions(
2020
{
@@ -26,8 +26,12 @@ object Neo4jUtils {
2626
return false
2727
}
2828

29-
val role = db.execute("CALL dbms.cluster.role()").columnAs<String>("role").next()
30-
return role.equals(LEADER, ignoreCase = true)
29+
return if (isCluster) {
30+
val role = db.execute("CALL dbms.cluster.role() YIELD role RETURN role").columnAs<String>("role").next()
31+
return role.equals(LEADER, ignoreCase = true)
32+
} else {
33+
true
34+
}
3135
} catch (e: QueryExecutionException) {
3236
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
3337
return true
@@ -53,6 +57,19 @@ object Neo4jUtils {
5357
}
5458
}
5559

60+
fun hasApoc(db: GraphDatabaseAPI): Boolean {
61+
try {
62+
db.execute("RETURN apoc.version() AS version").columnAs<String>("version").next()
63+
return true
64+
} catch (e: QueryExecutionException) {
65+
if (e.statusCode.equals("Neo.ClientError.Statement.SyntaxError", ignoreCase = true)
66+
&& e.message!!.contains("Unknown function", ignoreCase = true)) {
67+
return false
68+
}
69+
throw e
70+
}
71+
}
72+
5673
fun clusterHasLeader(db: GraphDatabaseAPI): Boolean {
5774
try {
5875
return db.execute("""

common/src/main/kotlin/streams/utils/StreamsUtils.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package streams.utils
22

3+
import kotlinx.coroutines.runBlocking
4+
import kotlinx.coroutines.delay
5+
36
object StreamsUtils {
47

58
const val UNWIND: String = "UNWIND {events} AS event"
@@ -22,4 +25,14 @@ object StreamsUtils {
2225
}
2326
}
2427

28+
fun blockUntilTrueOrTimeout(timeout: Long, delay: Long = 1000, action: () -> Boolean): Boolean = runBlocking {
29+
val start = System.currentTimeMillis()
30+
var success = action()
31+
while (System.currentTimeMillis() - start < timeout && !success) {
32+
delay(delay)
33+
success = action()
34+
}
35+
success
36+
}
37+
2538
}

common/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,9 @@ class Neo4jUtilsTest {
3737
assertFalse { isEnterprise }
3838
}
3939

40+
@Test
41+
fun `should not have APOC`() {
42+
assertFalse { Neo4jUtils.hasApoc(db) }
43+
}
44+
4045
}

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package streams
33
import org.neo4j.kernel.configuration.Config
44
import org.neo4j.kernel.internal.GraphDatabaseAPI
55
import org.neo4j.logging.Log
6+
import streams.events.StreamsPluginStatus
67

78
abstract class StreamsEventSink(private val config: Config,
89
private val queryExecution: StreamsEventSinkQueryExecution,
@@ -23,6 +24,8 @@ abstract class StreamsEventSink(private val config: Config,
2324

2425
open fun printInvalidTopics() {}
2526

27+
abstract fun status(): StreamsPluginStatus
28+
2629
}
2730

2831
object StreamsEventSinkFactory {

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package streams
22

3+
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.GlobalScope
5+
import kotlinx.coroutines.launch
36
import org.neo4j.kernel.availability.AvailabilityGuard
47
import org.neo4j.kernel.availability.AvailabilityListener
58
import org.neo4j.kernel.configuration.Config
@@ -72,6 +75,7 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
7275
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
7376
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
7477
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
78+
StreamsSinkProcedures.registerStreamsEventSink(eventSink)
7579
} catch (e: Exception) {
7680
streamsLog.error("Error initializing the streaming sink:", e)
7781
}
@@ -80,6 +84,28 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
8084
}
8185

8286
private fun initSinkModule(streamsTopicService: StreamsTopicService, streamsSinkConfiguration: StreamsSinkConfiguration) {
87+
if (streamsSinkConfiguration.checkApocTimeout > -1) {
88+
GlobalScope.launch(Dispatchers.IO) {
89+
val success = StreamsUtils.blockUntilTrueOrTimeout(streamsSinkConfiguration.checkApocTimeout, streamsSinkConfiguration.checkApocInterval) {
90+
val hasApoc = Neo4jUtils.hasApoc(db)
91+
if (!hasApoc && streamsLog.isDebugEnabled) {
92+
streamsLog.debug("APOC not loaded yet, next check in ${streamsSinkConfiguration.checkApocInterval} ms")
93+
}
94+
hasApoc
95+
}
96+
if (success) {
97+
initSink(streamsTopicService, streamsSinkConfiguration)
98+
} else {
99+
streamsLog.info("Streams Sink plugin not loaded as APOC are not installed")
100+
}
101+
}
102+
} else {
103+
initSink(streamsTopicService, streamsSinkConfiguration)
104+
}
105+
106+
}
107+
108+
private fun initSink(streamsTopicService: StreamsTopicService, streamsSinkConfiguration: StreamsSinkConfiguration) {
83109
streamsTopicService.clearAll()
84110
streamsTopicService.setAll(streamsSinkConfiguration.topics)
85111
eventSink.start()

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,28 @@
11
package streams
22

33
import org.neo4j.kernel.configuration.Config
4+
import streams.extensions.toPointCase
5+
import streams.serialization.JSONUtils
46
import streams.service.TopicUtils
57
import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
68
import streams.service.Topics
9+
import java.util.Properties
710

811

912
object StreamsSinkConfigurationConstants {
10-
const val STREAMS_CONFIG_PREFIX: String = "streams."
13+
const val CHECK_APOC_TIMEOUT = "check.apoc.timeout"
14+
const val CHECK_APOC_INTERVAL = "check.apoc.interval"
15+
const val STREAMS_CONFIG_PREFIX = "streams."
1116
const val ENABLED = "sink.enabled"
1217
const val PROCEDURES_ENABLED = "procedures.enabled"
1318
}
1419

1520
data class StreamsSinkConfiguration(val enabled: Boolean = false,
1621
val proceduresEnabled: Boolean = true,
17-
val sinkPollingInterval: Long = 10000,
1822
val topics: Topics = Topics(),
1923
val errorConfig: Map<String,Any?> = emptyMap(),
24+
val checkApocTimeout: Long = -1,
25+
val checkApocInterval: Long = 1000,
2026
val sourceIdStrategyConfig: SourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()) {
2127

2228
companion object {
@@ -47,9 +53,12 @@ data class StreamsSinkConfiguration(val enabled: Boolean = false,
4753
return default.copy(enabled = config.getOrDefault(StreamsSinkConfigurationConstants.ENABLED, default.enabled).toString().toBoolean(),
4854
proceduresEnabled = config.getOrDefault(StreamsSinkConfigurationConstants.PROCEDURES_ENABLED, default.proceduresEnabled)
4955
.toString().toBoolean(),
50-
sinkPollingInterval = config.getOrDefault("sink.polling.interval", default.sinkPollingInterval).toString().toLong(),
5156
topics = topics,
5257
errorConfig = errorHandler,
58+
checkApocTimeout = config.getOrDefault(StreamsSinkConfigurationConstants.CHECK_APOC_TIMEOUT, default.checkApocTimeout)
59+
.toString().toLong(),
60+
checkApocInterval = config.getOrDefault(StreamsSinkConfigurationConstants.CHECK_APOC_INTERVAL, default.checkApocInterval)
61+
.toString().toLong(),
5362
sourceIdStrategyConfig = sourceIdStrategyConfig)
5463
}
5564

Lines changed: 50 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,123 +1,79 @@
11
package streams
22

3-
import org.neo4j.kernel.impl.core.EmbeddedProxySPI
4-
import org.neo4j.kernel.impl.core.GraphProperties
53
import org.neo4j.kernel.internal.GraphDatabaseAPI
6-
import streams.serialization.JSONUtils
7-
import streams.service.STREAMS_TOPIC_KEY
84
import streams.service.TopicType
95
import streams.service.Topics
106
import streams.utils.Neo4jUtils
7+
import java.util.Collections
8+
import java.util.concurrent.ConcurrentHashMap
119

12-
class StreamsTopicService(private val db: GraphDatabaseAPI) {
13-
private val properties: GraphProperties = db.dependencyResolver.resolveDependency(EmbeddedProxySPI::class.java).newGraphPropertiesProxy()
10+
class StreamsTopicService(db: GraphDatabaseAPI) {
11+
private val log = Neo4jUtils.getLogService(db).getUserLog(StreamsTopicService::class.java)
1412

15-
fun clearAll() { // TODO move to Neo4jUtils#executeInWriteableInstance
16-
if (!Neo4jUtils.isWriteableInstance(db)) {
17-
return
18-
}
19-
return db.beginTx().use {
20-
val keys = properties.allProperties
21-
.filterKeys { it.startsWith(STREAMS_TOPIC_KEY) }
22-
.keys
23-
keys.forEach {
24-
properties.removeProperty(it)
25-
}
26-
it.success()
27-
}
13+
private val storage = ConcurrentHashMap<TopicType, Any>()
14+
15+
fun clearAll() {
16+
storage.clear()
2817
}
2918

30-
fun set(topicType: TopicType, data: Any) = Neo4jUtils.executeInWriteableInstance(db) {
31-
db.beginTx().use {
32-
if (properties.hasProperty(topicType.key)) {
33-
val topicData = JSONUtils.readValue<Any>(properties.getProperty(topicType.key))
34-
val newData = when (topicData) {
35-
is Map<*, *> -> topicData + (data as Map<String, Any?>)
36-
is Collection<*> -> topicData + (data as Collection<String>)
37-
else -> throw RuntimeException("Unsupported data $data for topic type $topicType")
38-
}
39-
properties.setProperty(topicType.key, JSONUtils.writeValueAsString(newData))
40-
} else {
41-
properties.setProperty(topicType.key, JSONUtils.writeValueAsString(data))
42-
}
43-
it.success()
19+
fun set(topicType: TopicType, data: Any) {
20+
val runtimeException = RuntimeException("Unsupported data $data for topic type $topicType")
21+
var oldData = storage[topicType]
22+
oldData = oldData ?: when (data) {
23+
is Map<*, *> -> emptyMap<String, Any?>()
24+
is Collection<*> -> emptyList<String>()
25+
else -> throw runtimeException
26+
}
27+
val newData = when (oldData) {
28+
is Map<*, *> -> oldData + (data as Map<String, Any?>)
29+
is Collection<*> -> oldData + (data as Collection<String>)
30+
else -> throw runtimeException
4431
}
32+
storage[topicType] = newData
4533
}
4634

47-
fun remove(topicType: TopicType, topic: String) = Neo4jUtils.executeInWriteableInstance(db) {
48-
db.beginTx().use {
49-
if (properties.hasProperty(topicType.key)) {
50-
val topicData = JSONUtils.readValue<Any>(properties.getProperty(topicType.key))
51-
val newData = when (topicData) {
52-
is Map<*, *> -> topicData.filterKeys { it.toString() != topic }
53-
is Collection<*> -> topicData.filter { it.toString() != topic }
54-
else -> throw RuntimeException("Unsupported data $topicData for topic type $topicType")
55-
}
56-
val isEmpty = when (newData) {
57-
is Map<*, *> -> newData.isEmpty()
58-
is Collection<*> -> newData.isEmpty()
59-
else -> throw RuntimeException("Unsupported data $topicData for topic type $topicType")
60-
}
61-
if (isEmpty) {
62-
properties.removeProperty(topicType.key)
63-
} else {
64-
properties.setProperty(topicType.key, JSONUtils.writeValueAsString(newData))
65-
}
66-
}
67-
it.success()
35+
fun remove(topicType: TopicType, topic: String) {
36+
val topicData = storage[topicType] ?: return
37+
38+
val runtimeException = RuntimeException("Unsupported data $topicData for topic type $topicType")
39+
val filteredData = when (topicData) {
40+
is Map<*, *> -> topicData.filterKeys { it.toString() != topic }
41+
is Collection<*> -> topicData.filter { it.toString() != topic }
42+
else -> throw runtimeException
6843
}
44+
45+
storage[topicType] = filteredData
6946
}
7047

71-
fun getTopicType(topic: String) = Neo4jUtils.executeInWriteableInstance(db) {
72-
db.beginTx().use {
73-
TopicType.values().find {
74-
if (!properties.hasProperty(it.key)) {
75-
false
76-
} else {
77-
val data = JSONUtils.readValue<Any>(properties.getProperty(it.key))
78-
when (data) {
79-
is Map<*, *> -> data.containsKey(topic)
80-
is Collection<*> -> data.contains(topic)
81-
else -> false
82-
}
48+
fun getTopicType(topic: String) = TopicType.values()
49+
.find {
50+
val topicData = storage[it]
51+
when (topicData) {
52+
is Map<*, *> -> topicData.containsKey(topic)
53+
is Collection<*> -> topicData.contains(topic)
54+
else -> false
8355
}
8456
}
85-
}
86-
}
8757

88-
fun getTopics() = db.beginTx().use {
89-
TopicType.values()
90-
.filter { properties.hasProperty(it.key) }
91-
.flatMap {
92-
val data = JSONUtils.readValue<Any>(properties.getProperty(it.key))
93-
when (data) {
94-
is Map<*, *> -> data.keys
95-
is Collection<*> -> data.toSet()
96-
else -> emptySet()
97-
}
98-
}.toSet() as Set<String>
99-
}
58+
fun getTopics() = TopicType.values()
59+
.flatMap {
60+
val data = storage[it]
61+
when (data) {
62+
is Map<*, *> -> data.keys
63+
is Collection<*> -> data.toSet()
64+
else -> emptySet<String>()
65+
}
66+
}.toSet() as Set<String>
10067

10168
fun setAll(topics: Topics) {
10269
topics.asMap().forEach { topicType, data ->
10370
set(topicType, data)
10471
}
10572
}
10673

107-
fun getCypherTemplate(topic: String) = db.beginTx().use {
108-
if (properties.hasProperty(TopicType.CYPHER.key)) {
109-
val data = JSONUtils.readValue<Map<String, String>>(properties.getProperty(TopicType.CYPHER.key))
110-
data[topic]
111-
} else {
112-
null
113-
}
114-
}
74+
fun getCypherTemplate(topic: String) = (storage.getOrDefault(TopicType.CYPHER, emptyMap<String, String>()) as Map<String, String>)
75+
.let { it[topic] }
11576

116-
fun getAll() = db.beginTx().use {
117-
TopicType.values()
118-
.filter { properties.hasProperty(it.key) }
119-
.map { it to JSONUtils.readValue<Any>(properties.getProperty(it.key)) }
120-
.toMap()
121-
}
77+
fun getAll(): Map<TopicType, Any> = Collections.unmodifiableMap(storage)
12278

12379
}

0 commit comments

Comments
 (0)