Skip to content

Commit 9297f66

Browse files
authored
add sink lifecyle procedures (#328)
1 parent d68caa2 commit 9297f66

File tree

15 files changed

+462
-5
lines changed

15 files changed

+462
-5
lines changed

common/src/main/kotlin/streams/config/StreamsConfig.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagem
2828
const val SINK_ENABLED = "streams.sink.enabled"
2929
const val SINK_ENABLED_VALUE = false
3030
const val DEFAULT_PATH = "."
31+
const val CHECK_APOC_TIMEOUT = "check.apoc.timeout"
32+
const val CHECK_APOC_INTERVAL = "check.apoc.interval"
3133
private var afterInitListeners = mutableListOf<((MutableMap<String, String>) -> Unit)>()
3234

3335
fun registerListener(after: (MutableMap<String, String>) -> Unit) {
@@ -54,8 +56,8 @@ data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagem
5456
private fun loadConfiguration() {
5557
val properties = neo4jConfAsProperties()
5658

57-
val filteredValues = filterProperties(properties,
58-
{ key -> !SUPPORTED_PREFIXES.find { key.toString().startsWith(it) }.isNullOrBlank() })
59+
val filteredValues = filterProperties(properties)
60+
{ key -> !SUPPORTED_PREFIXES.find { key.toString().startsWith(it) }.isNullOrBlank() }
5961

6062
if (log.isDebugEnabled) {
6163
log.debug("Neo4j Streams Global configuration from neo4j.conf file: $filteredValues")
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package streams.events
2+
3+
enum class StreamsPluginStatus { RUNNING, STOPPED, UNKNOWN }

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ object Neo4jUtils {
3737
}
3838
}
3939

40+
fun hasApoc(db: GraphDatabaseAPI): Boolean = try {
41+
db.execute("RETURN apoc.version() AS version") {
42+
it.columnAs<String>("version").next()
43+
true
44+
}
45+
} catch (e: QueryExecutionException) {
46+
false
47+
}
48+
4049
fun getLogService(db: GraphDatabaseAPI): LogService {
4150
return db.dependencyResolver
4251
.resolveDependency(LogService::class.java)

common/src/main/kotlin/streams/utils/StreamsUtils.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package streams.utils
22

3+
import kotlinx.coroutines.runBlocking
4+
import kotlinx.coroutines.delay
5+
36
object StreamsUtils {
47

58
const val UNWIND: String = "UNWIND \$events AS event"
@@ -22,4 +25,14 @@ object StreamsUtils {
2225
}
2326
}
2427

28+
fun blockUntilTrueOrTimeout(timeout: Long, delay: Long = 1000, action: () -> Boolean): Boolean = runBlocking {
29+
val start = System.currentTimeMillis()
30+
var success = action()
31+
while (System.currentTimeMillis() - start < timeout && !success) {
32+
delay(delay)
33+
success = action()
34+
}
35+
success
36+
}
37+
2538
}

common/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,9 @@ class Neo4jUtilsTest {
2525
assertFalse { isEnterprise }
2626
}
2727

28+
@Test
29+
fun `should not have APOC`() {
30+
assertFalse { Neo4jUtils.hasApoc(db) }
31+
}
32+
2833
}

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package streams
22

3-
import org.neo4j.configuration.Config
43
import org.neo4j.kernel.internal.GraphDatabaseAPI
54
import org.neo4j.logging.Log
65
import streams.config.StreamsConfig
6+
import streams.events.StreamsPluginStatus
77

88
abstract class StreamsEventSink(private val config: StreamsConfig,
99
private val queryExecution: StreamsEventSinkQueryExecution,
@@ -24,6 +24,8 @@ abstract class StreamsEventSink(private val config: StreamsConfig,
2424

2525
open fun printInvalidTopics() {}
2626

27+
abstract fun status(): StreamsPluginStatus
28+
2729
}
2830

2931
object StreamsEventSinkFactory {

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package streams
22

3+
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.GlobalScope
5+
import kotlinx.coroutines.launch
36
import org.neo4j.dbms.api.DatabaseManagementService
47
import org.neo4j.kernel.availability.AvailabilityGuard
58
import org.neo4j.kernel.availability.AvailabilityListener
@@ -80,6 +83,7 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
8083
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
8184
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
8285
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
86+
StreamsSinkProcedures.registerStreamsEventSink(eventSink)
8387
} catch (e: Exception) {
8488
streamsLog.error("Error initializing the streaming sink:", e)
8589
}
@@ -88,6 +92,28 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
8892
}
8993

9094
private fun initSinkModule(streamsTopicService: StreamsTopicService, streamsSinkConfiguration: StreamsSinkConfiguration) {
95+
if (streamsSinkConfiguration.checkApocTimeout > -1) {
96+
GlobalScope.launch(Dispatchers.IO) {
97+
val success = StreamsUtils.blockUntilTrueOrTimeout(streamsSinkConfiguration.checkApocTimeout, streamsSinkConfiguration.checkApocInterval) {
98+
val hasApoc = Neo4jUtils.hasApoc(db)
99+
if (!hasApoc && streamsLog.isDebugEnabled) {
100+
streamsLog.debug("APOC not loaded yet, next check in ${streamsSinkConfiguration.checkApocInterval} ms")
101+
}
102+
hasApoc
103+
}
104+
if (success) {
105+
initSink(streamsTopicService, streamsSinkConfiguration)
106+
} else {
107+
streamsLog.info("Streams Sink plugin not loaded as APOC are not installed")
108+
}
109+
}
110+
} else {
111+
initSink(streamsTopicService, streamsSinkConfiguration)
112+
}
113+
114+
}
115+
116+
private fun initSink(streamsTopicService: StreamsTopicService, streamsSinkConfiguration: StreamsSinkConfiguration) {
91117
streamsTopicService.clearAll()
92118
streamsTopicService.setAll(streamsSinkConfiguration.topics)
93119
eventSink.start()

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package streams
22

33
import streams.config.StreamsConfig
4+
import streams.extensions.toPointCase
5+
import streams.serialization.JSONUtils
46
import streams.service.TopicUtils
57
import streams.service.TopicValidationException
68
import streams.service.Topics
@@ -10,8 +12,27 @@ data class StreamsSinkConfiguration(val enabled: Boolean = StreamsConfig.SINK_EN
1012
val proceduresEnabled: Boolean = StreamsConfig.PROCEDURES_ENABLED_VALUE,
1113
val topics: Topics = Topics(),
1214
val errorConfig: Map<String,Any?> = emptyMap(),
15+
val checkApocTimeout: Long = -1,
16+
val checkApocInterval: Long = 1000,
1317
val sourceIdStrategyConfig: SourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()) {
1418

19+
fun asMap(): Map<String, Any?> {
20+
val configMap = JSONUtils.asMap(this)
21+
.filterKeys { it != "topics" && it != "enabled" && it != "proceduresEnabled" && !it.startsWith("check") }
22+
.mapKeys { it.key.toPointCase() }
23+
.mapKeys {
24+
when (it.key) {
25+
"error.config" -> "streams.sink.errors"
26+
"procedures.enabled" -> "streams.${it.key}"
27+
else -> if (it.key.startsWith("streams.sink")) it.key else "streams.sink.${it.key}"
28+
}
29+
}
30+
val topicMap = this.topics.asMap()
31+
.mapKeys { it.key.key }
32+
val invalidTopics = mapOf("invalid_topics" to this.topics.invalid)
33+
return (configMap + topicMap + invalidTopics)
34+
}
35+
1536
companion object {
1637
fun from(cfg: StreamsConfig, dbName: String, invalidTopics: List<String> = emptyList()): StreamsSinkConfiguration {
1738
val default = StreamsSinkConfiguration()
@@ -39,10 +60,19 @@ data class StreamsSinkConfiguration(val enabled: Boolean = StreamsConfig.SINK_EN
3960
.filterKeys { it.startsWith("streams.sink.error") }
4061
.mapKeys { it.key.substring("streams.sink.".length) }
4162

63+
4264
return default.copy(enabled = cfg.isSinkEnabled(dbName),
4365
proceduresEnabled = cfg.hasProceduresEnabled(dbName),
4466
topics = topics,
4567
errorConfig = errorHandler,
68+
checkApocTimeout = cfg.config.getOrDefault("streams.${StreamsConfig.CHECK_APOC_TIMEOUT}",
69+
default.checkApocTimeout)
70+
.toString()
71+
.toLong(),
72+
checkApocInterval = cfg.config.getOrDefault("streams.${StreamsConfig.CHECK_APOC_INTERVAL}",
73+
default.checkApocInterval)
74+
.toString()
75+
.toLong(),
4676
sourceIdStrategyConfig = sourceIdStrategyConfig)
4777
}
4878

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import streams.StreamsEventSinkQueryExecution
2020
import streams.StreamsSinkConfiguration
2121
import streams.StreamsTopicService
2222
import streams.config.StreamsConfig
23+
import streams.events.StreamsPluginStatus
2324
import streams.service.errors.ErrorService
2425
import streams.service.errors.KafkaErrorService
2526
import streams.utils.KafkaValidationUtils.getInvalidTopicsError
@@ -77,6 +78,12 @@ class KafkaEventSink(private val config: StreamsConfig,
7778
}
7879

7980
override fun start() { // TODO move to the abstract class
81+
if (StreamsPluginStatus.RUNNING == status()) {
82+
if (log.isDebugEnabled) {
83+
log.debug("Kafka Sink is already started.")
84+
}
85+
return
86+
}
8087
val streamsConfig = StreamsSinkConfiguration.from(config, db.databaseName())
8188
val topics = streamsTopicService.getTopics()
8289
val isWriteableInstance = Neo4jUtils.isWriteableInstance(db)
@@ -137,7 +144,7 @@ class KafkaEventSink(private val config: StreamsConfig,
137144
}
138145
TimeUnit.SECONDS.toMillis(1)
139146
} else {
140-
val timeMillis = TimeUnit.MINUTES.toMillis(5)
147+
val timeMillis = TimeUnit.MINUTES.toMillis(3)
141148
if (log.isDebugEnabled) {
142149
log.debug("Not in a writeable instance, new check in $timeMillis millis")
143150
}
@@ -166,6 +173,11 @@ class KafkaEventSink(private val config: StreamsConfig,
166173
}
167174
}, UninitializedPropertyAccessException::class.java)
168175
}
176+
177+
override fun status(): StreamsPluginStatus = when (this::job.isInitialized && this.job.isActive) {
178+
true -> StreamsPluginStatus.RUNNING
179+
else -> StreamsPluginStatus.STOPPED
180+
}
169181

170182
}
171183

consumer/src/main/kotlin/streams/procedures/StreamsSinkProcedures.kt

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,36 @@
11
package streams.procedures
22

3+
import org.apache.commons.lang3.exception.ExceptionUtils
4+
import org.neo4j.graphdb.GraphDatabaseService
5+
import org.neo4j.kernel.internal.GraphDatabaseAPI
36
import org.neo4j.logging.Log
4-
import org.neo4j.procedure.*
7+
import org.neo4j.procedure.Context
8+
import org.neo4j.procedure.Description
9+
import org.neo4j.procedure.Mode
10+
import org.neo4j.procedure.Name
11+
import org.neo4j.procedure.Procedure
512
import streams.StreamsEventConsumer
613
import streams.StreamsEventConsumerFactory
14+
import streams.StreamsEventSink
715
import streams.StreamsEventSinkConfigMapper
816
import streams.StreamsSinkConfiguration
917
import streams.config.StreamsConfig
18+
import streams.events.StreamsPluginStatus
19+
import streams.utils.Neo4jUtils
20+
import java.util.stream.Collectors
1021
import java.util.stream.Stream
1122

1223
class StreamResult(@JvmField val event: Map<String, *>)
24+
class KeyValueResult(@JvmField val name: String, @JvmField val value: Any?)
1325

1426
class StreamsSinkProcedures {
1527

1628
@JvmField @Context
1729
var log: Log? = null
1830

31+
@JvmField @Context
32+
var db: GraphDatabaseService? = null
33+
1934
@Procedure(mode = Mode.READ, name = "streams.consume")
2035
@Description("streams.consume(topic, {timeout: <long value>, from: <string>, groupId: <string>, commit: <boolean>, partitions:[{partition: <number>, offset: <number>}]}) " +
2136
"YIELD event - Allows to consume custom topics")
@@ -35,6 +50,71 @@ class StreamsSinkProcedures {
3550
return data.map { StreamResult(mapOf("data" to it)) }.stream()
3651
}
3752

53+
@Procedure("streams.sink.start")
54+
fun sinkStart(): Stream<KeyValueResult> {
55+
checkEnabled()
56+
return checkLeader {
57+
try {
58+
streamsEventSink?.start()
59+
sinkStatus()
60+
} catch (e: Exception) {
61+
log?.error("Cannot start the Sink because of the following exception", e)
62+
Stream.concat(sinkStatus(),
63+
Stream.of(KeyValueResult("exception", ExceptionUtils.getStackTrace(e))))
64+
}
65+
}
66+
}
67+
68+
@Procedure("streams.sink.stop")
69+
fun sinkStop(): Stream<KeyValueResult> {
70+
checkEnabled()
71+
return checkLeader {
72+
try {
73+
streamsEventSink?.stop()
74+
sinkStatus()
75+
} catch (e: Exception) {
76+
log?.error("Cannot stopped the Sink because of the following exception", e)
77+
Stream.concat(sinkStatus(),
78+
Stream.of(KeyValueResult("exception", ExceptionUtils.getStackTrace(e))))
79+
}
80+
}
81+
}
82+
83+
@Procedure("streams.sink.restart")
84+
fun sinkRestart(): Stream<KeyValueResult> {
85+
val stopped = sinkStop().collect(Collectors.toList())
86+
val hasError = stopped.any { it.name == "exception" }
87+
if (hasError) {
88+
return stopped.stream()
89+
}
90+
return sinkStart()
91+
}
92+
93+
@Procedure("streams.sink.config")
94+
fun sinkConfig(): Stream<KeyValueResult> {
95+
checkEnabled()
96+
return checkLeader {
97+
streamsSinkConfiguration.asMap()
98+
.entries.stream()
99+
.map { KeyValueResult(it.key, it.value) }
100+
}
101+
}
102+
103+
@Procedure("streams.sink.status")
104+
fun sinkStatus(): Stream<KeyValueResult> {
105+
checkEnabled()
106+
return checkLeader {
107+
val value = (streamsEventSink?.status() ?: StreamsPluginStatus.UNKNOWN).toString()
108+
Stream.of(KeyValueResult("status", value))
109+
}
110+
}
111+
112+
private fun checkLeader(lambda: () -> Stream<KeyValueResult>): Stream<KeyValueResult> = if (Neo4jUtils.isWriteableInstance(db as GraphDatabaseAPI)) {
113+
lambda()
114+
} else {
115+
Stream.of(KeyValueResult("error", "You can use this procedure only in the LEADER or in a single instance configuration."))
116+
}
117+
38118
private fun readData(topic: String, procedureConfig: Map<String, Any>, consumerConfig: Map<String, String>): List<Any> {
39119
val cfg = procedureConfig.mapValues { if (it.key != "partitions") it.value else mapOf(topic to it.value) }
40120
val timeout = cfg.getOrDefault("timeout", 1000).toString().toLong()
@@ -80,6 +160,7 @@ class StreamsSinkProcedures {
80160
private lateinit var streamsSinkConfiguration: StreamsSinkConfiguration
81161
private lateinit var streamsEventConsumerFactory: StreamsEventConsumerFactory
82162
private lateinit var streamsEventSinkConfigMapper: StreamsEventSinkConfigMapper
163+
private var streamsEventSink: StreamsEventSink? = null
83164

84165
fun registerStreamsEventSinkConfigMapper(streamsEventSinkConfigMapper: StreamsEventSinkConfigMapper) {
85166
this.streamsEventSinkConfigMapper = streamsEventSinkConfigMapper
@@ -92,5 +173,9 @@ class StreamsSinkProcedures {
92173
fun registerStreamsEventConsumerFactory(streamsEventConsumerFactory: StreamsEventConsumerFactory) {
93174
this.streamsEventConsumerFactory = streamsEventConsumerFactory
94175
}
176+
177+
fun registerStreamsEventSink(streamsEventSink: StreamsEventSink) {
178+
this.streamsEventSink = streamsEventSink
179+
}
95180
}
96181
}

0 commit comments

Comments
 (0)