Skip to content

Commit e3f5338

Browse files
conker84jexp
authored andcommitted
fixes #96: New procedure to receive data from the stream
1 parent dd4bd77 commit e3f5338

File tree

19 files changed

+486
-149
lines changed

19 files changed

+486
-149
lines changed

common/src/main/kotlin/streams/utils/StreamsUtils.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ object StreamsUtils {
1212
return try {
1313
action()
1414
} catch (e: Throwable) {
15-
when (e::class.java) {
15+
if (toIgnore.isEmpty()) {
16+
return null
17+
}
18+
return when (e::class.java) {
1619
in toIgnore -> null
1720
else -> throw e
1821
}
Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,51 @@
11
package streams
22

3-
import org.neo4j.kernel.AvailabilityGuard
43
import org.neo4j.kernel.configuration.Config
5-
import org.neo4j.kernel.internal.GraphDatabaseAPI
4+
import org.neo4j.logging.Log
65

76
abstract class StreamsEventSink(private val config: Config,
8-
private val db: GraphDatabaseAPI): AvailabilityGuard.AvailabilityListener {
7+
private val queryExecution: StreamsEventSinkQueryExecution,
8+
private val streamsTopicService: StreamsTopicService,
9+
private val log: Log) {
910

10-
abstract var streamsTopicService: StreamsTopicService?
11+
abstract fun stop()
12+
13+
abstract fun start()
14+
15+
abstract fun getEventConsumerFactory(): StreamsEventConsumerFactory
16+
17+
abstract fun getEventSinkConfigMapper(): StreamsEventSinkConfigMapper
18+
19+
}
20+
21+
abstract class StreamsEventConsumer<T>(private val consumer: T, config: StreamsSinkConfiguration, private val log: Log) {
1122

1223
abstract fun stop()
1324

25+
abstract fun withTopics(topics: Set<String>): StreamsEventConsumer<T>
26+
1427
abstract fun start()
1528

16-
override fun unavailable() {
17-
stop()
18-
}
29+
abstract fun read(): Map<String, List<Map<String, Any?>>>?
1930

20-
override fun available() {
21-
start()
22-
}
31+
}
2332

33+
abstract class StreamsEventConsumerFactory {
34+
abstract fun createStreamsEventConsumer(config: Map<String, String>, log: Log): StreamsEventConsumer<*>
2435
}
2536

2637
object StreamsEventSinkFactory {
27-
fun getStreamsEventSink(config: Config, db: GraphDatabaseAPI): StreamsEventSink {
38+
fun getStreamsEventSink(config: Config, streamsQueryExecution: StreamsEventSinkQueryExecution,
39+
streamsTopicService: StreamsTopicService, log: Log): StreamsEventSink {
2840
return Class.forName(config.raw.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
2941
.getConstructor(Config::class.java,
30-
GraphDatabaseAPI::class.java)
31-
.newInstance(config, db) as StreamsEventSink
42+
StreamsEventSinkQueryExecution::class.java,
43+
StreamsTopicService::class.java,
44+
Log::class.java)
45+
.newInstance(config, streamsQueryExecution, streamsTopicService, log) as StreamsEventSink
3246
}
47+
}
48+
49+
abstract class StreamsEventSinkConfigMapper(private val baseConfiguration: Map<String, String>, private val mapping: Map<String, String>) {
50+
abstract fun convert(config: Map<String, String>): Map<String, String>
3351
}

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import org.neo4j.kernel.impl.spi.KernelContext
88
import org.neo4j.kernel.internal.GraphDatabaseAPI
99
import org.neo4j.kernel.lifecycle.Lifecycle
1010
import org.neo4j.kernel.lifecycle.LifecycleAdapter
11+
import streams.procedures.StreamsSinkProcedures
1112
import streams.utils.StreamsUtils
1213

1314
class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSinkExtensionFactory.Dependencies>("Streams.Consumer") {
@@ -33,8 +34,24 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
3334

3435
override fun start() {
3536
try {
36-
eventSink = StreamsEventSinkFactory.getStreamsEventSink(configuration, db)
37-
dependencies.availabilityGuard().addListener(eventSink)
37+
dependencies.availabilityGuard().addListener(object: AvailabilityGuard.AvailabilityListener {
38+
override fun unavailable() {}
39+
40+
override fun available() {
41+
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration)
42+
val streamsTopicService = StreamsTopicService(db, streamsSinkConfiguration.topics)
43+
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db, log.getUserLog(StreamsEventSinkQueryExecution::class.java))
44+
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
45+
eventSink = StreamsEventSinkFactory.getStreamsEventSink(configuration,
46+
streamsQueryExecution,
47+
streamsTopicService,
48+
log.getUserLog(StreamsEventSinkFactory::class.java))
49+
eventSink.start()
50+
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
51+
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
52+
}
53+
54+
})
3855
} catch (e: Exception) {
3956
e.printStackTrace()
4057
streamsLog.error("Error initializing the streaming sink", e)

consumer/src/main/kotlin/streams/StreamsEventSinkQueryExecution.kt

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,34 @@
11
package streams
22

3+
import org.neo4j.graphdb.GraphDatabaseService
34
import org.neo4j.kernel.internal.GraphDatabaseAPI
45
import org.neo4j.logging.Log
6+
import streams.utils.Neo4jUtils
7+
import streams.utils.StreamsUtils
58

69
class StreamsEventSinkQueryExecution(private val streamsTopicService: StreamsTopicService, private val db: GraphDatabaseAPI, val log: Log) {
710

8-
private val UNWIND: String = "UNWIND {events} AS event"
9-
1011
fun execute(topic: String, params: Collection<Any>) {
1112
val cypherQuery = streamsTopicService.get(topic)
1213
if (cypherQuery == null) {
1314
return
1415
}
16+
val query = "${StreamsUtils.UNWIND} $cypherQuery"
1517
if(log.isDebugEnabled){
16-
log.debug("Processing ${params.size} events from Kafka")
18+
log.debug("Processing ${params.size} events with query: $query")
19+
}
20+
if (Neo4jUtils.isWriteableInstance(db)) {
21+
try {
22+
db.execute(query, mapOf("events" to params)).close()
23+
} catch (e: Exception) {
24+
log.error("Error while executing the query", e)
25+
}
26+
} else {
27+
if(log.isDebugEnabled){
28+
log.debug("Not writeable instance")
29+
}
1730
}
18-
db.execute("$UNWIND $cypherQuery", mapOf("events" to params)).close()
31+
1932
}
2033

2134
fun execute(map: Map<String, Collection<Any>>) {

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,30 @@ private object StreamsSinkConfigurationConstants {
88
const val STREAMS_CONFIG_PREFIX: String = "streams."
99
const val STREAMS_SINK_TOPIC_CYPHER_PREFIX: String = "sink.topic.cypher."
1010
const val ENABLED = "sink.enabled"
11+
const val PROCEDURES_ENABLED = "procedures.enabled"
1112
}
1213

1314
data class StreamsSinkConfiguration(val enabled: Boolean = true,
15+
val proceduresEnabled: Boolean = true,
1416
val sinkPollingInterval: Long = Long.MAX_VALUE,
1517
val topics: Map<String, String> = emptyMap()) {
1618

1719
companion object {
1820
fun from(cfg: Config): StreamsSinkConfiguration {
21+
return from(cfg.raw)
22+
}
23+
24+
fun from(cfg: Map<String, String>): StreamsSinkConfiguration {
1925
val default = StreamsSinkConfiguration()
20-
val config = cfg.raw
26+
val config = cfg
2127
.filterKeys { it.startsWith(StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX) }
2228
.mapKeys { it.key.substring(StreamsSinkConfigurationConstants.STREAMS_CONFIG_PREFIX.length) }
2329
val topics = config
2430
.filterKeys { it.startsWith(StreamsSinkConfigurationConstants.STREAMS_SINK_TOPIC_CYPHER_PREFIX) }
2531
.mapKeys { it.key.replace(StreamsSinkConfigurationConstants.STREAMS_SINK_TOPIC_CYPHER_PREFIX, StringUtils.EMPTY) }
2632
return default.copy(enabled = config.getOrDefault(StreamsSinkConfigurationConstants.ENABLED, default.enabled).toString().toBoolean(),
33+
proceduresEnabled = config.getOrDefault(StreamsSinkConfigurationConstants.PROCEDURES_ENABLED, default.proceduresEnabled)
34+
.toString().toBoolean(),
2735
sinkPollingInterval = config.getOrDefault("sink.polling.interval", default.sinkPollingInterval).toString().toLong(),
2836
topics = topics)
2937
}

consumer/src/main/kotlin/streams/StreamsTopicService.kt

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,45 @@ package streams
33
import org.apache.commons.lang3.StringUtils
44
import org.neo4j.kernel.impl.core.EmbeddedProxySPI
55
import org.neo4j.kernel.impl.core.GraphProperties
6-
import org.neo4j.kernel.impl.logging.LogService
76
import org.neo4j.kernel.internal.GraphDatabaseAPI
87
import streams.utils.Neo4jUtils
98

109

1110
private const val STREAMS_TOPIC_KEY: String = "streams.sink.topic."
1211

13-
class StreamsTopicService(private val db: GraphDatabaseAPI, private val streamsSinkConfiguration: StreamsSinkConfiguration) {
12+
class StreamsTopicService(private val db: GraphDatabaseAPI, private val topicMap: Map<String, String>) {
1413
private val properties: GraphProperties = db.dependencyResolver.resolveDependency(EmbeddedProxySPI::class.java).newGraphPropertiesProxy()
1514
private val log = Neo4jUtils.getLogService(db).getUserLog(StreamsTopicService::class.java)
1615

1716
init {
18-
setAll(streamsSinkConfiguration.topics)
17+
clear()
18+
setAll(topicMap)
19+
}
20+
21+
fun clear() {
22+
return db.beginTx().use {
23+
val keys = properties.allProperties
24+
.filterKeys { it.startsWith(STREAMS_TOPIC_KEY) }
25+
.keys
26+
keys.forEach {
27+
properties.removeProperty(it)
28+
}
29+
it.success()
30+
}
31+
}
32+
33+
fun remove(topic: String) {
34+
val key = "$STREAMS_TOPIC_KEY$topic"
35+
return db.beginTx().use {
36+
if (!properties.hasProperty(key)) {
37+
if (log.isDebugEnabled) {
38+
log.debug("No query registered for topic $topic")
39+
}
40+
return
41+
}
42+
properties.removeProperty(key)
43+
it.success()
44+
}
1945
}
2046

2147
fun set(topic: String, query: String) {

0 commit comments

Comments
 (0)