[[introduction]]
== Introduction

ifdef::env-docs[]
[abstract]
--
This chapter provides an introduction to the Neo4j Streams Library, and instructions for installation.
--
endif::env-docs[]

Many users and customers want to integrate Kafka and other streaming solutions with Neo4j, either to ingest data into the graph from other sources, or to send update events (change data capture - CDC) to the event log for later consumption.

This extension was developed to satisfy these use cases, with more to come.

The project is composed of several parts:

* Neo4j Streams Procedure: a procedure to send a payload to a topic
* Neo4j Streams Producer: a transaction event handler that sends data to a Kafka topic
* Neo4j Streams Consumer: a Neo4j application that ingests data from Kafka topics into Neo4j via templated Cypher statements
* Kafka-Connect Plugin: a plugin for the Confluent Platform that allows ingesting data into Neo4j from Kafka topics via Cypher queries.
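For example, the procedure can be called from Cypher to publish an ad-hoc payload to a topic (the topic name `my-topic` here is just a placeholder):

[source,cypher]
----
CALL streams.publish('my-topic', 'Hello from Neo4j')
----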

[[before_begin]]
=== Before you Begin

Neo4j Streams can run in two modes:

* As a **Neo4j plugin**, neo4j-streams runs inside of the database, and can both consume and produce messages
to Kafka.
* As a **Kafka Connect worker**, neo4j-streams is deployed separately from the Neo4j database. At this time,
the Connect worker can be used to push data to Neo4j (Neo4j as the consumer) but does not yet support
change data capture (CDC) coming _from_ Neo4j.

Experienced Neo4j users will likely prefer running the software as a Neo4j plugin. Kafka administrators
may prefer using the Kafka Connect method.

The remainder of the introduction assumes you are running Neo4j Streams as a Neo4j plugin.
More information on the alternative Kafka Connect method can be link:/kafka-connect/[found in this section].

[[installation]]
=== Installation

Download the latest release jar from https://github.com/neo4j-contrib/neo4j-streams/releases/latest

Copy it into `$NEO4J_HOME/plugins` and configure the relevant connections.

[[configuration]]
=== Configuration Example

Configuring neo4j-streams comes in three different parts, depending on your needs:

. *Required*: Configuring a connection to Kafka
. _Optional_: Configuring Neo4j to ingest from Kafka (link:/consumer[Consumer])
. _Optional_: Configuring Neo4j to produce records to Kafka (link:/producer[Producer])

Below is a complete example configuration using the plugin in both modes, assuming Kafka is running
on localhost. See the relevant subsections to adjust the configuration as necessary.

.neo4j.conf
[source,ini]
----
kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092

streams.sink.enabled=true
streams.sink.topic.cypher.topic-name=MERGE (n:Person { id: event.id }) SET n += event

streams.source.enabled=true
streams.source.topic.nodes.new-person-topic=Person{*}
streams.source.topic.relationships.who-knows-who=KNOWS{*}
streams.source.schema.polling.interval=10000
----
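
To make the sink template above concrete, here is a minimal Python sketch of its upsert semantics (the `graph` dict is a stand-in for Neo4j; this is illustrative, not the plugin's implementation):

```python
# Simulate how the sink applies each Kafka event with the Cypher template
#   MERGE (n:Person { id: event.id }) SET n += event
# The `graph` dict below is a hypothetical stand-in for the Neo4j store.

graph = {}  # node id -> node properties

def apply_event(event: dict) -> dict:
    # MERGE: match the node on `id`, creating it if absent
    node = graph.setdefault(event["id"], {"id": event["id"]})
    # SET n += event: merge every event property onto the node
    node.update(event)
    return node

# Two events for the same id: the second updates the node rather than duplicating it
apply_event({"id": 1, "name": "Alice"})
apply_event({"id": 1, "name": "Alice", "age": 34})
print(graph)  # {1: {'id': 1, 'name': 'Alice', 'age': 34}}
```

Because the template merges on `id`, replaying the same event is idempotent: the node is updated in place rather than duplicated.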

The rest of this section deals with overall plugin configuration.

[[kafka_settings]]
=== Kafka Settings

Any configuration option that starts with `kafka.` will be passed to the underlying Kafka driver. Neo4j
Streams uses the official Confluent Kafka producer and consumer Java clients.
Configuration settings that are valid for those clients will also work for Neo4j Streams.

For example, in the Kafka documentation linked below, the configuration setting named `batch.size` should be stated as
`kafka.batch.size` in Neo4j Streams.
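
This prefix convention can be sketched in a few lines of Python (a simplified illustration of the rule, not the plugin's actual code):

```python
# Illustrate how `kafka.`-prefixed settings map to Kafka client properties.
# Settings without the prefix (e.g. `streams.*`) are not passed to the driver.

def kafka_client_config(neo4j_conf: dict) -> dict:
    """Strip the `kafka.` prefix and drop unrelated settings."""
    prefix = "kafka."
    return {
        key[len(prefix):]: value
        for key, value in neo4j_conf.items()
        if key.startswith(prefix)
    }

conf = {
    "kafka.bootstrap.servers": "localhost:9092",
    "kafka.batch.size": "16384",
    "streams.sink.enabled": "true",  # plugin setting, not a Kafka driver setting
}
print(kafka_client_config(conf))
# {'bootstrap.servers': 'localhost:9092', 'batch.size': '16384'}
```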

The following are common configuration settings you may wish to use. _This is not a complete
list_. The full list of configuration options and reference material is available from Confluent's
site for link:https://docs.confluent.io/current/installation/configuration/consumer-configs.html#cp-config-consumer[consumer configurations] and
link:https://docs.confluent.io/current/installation/configuration/producer-configs.html#cp-config-producer[producer configurations].

.Most Common Needed Configuration Settings
|===
|Setting Name |Description |Default Value

|kafka.max.poll.records
|The maximum number of records to pull per batch from Kafka. Increasing this number will mean
larger transactions in Neo4j memory and may improve throughput.
|500

|kafka.buffer.memory
|The total bytes of memory the producer can use to buffer records waiting to be sent. Use this to adjust
how much memory the plugin may require to hold messages not yet delivered to Neo4j.
|33554432

|kafka.batch.size
|(Producer only) The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.
|16384

|kafka.max.partition.fetch.bytes
|(Consumer only) The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.
|1048576

|kafka.group.id
|A unique string that identifies the consumer group this consumer belongs to.
|N/A
|===

[[confluent_cloud]]
=== Confluent Cloud

Configuring a connection to a Confluent Cloud instance should follow
link:https://docs.confluent.io/current/cloud/using/config-client.html#java-client[Confluent's Java Client]
configuration advice, together with the `kafka.` prefix rule described above. At a minimum, you will need:

* `bootstrap_server_url`
* `api-key`
* `api-secret`
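
Translated into the plugin's `kafka.` prefix convention, a Confluent Cloud connection would look roughly like the following sketch. The SASL settings are the standard Confluent Java client ones; the server address, API key, and secret shown are placeholders you must replace with your own cluster's values:

[source,ini]
----
kafka.bootstrap.servers=<BOOTSTRAP_SERVER_URL>:9092
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";
----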

[[configuration_plugin]]
=== Plugin Configuration

Any configuration option that starts with `streams.` controls how the plugin itself behaves. For a full
list of options available, see the documentation subsections on the producer and consumer.

[[configuration_docker]]
=== A Note on Running Neo4j in Docker

When Neo4j is run in Docker, some special considerations apply; please see
link:https://neo4j.com/docs/operations-manual/current/docker/configuration/[Neo4j Docker Configuration]
for more information. In particular, the configuration format used in `neo4j.conf` looks different.

Please note that the Neo4j Docker image uses a naming convention; you can override every `neo4j.conf` property by prefixing it with `NEO4J_` and applying the following transformations:

* a single underscore is converted to a double underscore: `_ -> __`
* a dot is converted to a single underscore: `.` -> `_`

Example:

* `dbms.memory.heap.max_size=8G` -> `NEO4J_dbms_memory_heap_max__size: 8G`
* `dbms.logs.debug.level=DEBUG` -> `NEO4J_dbms_logs_debug_level: DEBUG`
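
The two transformations above can be sketched as a small Python helper (illustrative only; note that underscores must be doubled *before* dots are converted, or the two rules collide):

```python
# Convert a neo4j.conf property name to its Docker environment variable form.
# Order matters: double existing underscores first, then turn dots into underscores.

def to_docker_env(property_name: str) -> str:
    return "NEO4J_" + property_name.replace("_", "__").replace(".", "_")

print(to_docker_env("dbms.memory.heap.max_size"))
# NEO4J_dbms_memory_heap_max__size
print(to_docker_env("dbms.logs.debug.level"))
# NEO4J_dbms_logs_debug_level
```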

For more information and examples see the link:/docker[Docker section] of the documentation.

[[restart]]
=== Restart Neo4j

Once the plugin is installed and configured, restarting the database will make it active.
If you have configured Neo4j to consume from Kafka, it will begin processing messages immediately.