Commit 98d06d1

Merge pull request #139 from conker84/3.5

Rebase from v3.4 pre-release

2 parents: 2715c92 + f77e6d8

31 files changed: +766 -135 lines

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 1 addition & 1 deletion

@@ -26,7 +26,7 @@ abstract class StreamsEventConsumer<T>(private val consumer: T, config: StreamsS
 
     abstract fun start()
 
-    abstract fun read(): Map<String, List<Map<String, Any?>>>?
+    abstract fun read(): Map<String, List<Any>>?
 
 }
 
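
The widened return type reflects that a Kafka payload may now deserialize to a JSON object, an array, or a scalar. A minimal sketch of the idea using a plain Jackson mapper (the project's `JSONUtils` helper is assumed to wrap something similar; that wrapper is not shown in this commit):

```kotlin
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper

// Deserializing into Any yields a Map for JSON objects, a List for arrays,
// and boxed primitives for scalars, hence Map<String, List<Any>>.
fun main() {
    val mapper = jacksonObjectMapper()
    listOf("""{"id": 1}""", "[1, 2, 3]", "42", "true", "\"test\"").forEach { json ->
        val value: Any = mapper.readValue(json, Any::class.java)
        println("${value::class.simpleName}: $value")
    }
}
```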

consumer/src/main/kotlin/streams/StreamsEventSinkQueryExecution.kt

Lines changed: 7 additions & 4 deletions

@@ -1,6 +1,5 @@
 package streams
 
-import org.neo4j.graphdb.GraphDatabaseService
 import org.neo4j.kernel.internal.GraphDatabaseAPI
 import org.neo4j.logging.Log
 import streams.utils.Neo4jUtils
@@ -14,12 +13,16 @@ class StreamsEventSinkQueryExecution(private val streamsTopicService: StreamsTop
             return
         }
         val query = "${StreamsUtils.UNWIND} $cypherQuery"
-        if(log.isDebugEnabled){
-            log.debug("Processing ${params.size} events with query: $query")
+        if (log.isDebugEnabled) {
+            log.debug("Processing ${params.size} events, for topic $topic with query: $query")
         }
         if (Neo4jUtils.isWriteableInstance(db)) {
             try {
-                db.execute(query, mapOf("events" to params)).close()
+                val result = db.execute(query, mapOf("events" to params))
+                if (log.isDebugEnabled) {
+                    log.debug("Query statistics:\n${result.queryStatistics}")
+                }
+                result.close()
             } catch (e: Exception) {
                 log.error("Error while executing the query", e)
             }
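
For context, the sink prepends an UNWIND clause to the user's topic query so one Cypher execution handles a whole batch of events. A minimal standalone sketch of that pattern, assuming `StreamsUtils.UNWIND` expands to `UNWIND {events} AS event` (the constant's actual value is not part of this diff):

```kotlin
// Hypothetical stand-in for StreamsUtils.UNWIND.
const val UNWIND = "UNWIND {events} AS event"

// Builds the batched query plus its parameter map, mirroring the
// db.execute(query, mapOf("events" to params)) call above.
fun buildSinkQuery(cypherQuery: String, events: List<Map<String, Any?>>): Pair<String, Map<String, Any?>> =
    "$UNWIND $cypherQuery" to mapOf("events" to events)

fun main() {
    val (query, params) = buildSinkQuery(
        "MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties",
        listOf(mapOf("id" to 1, "properties" to mapOf("name" to "test")))
    )
    println(query)  // UNWIND {events} AS event MERGE (n:Label {id: event.id}) ...
    println(params) // {events=[{id=1, properties={name=test}}]}
}
```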

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 1 addition & 1 deletion

@@ -13,7 +13,7 @@ private object StreamsSinkConfigurationConstants {
 
 data class StreamsSinkConfiguration(val enabled: Boolean = true,
                                     val proceduresEnabled: Boolean = true,
-                                    val sinkPollingInterval: Long = Long.MAX_VALUE,
+                                    val sinkPollingInterval: Long = 10000,
                                     val topics: Map<String, String> = emptyMap()) {
 
     companion object {

consumer/src/main/kotlin/streams/StreamsTopicService.kt

Lines changed: 6 additions & 0 deletions

@@ -19,6 +19,9 @@ class StreamsTopicService(private val db: GraphDatabaseAPI, private val topicMap
     }
 
     fun clear() {
+        if (!Neo4jUtils.isWriteableInstance(db)) {
+            return
+        }
         return db.beginTx().use {
             val keys = properties.allProperties
                     .filterKeys { it.startsWith(STREAMS_TOPIC_KEY) }
@@ -31,6 +34,9 @@ class StreamsTopicService(private val db: GraphDatabaseAPI, private val topicMap
     }
 
     fun remove(topic: String) {
+        if (!Neo4jUtils.isWriteableInstance(db)) {
+            return
+        }
         val key = "$STREAMS_TOPIC_KEY$topic"
         return db.beginTx().use {
             if (!properties.hasProperty(key)) {
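
Both mutators now bail out early on cluster members that cannot accept writes, so topic bookkeeping only happens on a writeable instance. A tiny standalone sketch of the guard shape (with `Neo4jUtils.isWriteableInstance` stubbed out, since its implementation is not part of this diff):

```kotlin
// Stub for streams.utils.Neo4jUtils.isWriteableInstance; the real check asks
// Neo4j whether this instance may accept writes (e.g. the HA master).
fun isWriteableInstance(db: Any): Boolean = true

fun remove(db: Any, topic: String) {
    if (!isWriteableInstance(db)) {
        return // read-only members skip the write instead of failing
    }
    println("removing sink configuration for topic $topic") // transactional work goes here
}

fun main() = remove(Any(), "my-topic")
```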

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 9 additions & 16 deletions

@@ -1,11 +1,7 @@
 package streams.kafka
 
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.*
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.errors.WakeupException
 import org.neo4j.kernel.configuration.Config
 import org.neo4j.logging.Log
 import streams.*
@@ -49,17 +45,15 @@ class KafkaEventSink(private val config: Config,
                 .createStreamsEventConsumer(config.raw, log)
                 .withTopics(topics.keys)
         this.eventConsumer.start()
-        if (log.isDebugEnabled) {
-            log.debug("Subscribed topics with queries: $topics")
-        }
         this.job = createJob()
         log.info("Kafka Sink started")
     }
 
-    override fun stop() {
+    override fun stop() = runBlocking {
         log.info("Stopping Sink daemon Job")
-        this.eventConsumer.stop()
-        StreamsUtils.ignoreExceptions({ job.cancel() }, UninitializedPropertyAccessException::class.java)
+        try {
+            job.cancelAndJoin()
+        } catch (e : UninitializedPropertyAccessException) { /* ignoring this one only */ }
     }
 
     override fun getEventSinkConfigMapper(): StreamsEventSinkConfigMapper {
@@ -78,7 +72,7 @@ class KafkaEventSink(private val config: Config,
         log.info("Creating Sink daemon Job")
         return GlobalScope.launch(Dispatchers.IO) {
             try {
-                while (true) {
+                while (isActive) {
                     val data= eventConsumer.read()
                     data?.forEach {
                         if (log.isDebugEnabled) {
@@ -87,6 +81,7 @@ class KafkaEventSink(private val config: Config,
                         queryExecution.execute(it.key, it.value)
                     }
                 }
+                eventConsumer.stop()
            } catch (e: Throwable) {
                 val message = e.message ?: "Generic error, please check the stack trace: "
                 log.error(message, e)
@@ -114,20 +109,18 @@ class KafkaEventConsumer(private val consumer: KafkaConsumer<String, ByteArray>,
             return
         }
         this.consumer.subscribe(topics)
-        log.info("Subscribing topics: $topics")
     }
 
     override fun stop() {
         StreamsUtils.ignoreExceptions({ consumer.close() }, UninitializedPropertyAccessException::class.java)
     }
 
-    override fun read(): Map<String, List<Map<String, Any?>>>? {
+    override fun read(): Map<String, List<Any>>? {
         val records = consumer.poll(config.sinkPollingInterval)
         if (records != null && !records.isEmpty) {
             return records
                     .map {
-                        it.topic()!! to JSONUtils.readValue(it.value(), Map::class.java)
-                                .mapKeys { it.key.toString() }
+                        it.topic()!! to JSONUtils.readValue(it.value(), Any::class.java)
                     }
                     .groupBy({ it.first }, { it.second })
         }
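
The lifecycle change is the heart of this file: the polling loop now cooperates with coroutine cancellation instead of looping forever, and `stop()` waits for the loop to finish before returning. A self-contained sketch of that shape, with a `Thread.sleep` stand-in for the blocking `consumer.poll(...)` and query execution:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = GlobalScope.launch(Dispatchers.IO) {
        try {
            while (isActive) {          // exits once the job is cancelled
                Thread.sleep(100)       // stand-in for the blocking poll + query execution
            }
            println("closing consumer") // eventConsumer.stop() now runs here, inside the job
        } catch (e: Throwable) {
            println("error: ${e.message}")
        }
    }
    delay(300)
    job.cancelAndJoin()                 // what the new stop() does, hence the runBlocking
    println("Kafka Sink stopped")
}
```

Moving `eventConsumer.stop()` inside the coroutine also means the consumer is closed by the same thread that polls it, avoiding concurrent access to the non-thread-safe KafkaConsumer from `stop()`.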

consumer/src/main/kotlin/streams/procedures/StreamsSinkProcedures.kt

Lines changed: 12 additions & 6 deletions

@@ -1,22 +1,21 @@
 package streams.procedures
 
-import org.apache.commons.lang3.StringUtils
 import org.neo4j.logging.Log
 import org.neo4j.procedure.*
 import streams.StreamsEventConsumerFactory
 import streams.StreamsEventSinkConfigMapper
 import streams.StreamsSinkConfiguration
 import java.util.stream.Stream
 
-class StreamResult(@JvmField val event: Any)
+class StreamResult(@JvmField val event: Map<String, *>)
 
 class StreamsSinkProcedures {
 
     @JvmField @Context
     var log: Log? = null
 
     @Procedure(mode = Mode.SCHEMA, name = "streams.consume")
-    @Description("streams.consume(topic, config) - Allows to subscribe custom topics")
+    @Description("streams.consume(topic, {timeout: <long value>, from: <string>}) YIELD event - Allows to consume custom topics")
     fun consume(@Name("topic") topic: String?,
                 @Name(value = "config", defaultValue = "{}") config: Map<String, Any>?): Stream<StreamResult> {
         checkEnabled()
@@ -33,13 +32,20 @@ class StreamsSinkProcedures {
                 .createStreamsEventConsumer(configuration, log!!)
                 .withTopics(setOf(topic))
         consumer.start()
-        val data = consumer.read()
+        val data = try {
+            consumer.read()
+        } catch (e: Exception) {
+            if (log?.isDebugEnabled!!) {
+                log?.error("Error while consuming data", e)
+            }
+            emptyMap<String, List<Any>>()
+        }
         consumer.stop()
 
         if (log?.isDebugEnabled!!) {
-            log?.debug("Data retrieved after ${configuration["streams.sink.polling.interval"]} milliseconds: $data")
+            log?.debug("Data retrieved from topic $topic after ${configuration["streams.sink.polling.interval"]} milliseconds: $data")
         }
-        return data?.values?.flatMap { list -> list.map { StreamResult(it) } }?.stream() ?: Stream.empty()
+        return data?.values?.flatMap { list -> list.map { StreamResult(mapOf("data" to it)) } }?.stream() ?: Stream.empty()
     }
 
     private fun checkEnabled() {
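
Because payloads may now be maps, arrays, or scalars, the procedure wraps every payload under a single `data` key, giving callers one uniform access path (`event.data`). A minimal sketch of the wrapping, lifted out of its procedure context:

```kotlin
class StreamResult(@JvmField val event: Map<String, *>)

// Every consumed payload, whatever its JSON shape, lands under "data".
fun wrap(payloads: List<Any>): List<StreamResult> =
    payloads.map { StreamResult(mapOf("data" to it)) }

fun main() {
    val results = wrap(listOf(mapOf("name" to "Andrea"), listOf(1, 2, 3), 42, true))
    results.forEach { println(it.event) }
    // {data={name=Andrea}}, {data=[1, 2, 3]}, {data=42}, {data=true}
}
```

From Cypher this surfaces as `CALL streams.consume(...) YIELD event RETURN event.data`, which is exactly what the updated documentation and the integration tests below exercise.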

consumer/src/test/kotlin/integrations/StreamsSinkProceduresIT.kt

Lines changed: 49 additions & 3 deletions

@@ -44,8 +44,6 @@ class StreamsSinkProceduresIT {
 
     private val cypherQueryTemplate = "MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties"
 
-    private val topics = listOf("shouldWriteCypherQuery")
-
     @Rule
     @JvmField
     var testName = TestName()
@@ -114,7 +112,8 @@ class StreamsSinkProceduresIT {
         assertTrue { resultMap.containsKey("event") }
         assertNotNull(resultMap["event"], "should contain event")
         val event = resultMap["event"] as Map<String, Any?>
-        assertEquals(data, event)
+        val resultData = event["data"] as Map<String, Any?>
+        assertEquals(data, resultData)
     }
 
     @Test
@@ -123,4 +122,51 @@ class StreamsSinkProceduresIT {
         assertFalse { result.hasNext() }
     }
 
+    @Test
+    fun shouldReadArrayOfJson() {
+        val topic = "array-topic"
+        val list = listOf(data, data)
+        val producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(list))
+        kafkaProducer.send(producerRecord).get()
+        db.execute("""
+            CALL streams.consume('$topic', {timeout: 5000}) YIELD event
+            UNWIND event.data AS data
+            CREATE (t:TEST) SET t += data.properties
+        """.trimIndent()).close()
+        val searchResult = db.execute("MATCH (t:TEST) WHERE properties(t) = {props} RETURN count(t) AS count", mapOf("props" to dataProperties))
+        assertTrue { searchResult.hasNext() }
+        val searchResultMap = searchResult.next()
+        assertTrue { searchResultMap.containsKey("count") }
+        assertEquals(2L, searchResultMap["count"])
+    }
+
+    @Test
+    fun shouldReadSimpleDataType() {
+        val topic = "simple-data"
+        val simpleInt = 1
+        val simpleBoolean = true
+        val simpleString = "test"
+        var producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(simpleInt))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(simpleBoolean))
+        kafkaProducer.send(producerRecord).get()
+        producerRecord = ProducerRecord(topic, UUID.randomUUID().toString(), JSONUtils.writeValueAsBytes(simpleString))
+        kafkaProducer.send(producerRecord).get()
+        db.execute("""
+            CALL streams.consume('$topic', {timeout: 5000}) YIELD event
+            MERGE (t:LOG{simpleData: event.data})
+            RETURN count(t) AS insert
+        """.trimIndent()).close()
+        val searchResult = db.execute("""
+            MATCH (l:LOG)
+            WHERE l.simpleData IN [$simpleInt, $simpleBoolean, "$simpleString"]
+            RETURN count(l) as count
+        """.trimIndent())
+        assertTrue { searchResult.hasNext() }
+        val searchResultMap = searchResult.next()
+        assertTrue { searchResultMap.containsKey("count") }
+        assertEquals(3L, searchResultMap["count"])
+
+    }
+
 }

consumer/src/test/kotlin/streams/StreamsSinkConfigurationTest.kt

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ class StreamsSinkConfigurationTest {
     companion object {
         fun testDefaultConf(default: StreamsSinkConfiguration) {
             assertEquals(emptyMap(), default.topics)
-            assertEquals(Long.MAX_VALUE, default.sinkPollingInterval)
+            assertEquals(10000, default.sinkPollingInterval)
         }
         fun testFromConf(streamsConfig: StreamsSinkConfiguration, pollingInterval: String, topic: String, topicValue: String) {
             assertEquals(pollingInterval.toLong(), streamsConfig.sinkPollingInterval)

doc/asciidoc/procedures/index.adoc

Lines changed: 2 additions & 2 deletions

@@ -58,11 +58,11 @@ Uses:
 `CALL streams.consume('my-topic', {<config>}) YIELD event RETURN event`
 
 Example:
-Imagine you have a producer that publish events like this `{"name": "Andrea", "surname": "Santurbano"}`), we can create user nodes in this way:
+Imagine you have a producer that publish events like this `{"name": "Andrea", "surname": "Santurbano"}`, we can create user nodes in this way:
 
 ```
 CALL streams.consume('my-topic', {<config>}) YIELD event
-CREATE (p:Person{firstName: event.name, lastName: event.surname})
+CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})
 ```
 
 Input Parameters:

doc/asciidoc/producer/configuration.adoc

Lines changed: 3 additions & 3 deletions

@@ -14,13 +14,13 @@ kafka.session.timeout.ms=15000
 kafka.connection.timeout.ms=10000
 kafka.replication=1
 kafka.linger.ms=1
-kafka.transaction.id=
+kafka.transactional.id=
 
 streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
 streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
 streams.source.enable=<true/false, default=true>
 ----
 
-Note: To use the Kafka transactions please set `kafka.transaction.id` and `kafka.acks` properly
+Note: To use the Kafka transactions please set `kafka.transactional.id` and `kafka.acks` properly
 
-See the https://kafka.apache.org/documentation/#brokerconfigs[Apache Kafka documentation] for details on these settings.
+See the https://kafka.apache.org/documentation/#brokerconfigs[Apache Kafka documentation] for details on these settings.
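
For producers that want the transactional guarantees the note refers to, the usual client-side pairing is a `transactional.id` plus `acks=all`. A hedged sketch using the standard Kafka producer API (the bootstrap address and transactional id are placeholder values, and how neo4j-streams forwards its `kafka.*` properties is outside this diff):

```kotlin
import org.apache.kafka.clients.producer.ProducerConfig
import java.util.Properties

fun transactionalProducerProps(): Properties = Properties().apply {
    put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // placeholder address
    put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "neo4j-producer-tx") // placeholder id; enables transactions
    put(ProducerConfig.ACKS_CONFIG, "all")                           // required once a transactional.id is set
}
```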
