
Commit af8ec69

Merge branch '3.5' of github.com:neo4j-contrib/neo4j-streams into 3.5
2 parents: 13da42d + e7abb9f

6 files changed (+94 -43 lines)

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 3 additions & 5 deletions
@@ -41,12 +41,10 @@ object Neo4jUtils {
                 .resolveDependency(LogService::class.java)
     }
 
-    fun isEnterpriseEdition(db: GraphDatabaseAPI): Boolean {
+    fun isCluster(db: GraphDatabaseAPI): Boolean {
         try {
-            return db.execute("""
-                CALL dbms.components() YIELD edition
-                RETURN edition = "enterprise" AS isEnterprise
-            """.trimIndent()).columnAs<Boolean>("isEnterprise").next()
+            db.execute("CALL dbms.cluster.role()").columnAs<String>("role").next()
+            return true
         } catch (e: QueryExecutionException) {
             if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
                 return false
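
The check now treats the availability of the `dbms.cluster.role()` procedure as the signal that the instance runs in a cluster: on a single instance the call fails with `Neo.ClientError.Procedure.ProcedureNotFound` and the method reports `false`. A self-contained sketch of that pattern follows; the rethrow of any other error is an assumption, since the hunk above ends before the remaining branch.

----
import org.neo4j.graphdb.QueryExecutionException
import org.neo4j.kernel.internal.GraphDatabaseAPI

// True when dbms.cluster.role() is callable, i.e. the instance is part of a cluster.
fun isCluster(db: GraphDatabaseAPI): Boolean {
    return try {
        db.execute("CALL dbms.cluster.role()").columnAs<String>("role").next()
        true
    } catch (e: QueryExecutionException) {
        if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
            // A single (non-clustered) instance does not expose the procedure at all.
            false
        } else {
            throw e // assumption: any other failure is propagated
        }
    }
}
----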

common/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt

Lines changed: 3 additions & 5 deletions
@@ -1,8 +1,6 @@
 package streams.utils
 
-import org.junit.AfterClass
-import org.junit.BeforeClass
-import org.junit.Test
+import org.junit.*
 import org.neo4j.kernel.internal.GraphDatabaseAPI
 import org.neo4j.test.TestGraphDatabaseFactory
 import kotlin.test.assertFalse
@@ -34,8 +32,8 @@ class Neo4jUtilsTest {
     }
 
     @Test
-    fun shouldCheckIfIsEnterpriseEdition() {
-        val isEnterprise = Neo4jUtils.isEnterpriseEdition(db)
+    fun shouldCheckIfIsACluster() {
+        val isEnterprise = Neo4jUtils.isCluster(db)
         assertFalse { isEnterprise }
     }
 

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,8 @@ abstract class StreamsEventConsumer(private val log: Log) {
 
     abstract fun read(topicConfig: Map<String, Any> = emptyMap(), action: (String, List<Any>) -> Unit)
 
+    abstract fun read(action: (String, List<Any>) -> Unit)
+
 }
 
 abstract class StreamsEventConsumerFactory {
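
The new zero-argument overload lets callers poll with the consumer's default configuration, while the existing overload keeps accepting an explicit topic configuration. A hypothetical caller-side sketch, assuming a `StreamsEventConsumer` instance named `consumer` and a `queryExecution` handle like the one used by the Kafka sink below (the `"commit"` key mirrors what `KafkaTopicConfig.fromMap` reads):

----
// Default configuration: read whatever is pending and hand each batch over.
consumer.read { topic, data ->
    queryExecution.writeForTopic(topic, data)
}

// Explicit configuration is still possible through the original overload.
consumer.read(mapOf("commit" to false)) { topic, data ->
    queryExecution.writeForTopic(topic, data)
}
----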

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 9 additions & 16 deletions
@@ -30,9 +30,9 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
 
 class StreamsEventLifecycle(private val dependencies: StreamsEventSinkExtensionFactory.Dependencies): LifecycleAdapter() {
     private val db = dependencies.graphdatabaseAPI()
-    private val log = dependencies.log()
+    private val logService = dependencies.log()
     private val configuration = dependencies.config()
-    private var streamsLog = log.getUserLog(StreamsEventLifecycle::class.java)
+    private var streamsLog = logService.getUserLog(StreamsEventLifecycle::class.java)
 
     private lateinit var eventSink: StreamsEventSink
 
@@ -42,49 +42,42 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
             override fun unavailable() {}
 
             override fun available() {
+                streamsLog.info("Initialising the Streams Sink module")
                 val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration)
-
                 val streamsTopicService = StreamsTopicService(db)
                 streamsTopicService.clearAll()
                 streamsTopicService.setAll(streamsSinkConfiguration.topics)
                 val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
                         streamsSinkConfiguration.sourceIdStrategyConfig)
                 val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db,
-                        log.getUserLog(StreamsEventSinkQueryExecution::class.java),
+                        logService.getUserLog(StreamsEventSinkQueryExecution::class.java),
                         strategyMap)
 
                 // Create the Sink
-                val sinkLog = log.getUserLog(StreamsEventSinkFactory::class.java)
+                val log = logService.getUserLog(StreamsEventSinkFactory::class.java)
                 eventSink = StreamsEventSinkFactory
                         .getStreamsEventSink(configuration,
                                 streamsQueryExecution,
                                 streamsTopicService,
-
-                                sinkLog,
+                                log,
                                 db)
                 // start the Sink
-                if (Neo4jUtils.isEnterpriseEdition(db)) {
-                    sinkLog.info("The Sink module is running in an enterprise edition, checking for the ${Neo4jUtils.LEADER}")
-                    Neo4jUtils.executeInLeader(db, sinkLog) { initSinkModule() }
+                if (Neo4jUtils.isCluster(db)) {
+                    log.info("The Sink module is running in a cluster, checking for the ${Neo4jUtils.LEADER}")
+                    Neo4jUtils.executeInLeader(db, log) { initSinkModule() }
                 } else {
                     // check if is writeable instance
                     Neo4jUtils.executeInWriteableInstance(db) { initSinkModule() }
                 }
 
                 // Register required services for the Procedures
                 StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
-                eventSink = StreamsEventSinkFactory.getStreamsEventSink(configuration,
-                        streamsQueryExecution,
-                        streamsTopicService,
-                        log.getUserLog(StreamsEventSinkFactory::class.java), db)
-                eventSink.start()
                 StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
                 StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
             }
 
         })
     } catch (e: Exception) {
-        e.printStackTrace()
         streamsLog.error("Error initializing the streaming sink", e)
     }
 }

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 17 additions & 5 deletions
@@ -75,9 +75,10 @@ class KafkaEventSink(private val config: Config,
     }
 
     override fun stop() = runBlocking {
-        log.info("Stopping Sink daemon Job")
+        log.info("Stopping Kafka Sink daemon Job")
        try {
            job.cancelAndJoin()
+            log.info("Kafka Sink daemon Job stopped")
        } catch (e : UninitializedPropertyAccessException) { /* ignoring this one only */ }
    }
 
@@ -98,20 +99,22 @@ class KafkaEventSink(private val config: Config,
        return GlobalScope.launch(Dispatchers.IO) { // TODO improve exception management
            try {
                while (isActive) {
-                    if (Neo4jUtils.isWriteableInstance(db)) {
+                    val timeMillis = if (Neo4jUtils.isWriteableInstance(db)) {
                        eventConsumer.read { topic, data ->
                            if (log.isDebugEnabled) {
                                log.debug("Reading data from topic $topic")
                            }
                            queryExecution.writeForTopic(topic, data)
                        }
+                        TimeUnit.SECONDS.toMillis(1)
                    } else {
-                        val timeMillis = TimeUnit.MILLISECONDS.toMinutes(5)
+                        val timeMillis = TimeUnit.MINUTES.toMillis(5)
                        if (log.isDebugEnabled) {
                            log.debug("Not in a writeable instance, new check in $timeMillis millis")
                        }
-                        delay(timeMillis)
+                        timeMillis
                    }
+                    delay(timeMillis)
                }
                eventConsumer.stop()
            } catch (e: Throwable) {
@@ -135,7 +138,7 @@ data class KafkaTopicConfig(val commit: Boolean, val topicPartitionsMap: Map<Top
                        TopicPartition(topicConfigEntry.key, partition) to offset
                    }
                }
-                .associateBy({ it.first }, { it.second })
+                .toMap()
 
    fun fromMap(map: Map<String, Any>): KafkaTopicConfig {
        val commit = map.getOrDefault("commit", true).toString().toBoolean()
@@ -202,6 +205,10 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
        }
    }
 
+    override fun read(action: (String, List<Any>) -> Unit) {
+        readSimple(action)
+    }
+
    override fun read(topicConfig: Map<String, Any>, action: (String, List<Any>) -> Unit) {
        val kafkaTopicConfig = KafkaTopicConfig.fromMap(topicConfig)
        if (kafkaTopicConfig.topicPartitionsMap.isEmpty()) {
@@ -301,6 +308,11 @@ class KafkaManualCommitEventConsumer(private val config: KafkaSinkConfiguration,
        }
    }
 
+    override fun read(action: (String, List<Any>) -> Unit) {
+        val topicMap = readSimple(action)
+        commitData(true, topicMap)
+    }
+
    override fun read(topicConfig: Map<String, Any>, action: (String, List<Any>) -> Unit) {
        val kafkaTopicConfig = KafkaTopicConfig.fromMap(topicConfig)
        val topicMap = if (kafkaTopicConfig.topicPartitionsMap.isEmpty()) {
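
Two details in the polling-loop hunk are worth calling out: the old back-off computed `TimeUnit.MILLISECONDS.toMinutes(5)`, which evaluates to 0, whereas the replacement `TimeUnit.MINUTES.toMillis(5)` yields the intended five-minute pause; and both branches of the `if` now simply produce a wait time so a single `delay(timeMillis)` suspends the coroutine once per iteration. A stripped-down sketch of the resulting shape, with the Neo4j and Kafka specifics replaced by placeholders (`isWriteable` and `pollOnce` are assumptions for illustration, not part of this commit):

----
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.*

// Illustrative skeleton of the sink loop: poll every second while the instance
// is writeable, otherwise back off for five minutes before checking again.
fun CoroutineScope.startPolling(isWriteable: () -> Boolean, pollOnce: () -> Unit): Job =
        launch(Dispatchers.IO) {
            while (isActive) {
                val timeMillis = if (isWriteable()) {
                    pollOnce()                    // read a batch and write it to the graph
                    TimeUnit.SECONDS.toMillis(1)
                } else {
                    TimeUnit.MINUTES.toMillis(5)
                }
                delay(timeMillis)                 // single suspension point per iteration
            }
        }
----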

readme.adoc

Lines changed: 60 additions & 12 deletions
@@ -1,37 +1,85 @@
 = Neo4j Streaming Data Integrations
+:docs: https://neo4j-contrib.github.io/neo4j-streams/
 
-== Description
+image::https://github.com/neo4j-contrib/neo4j-streams/raw/gh-pages/3.4/images/neo4j-loves-confluent.png[]
 
-The project is composed by 2 parts:
+This project integrates Neo4j with streaming data solutions.
 
-* Neo4j Streams Producer: a transaction event handler events that sends data to a Kafka topic
-* Neo4j Streams Consumer: a Neo4j application that ingest data from Kafka topics into Neo4j via templated Cypher Statements
+Currently it provides an integration with *Apache Kafka and the Confluent Platform*.
 
-== Installation
+The project contains these components:
 
-Build locally
+== Neo4j Kafka Connect Plugin
+
+A https://www.confluent.io/connector/kafka-connect-neo4j-sink/[Kafka Connect Sink plugin] that allows to ingest events from Kafka to Neo4j via templated Cypher statements. (link:{docs}#_kafka_connect[docs], https://www.confluent.io/blog/kafka-connect-neo4j-sink-plugin[article])
+
+image::https://www.confluent.io/wp-content/uploads/Kafka_Connect_Neo4j_Sink.png[width=300,link=https://www.confluent.io/connector/kafka-connect-neo4j-sink/]
+
+== Neo4j Server Extension
+
+* Source: a Change-Data-Capture (CDC) implementation sends change data to Kafka topics (link:{docs}#_neo4j_streams_producer[docs])
+* Sink: a Neo4j extension that ingest data from Kafka topics into Neo4j via templated Cypher statements (link:{docs}#_neo4j_streams_consumer[docs])
+* Neo4j Streams Procedures (Read & Write): Procedures to write to and read from topics interactively/programmatically (link:{docs}#_procedures[docs])
+
+== Documentation & Articles
+
+Read more at http://r.neo4j.com/kafka
+
+Here are articles, introducing the https://medium.com/neo4j/a-new-neo4j-integration-with-apache-kafka-6099c14851d2[Neo4j Extension] and the https://www.confluent.io/blog/kafka-connect-neo4j-sink-plugin[Kafka Connect Plugin].
+
+And practical applications of the extension for https://medium.freecodecamp.org/how-to-leverage-neo4j-streams-and-build-a-just-in-time-data-warehouse-64adf290f093[Building Data Pipelines with Kafka, Spark, Neo4j & Zeppelin] (https://medium.freecodecamp.org/how-to-ingest-data-into-neo4j-from-a-kafka-stream-a34f574f5655[part 2]).
+
+And for exchanging results of https://medium.freecodecamp.org/how-to-embrace-event-driven-graph-analytics-using-neo4j-and-apache-kafka-474c9f405e06[Neo4j Graph Algorithms within a Neo4j Cluster].
+
+== Feedback & Suggestions
+
+Please raise https://github.com/neo4j-contrib/neo4j-streams/issues[issues on GitHub], we also love contributions, so don't be shy to send a Pull Request.
+
+We would also love you to https://goo.gl/forms/VLwvqwsIvdfdm9fL2[**fill out our survey**] to learn more about your Kafka + Neo4j use-cases and deployments.
+
+== Installation Server Extension
+
+You can run/test the extension link:{docs}#docker[locally with Docker], or install it manually into your existing Neo4j server.
+
+1. Download the jar-file from the https://github.com/neo4j-contrib/neo4j-streams/releases/latest[latest release]
+2. Copy `neo4j-streams-<VERSION>.jar` into `$NEO4J_HOME/plugins`
+3. Update `$NEO4J_HOME/conf/neo4j.conf` with the necessary configuration.
+4. Restart Neo4j
+
+== Development & Contributions
+
+==== Build locally
 
 ----
 mvn clean install
 ----
 
-2. Copy `<project_dir>/target/neo4j-streams-<VERSION>.jar` into `$NEO4J_HOME/plugins`
-3. Restart Neo4j
+You'll find the build artifact in `<project_dir>/target/neo4j-streams-<VERSION>.jar`
+
+Testing the link:{docs}#_docker_compose_file[Kafka Connect Plugin locally with Docker].
+
+////
+== Documentation Links
+
+=== Kafka Connect Plugin
+
+### link:doc/asciidoc/kafka-connect/index.adoc[Kafka Connect Plugin]
 
-== Streams Producer
+=== Streams Producer
 
 ### link:doc/asciidoc/producer/configuration.adoc[Configuration]
 
 ### link:doc/asciidoc/producer/patterns.adoc[Patterns]
 
-== Streams Consumer
+=== Streams Consumer
 
 ### link:doc/asciidoc/consumer/configuration.adoc[Configuration]
 
-== Streams Procedures
+=== Streams Procedures
 
 ### link:doc/asciidoc/procedures/index.adoc[Procedures]
 
-== Docker
+=== Docker
 
 ### link:doc/asciidoc/docker/index.adoc[Docker]
+////
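
For step 3 of the installation list in the readme above, a minimal `neo4j.conf` sketch for the sink side may help; the keys shown (`kafka.bootstrap.servers`, `streams.sink.enabled`, `streams.sink.topic.cypher.<topic>`) follow the project documentation linked via `{docs}`, but the broker address, topic name, and Cypher statement are illustrative assumptions, not part of this commit:

----
# Illustrative sink configuration (adjust broker address and topic to your setup)
kafka.bootstrap.servers=localhost:9092
streams.sink.enabled=true
# Map the Kafka topic "my-topic" to a templated Cypher statement; the payload is bound as `event`
streams.sink.topic.cypher.my-topic=MERGE (p:Person {id: event.id}) SET p.name = event.name
----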
