Commit ffcd22a

fixes #375: Source Plugin for Kafka Connect (#496)
* fixes #375: Source Plugin for Kafka Connect
* feedback
* fixed Neo4j version in deprecation message
1 parent d7e8745 commit ffcd22a

50 files changed: +2058 additions, -704 deletions

common/src/main/kotlin/streams/extensions/CommonExtensions.kt

Lines changed: 18 additions & 1 deletion
@@ -8,9 +8,10 @@ import org.apache.avro.generic.IndexedRecord
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
+import org.neo4j.driver.types.Relationship
 import org.neo4j.graphdb.Node
-import streams.utils.JSONUtils
 import streams.service.StreamsSinkEntity
+import streams.utils.JSONUtils
 import java.nio.ByteBuffer
 import java.util.*
 import javax.lang.model.SourceVersion
@@ -26,6 +27,22 @@ fun Node.labelNames() : List<String> {
     return this.labels.map { it.name() }
 }
 
+fun org.neo4j.driver.types.Node.asStreamsMap(): Map<String, Any?> {
+    val nodeMap = this.asMap().toMutableMap()
+    nodeMap["<id>"] = this.id()
+    nodeMap["<labels>"] = this.labels()
+    return nodeMap
+}
+
+fun Relationship.asStreamsMap(): Map<String, Any?> {
+    val relMap = this.asMap().toMutableMap()
+    relMap["<id>"] = this.id()
+    relMap["<type>"] = this.type()
+    relMap["<source.id>"] = this.startNodeId()
+    relMap["<target.id>"] = this.endNodeId()
+    return relMap
+}
+
 fun String.toPointCase(): String {
     return this.split("(?<=[a-z])(?=[A-Z])".toRegex()).joinToString(separator = ".").toLowerCase()
 }
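
For orientation, here is a minimal sketch (not part of this commit) of how the new `asStreamsMap()` extensions can be exercised through the driver; the URI, credentials, and query are placeholder assumptions:

[source,kotlin]
----
import org.neo4j.driver.AuthTokens
import org.neo4j.driver.GraphDatabase
import streams.extensions.asStreamsMap

fun main() {
    // Placeholder connection details; assumes a reachable Neo4j instance
    // containing at least one relationship
    val driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"))
    driver.session().use { session ->
        val record = session.run("MATCH (n)-[r]->(m) RETURN n, r LIMIT 1").single()
        // Driver entities flatten into plain maps carrying metadata keys
        println(record.get("n").asNode().asStreamsMap())          // properties plus <id>, <labels>
        println(record.get("r").asRelationship().asStreamsMap())  // properties plus <id>, <type>, <source.id>, <target.id>
    }
    driver.close()
}
----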

common/src/main/kotlin/streams/utils/JSONUtils.kt

Lines changed: 47 additions & 0 deletions
@@ -12,6 +12,7 @@ import org.neo4j.driver.internal.value.PointValue
 import org.neo4j.graphdb.spatial.Point
 import org.neo4j.values.storable.CoordinateReferenceSystem
 import streams.events.*
+import streams.extensions.asStreamsMap
 import java.io.IOException
 import java.time.temporal.TemporalAccessor
 import kotlin.reflect.full.isSubclassOf
@@ -43,6 +44,17 @@ fun PointValue.toStreamsPoint(): StreamsPoint {
     }
 }
 
+fun org.neo4j.driver.types.Point.toStreamsPoint(): StreamsPoint {
+    val point = this
+    return when (val crsType = point.srid()) {
+        CoordinateReferenceSystem.Cartesian.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian.name, point.x(), point.y())
+        CoordinateReferenceSystem.Cartesian_3D.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian_3D.name, point.x(), point.y(), point.z())
+        CoordinateReferenceSystem.WGS84.code -> StreamsPointWgs(CoordinateReferenceSystem.WGS84.name, point.x(), point.y())
+        CoordinateReferenceSystem.WGS84_3D.code -> StreamsPointWgs(CoordinateReferenceSystem.WGS84_3D.name, point.x(), point.y(), point.z())
+        else -> throw IllegalArgumentException("Point type $crsType not supported")
+    }
+}
+
 class PointSerializer : JsonSerializer<Point>() {
     @Throws(IOException::class, JsonProcessingException::class)
     override fun serialize(value: Point?, jgen: JsonGenerator,
@@ -54,6 +66,17 @@ class PointSerializer : JsonSerializer<Point>() {
     }
 }
 
+class DriverPointSerializer : JsonSerializer<org.neo4j.driver.types.Point>() {
+    @Throws(IOException::class, JsonProcessingException::class)
+    override fun serialize(value: org.neo4j.driver.types.Point?, jgen: JsonGenerator,
+                           provider: SerializerProvider) {
+        if (value == null) {
+            return
+        }
+        jgen.writeObject(value.toStreamsPoint())
+    }
+}
+
 class PointValueSerializer : JsonSerializer<PointValue>() {
     @Throws(IOException::class, JsonProcessingException::class)
     override fun serialize(value: PointValue?, jgen: JsonGenerator,
@@ -76,6 +99,27 @@ class TemporalAccessorSerializer : JsonSerializer<TemporalAccessor>() {
     }
 }
 
+class DriverNodeSerializer : JsonSerializer<org.neo4j.driver.types.Node>() {
+    @Throws(IOException::class, JsonProcessingException::class)
+    override fun serialize(value: org.neo4j.driver.types.Node?, jgen: JsonGenerator,
+                           provider: SerializerProvider) {
+        if (value == null) {
+            return
+        }
+        jgen.writeObject(value.asStreamsMap())
+    }
+}
+
+class DriverRelationshipSerializer : JsonSerializer<org.neo4j.driver.types.Relationship>() {
+    @Throws(IOException::class, JsonProcessingException::class)
+    override fun serialize(value: org.neo4j.driver.types.Relationship?, jgen: JsonGenerator,
+                           provider: SerializerProvider) {
+        if (value == null) {
+            return
+        }
+        jgen.writeObject(value.asStreamsMap())
+    }
+}
 
 object JSONUtils {
 
@@ -86,6 +130,9 @@ object JSONUtils {
         val module = SimpleModule("Neo4jKafkaSerializer")
         StreamsUtils.ignoreExceptions({ module.addSerializer(Point::class.java, PointSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
         StreamsUtils.ignoreExceptions({ module.addSerializer(PointValue::class.java, PointValueSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
+        StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Point::class.java, DriverPointSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
+        StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Node::class.java, DriverNodeSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
+        StreamsUtils.ignoreExceptions({ module.addSerializer(org.neo4j.driver.types.Relationship::class.java, DriverRelationshipSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
         module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer())
         OBJECT_MAPPER.registerModule(module)
         OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
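
The serializers above all follow the same Jackson pattern: register a type-specific `JsonSerializer` on a `SimpleModule` so the mapper emits the flattened-map form. Below is a self-contained sketch of that pattern (illustrative only; `FakeNode` is a hypothetical stand-in for `org.neo4j.driver.types.Node` so the example runs with Jackson alone):

[source,kotlin]
----
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializerProvider
import com.fasterxml.jackson.databind.module.SimpleModule

// Hypothetical stand-in for a driver node
data class FakeNode(val id: Long, val labels: List<String>, val props: Map<String, Any?>)

class FakeNodeSerializer : JsonSerializer<FakeNode>() {
    override fun serialize(value: FakeNode?, jgen: JsonGenerator, provider: SerializerProvider) {
        if (value == null) return
        // Same flattening idea as asStreamsMap(): properties plus "<id>"/"<labels>" metadata keys
        jgen.writeObject(value.props + mapOf("<id>" to value.id, "<labels>" to value.labels))
    }
}

fun main() {
    val mapper = ObjectMapper()
    val module = SimpleModule("ExampleSerializers")
    module.addSerializer(FakeNode::class.java, FakeNodeSerializer())
    mapper.registerModule(module)
    println(mapper.writeValueAsString(FakeNode(0, listOf("Person"), mapOf("name" to "Alice"))))
    // -> {"name":"Alice","<id>":0,"<labels>":["Person"]}
}
----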

common/src/main/kotlin/streams/utils/StreamsUtils.kt

Lines changed: 6 additions & 0 deletions
@@ -62,4 +62,10 @@ object StreamsUtils {
 
     fun getName(db: GraphDatabaseService) = db.databaseName()
 
+    fun closeSafetely(closeable: AutoCloseable, onError: (Throwable) -> Unit) = try {
+        closeable.close()
+    } catch (e: Throwable) {
+        onError(e)
+    }
+
 }
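
A quick usage sketch (illustrative, not from this commit; the spelling `closeSafetely` follows the source): the helper routes a failing `close()` into a callback instead of letting it propagate, so teardown can proceed past a broken resource:

[source,kotlin]
----
import streams.utils.StreamsUtils

fun main() {
    // SAM conversion: an AutoCloseable whose close() always fails
    val flaky = AutoCloseable { throw IllegalStateException("boom") }
    StreamsUtils.closeSafetely(flaky) { e ->
        // The error is reported, not rethrown
        println("ignored error on close: ${e.message}")
    }
    println("still running")
}
----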

doc/docs/modules/ROOT/nav.adoc

Lines changed: 5 additions & 5 deletions
@@ -2,11 +2,11 @@
 
 * xref::overview.adoc[Project overview]
 // ** xref::overview.adoc#neo4j_streams_plugin_overview[Neo4j Streams plugin]
-// ** xref::overview.adoc#kafka_connect_plugin_overview[Kafka Connect plugin]
+// ** xref::overview.adoc#kafka_connect_neo4j_connector_overview[Kafka Connect Neo4j Connector]
 
 * xref::quickstart.adoc[Quick Start]
 // ** xref::quickstart.adoc#neo4j_streams_plugin_quickstart[Neo4j Streams plugin]
-// ** xref::quickstart.adoc#kafka_connect_plugin_quickstart[Kafka Connect plugin]
+// ** xref::quickstart.adoc#kafka_connect_neo4j_connector_quickstart[Kafka Connect Neo4j Connector]
 
 * xref::configuration.adoc[The Configuration System]
 // ** xref::configuration.adoc#neo4j_configuration_system[Configuration System overview]
@@ -28,8 +28,8 @@
 // ** xref::procedures.adoc#_streams_publish[streams.publish]
 // ** xref::procedures.adoc#_streams_consume[streams.consume]
 
-* xref::kafka-connect.adoc[Kafka Connect Plugin]
-// ** xref::kafka-connect.adoc#kafka_connect_plugin_install[Plugin installation]
+* xref::kafka-connect.adoc[Kafka Connect Neo4j Connector]
+// ** xref::kafka-connect.adoc#kafka_connect_neo4j_connector_install[Plugin installation]
 // ** xref::kafka-connect.adoc#kafka-connect-sink-instance[Create the Sink Instance]
 // ** xref::kafka-connect.adoc#kafka-connect-sink-strategies[Sink ingestion strategies]
 // ** xref::kafka-connect.adoc#kafka-connect-cud-file-format[How deal with bad data]
@@ -45,7 +45,7 @@
 
 * xref::docker.adoc[Run with Docker]
 // ** xref::docker.adoc#neo4j_streams_docker[Neo4j Streams plugin]
-// ** xref::docker.adoc#docker_kafka_connect[Kafka Connect Plugin]
+// ** xref::docker.adoc#docker_kafka_connect[Kafka Connect Neo4j Connector]
 // ** xref::docker.adoc#docker_streams_cluster[Neo4j Streams with Neo4j Cluster and Kafka Cluster]
 
 * xref::kafka-ssl.adoc[Configure with Kafka over SSL]

Lines changed: 9 additions & 9 deletions
@@ -1,4 +1,4 @@
-= When to use Kafka Connect vs. Neo4j Streams as a Plugin
+= When to use Kafka Connect Neo4j Connector vs. Neo4j Streams as a Plugin
 
 [abstract]
 This section covers how to decide whether to run as a Kafka Connect worker, or as a Neo4j Plugin.
@@ -7,16 +7,18 @@ This section covers how to decide whether to run as a Kafka Connect worker, or a
 
 === Pros
 
-* Processing is outside of Neo4j so that memory & CPU impact doesn't impact Neo4j. You don't need to size the database with Kafka utilization in mind.
-* Much easier for Kafka pros to manage; they benefit from the Confluent ecosystem, such as connecting the REST API to manipulate connectors, the control center to administer & monitor them.
-* By restarting the worker, you can restart your sink strategy without having downtime for Neo4j.
+* Processing is outside of Neo4j so that memory & CPU impact doesn't impact Neo4j.
+You don't need to size the database with Kafka utilization in mind.
+* Much easier for Kafka pros to manage; they benefit from the Confluent ecosystem,
+such as connecting the REST API to manipulate connectors, the control center to administer & monitor them.
+* By restarting the worker, you can restart your sink/source strategy without having downtime for Neo4j.
 * Upgrade Neo4j-Streams without restarting the cluster
-Strictly an external bolt client, so better overall security management of plugin actions.
+* Strictly an external bolt client, so better overall security management of plugin actions.
 
 === Cons
 
-* You can't do TransactionEventHandlers from outside of the database, so you can only sink to Neo4j, you can't produce from it.
-* If you're using Confluent Cloud, you can't host the connector in the cloud (yet). So this requires a 3rd piece of architecture: Confluent Cloud, Neo4j, and the Connect Worker (usually a separate VM)
+* If you're using Confluent Cloud, you can't host the connector in the cloud (yet).
+So this requires a 3rd piece of architecture: Confluent Cloud, Neo4j, and the Connect Worker (usually a separate VM)
 * Possibly worse throughput due to bolt latency & overhead, and separate network hop.
 
 == Neo4j-Streams Plugin
@@ -31,8 +33,6 @@ Strictly an external bolt client, so better overall security management of plugi
 === Cons
 
 * Memory & CPU consumption on your Neo4j Server
-* Requires restarting Neo4j in order to update your configuration and/or Cypher.
-* Upgrading plugin requires cluster restart
 * Need to track config to be identical across all members in the cluster
 * Lesser ability to manage the plugin because it is running inside of the database and not under a particular user account.

doc/docs/modules/ROOT/pages/configuration.adoc

Lines changed: 5 additions & 0 deletions
@@ -1,4 +1,9 @@
 [#neo4j_configuration_system]
+
+[NOTE]
+The Neo4j Streams Plugin running inside the Neo4j database is deprecated and will not be supported after version 4.3 of Neo4j.
+We recommend users not to adopt this plugin for new implementations, and to consider migrating to the use of the Kafka Connect Neo4j Connector as a replacement
+
 == Configuration system overview
 
 === Location of Configuration Information

doc/docs/modules/ROOT/pages/consumer.adoc

Lines changed: 5 additions & 0 deletions
@@ -10,6 +10,11 @@ Use this section to configure Neo4j to ingest the data from Kafka into Neo4j.
 --
 
 endif::env-docs[]
+
+[NOTE]
+The Neo4j Streams Plugin running inside the Neo4j database is deprecated and will not be supported after version 4.3 of Neo4j.
+We recommend users not to adopt this plugin for new implementations, and to consider migrating to the use of the Kafka Connect Neo4j Connector as a replacement
+
 [[neo4j_streams_sink]]
 Is the Kafka Sink that ingest the data directly into Neo4j

doc/docs/modules/ROOT/pages/docker.adoc

Lines changed: 5 additions & 1 deletion
@@ -1,5 +1,9 @@
 = Run with Docker
 
+[NOTE]
+The Neo4j Streams Plugin running inside the Neo4j database is deprecated and will not be supported after version 4.3 of Neo4j.
+We recommend users not to adopt this plugin for new implementations, and to consider migrating to the use of the Kafka Connect Neo4j Connector as a replacement
+
 ifdef::env-docs[]
 [abstract]
 --
@@ -277,7 +281,7 @@ You'll see something like this:
 ----
 
 [[docker_kafka_connect]]
-== Kafka Connect plugin
+== Kafka Connect Neo4j Connector
 
 Inside the directory `/kafka-connect-neo4j/docker` you'll find a compose file that allows you to start the whole testing environment:

doc/docs/modules/ROOT/pages/faq.adoc

Lines changed: 7 additions & 6 deletions
@@ -9,19 +9,16 @@ the source code repository for full terms and conditions.
 
 == How to integrate Neo4j and Kafka
 
-When integrating Neo4j and Kafka using Neo4j Streams plugin or Kafka Connect plugin
+When integrating Neo4j and Kafka using Neo4j Streams plugin or Kafka Connect Neo4j Connector
 is important configure just one of them and not both.
-If you need to load data from Kafka to Neo4j (and not viceversa) you can just use the Kafka Connect plugin.
-If you need to have both sink and source functionalities then you have to use the Neo4j Streams plugin.
-This is the main difference between those plugins.
 
 == About CUD file format
 
 The CUD file format is JSON file that represents Graph Entities (Nodes/Relationships) and how to manage them in term
 of **C**reate/**U**pdate/**D**elete operations.
 So every JSON event represents a single operation.
 For more details about how to use these, please checkout the xref:consumer.adoc#_cud_file_format[CUD File Format] section for the
-Neo4j Streams plugin, and the xref:kafka-connect.adoc#kafka-connect-cud-file-format[CUD File Format] section for Kafka Connect plugin.
+Neo4j Streams plugin, and the xref:kafka-connect.adoc#kafka-connect-cud-file-format[CUD File Format] section for Kafka Connect Neo4j Connector.
 
 == How to ingest events using CDC Schema strategy
 
@@ -45,7 +42,7 @@ streams.sink.topic.cdc.schema=<topic-name>
 streams.sink.enabled=true
 ----
 
-If you decide to use Kafka Connect plugin for the sink instance, then it has to be configured as follow:
+If you decide to use Kafka Connect Neo4j Connector for the sink instance, then it has to be configured as follow:
 
 [source, json]
 ----
@@ -191,3 +188,7 @@ Please see the following link for further details:
 * https://docs.confluent.io/platform/current/kafka/deployment.html#multi-node-configuration[Kafka multi-node configuration]
 * https://docs.confluent.io/platform/current/zookeeper/deployment.html#multi-node-setup[Zookeeper multi-node setup]
 --
+
+== Which way should I run the Neo4j Connector for Apache Kafka: As a database plugin, or using the Kafka Connect Framework?
+
+If you have already implemented the database plugin and are running Neo4j <= 4.2, there is no need to change, all other users, new users, and Neo4j Aura users should implement only the Kafka Connect Neo4j Connector

doc/docs/modules/ROOT/pages/index.adoc

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ The guide covers the following areas:
 * xref:producer.adoc[How to configure Neo4j Streams as Source]
 * xref:consumer.adoc[How to configure Neo4j Streams as Sink]
 * xref:procedures.adoc[How to use Neo4j Streams Procedures]
-* xref:kafka-connect.adoc[How to configure Kafka Connect plugin]
+* xref:kafka-connect.adoc[How to configure Kafka Connect Neo4j Connector]
 * xref:neo4j-cluster.adoc[Using with Neo4j Causal Cluster]
 * xref:docker.adoc[Run Neo4j Streams and Kafka connect with Docker]
 * xref:kafka-ssl.adoc[A guidance on how to configure SSL between Kafka and Neo4j]
