Using neo4j-streams is a form of graph ETL, and it pays to separate the extraction and the transformation and handle them independently if we want the pipeline to be performant and easy to maintain. If we are producing records from Neo4j back out to Kafka, the challenge is the same, just in the opposite direction.
{url-confluent-blog}/building-real-time-streaming-etl-pipeline-20-minutes/[Streaming ETL is nothing new for Kafka] -- it is one of the platform's core use cases. A big complicating factor is that few people had done streaming ETL for graphs before neo4j-streams.
Neo4j can't ingest fast if Kafka isn't set up correctly. While this isn't a common source of problems, it has come up. Confluent has {url-confluent-blog}/optimizing-apache-kafka-deployment/[good overall documentation] on optimizing Kafka that is worth being familiar with.
The main trade-offs are the following, and they have to make sense at the Kafka layer before they can make sense for Neo4j.
* Do you want to optimize for high throughput, which is the rate that data is moved from producers to brokers or brokers to consumers?
* Do you want to optimize for low latency, which is the elapsed time moving messages end-to-end (from producers to brokers to consumers)?
* Do you want to optimize for high durability, which guarantees that messages that have been committed will not be lost?
* Do you want to optimize for high availability, which minimizes downtime in case of unexpected failures? Kafka is a distributed system, and it is designed to tolerate failures.
* Batch size (`neo4j.batch.size`) - the number of messages to include in a single transactional batch.
* Max Poll Records (`kafka.max.poll.records`) - the number of records to use per transaction in Neo4j. There is a tradeoff between memory usage and total transactional overhead.
** Fewer, larger batches import data into Neo4j faster overall, but require more memory.
** **The smaller the payload, the larger the batch can be in general (via `UNWIND`)**. A good default to start with is 1000; work your way up from there. If you are only creating nodes via `UNWIND` you can go much higher (20k as a starting point). Then use a lower number for the relationship merges (back to 1000-5000).
Each batch represents a transaction held in memory, so the message size multiplied by the batch size is an important factor in determining how much heap you need for your transactions. For example, 1,000 messages of roughly 200 KB each means about 200 MB of payload in a single transaction.
* Fetch bytes (`kafka.max.partition.fetch.bytes`) - the maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.
Every time the Kafka client calls the `poll()` operation, it is limited by these factors. The first is the maximum number of bytes you can pull, which constrains your memory overhead. The second is how many records you want in a batch; note that at this layer you have no idea how many bytes each record takes. The default fetch size is 1 MB, so if you have 200 KB records (big JSON documents) and leave that default in place, you will never get more than 5 records per transaction. The max poll records setting constrains the other aspect. Finally, you may wish to read up on or adjust `kafka.max.poll.interval.ms` to constrain the amount of time spent polling in advanced scenarios.
{url-confluent-clients}/consumer.html#group-configuration[See this documentation] for more information on that setting.
A logical starting point is to set max poll records equal to your desired transactional batch size and set `neo4j.batch.size` to the same number. In general you can leave `kafka.max.partition.fetch.bytes` alone, but if you need to adjust it for memory reasons, it should be roughly max poll records * the average number of bytes per record, plus 10% or so.
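For illustration, here is a sketch of how those settings could fit together, using the property names mentioned above. The values and the ~200 KB average record size are assumptions to tune for your own workload and deployment.

[source,properties]
----
# Target roughly 1,000 records per transaction in Neo4j
neo4j.batch.size=1000

# Let each poll hand back up to the same number of records
kafka.max.poll.records=1000

# Usually left at its default; if memory is a concern, size it as
# max poll records * average bytes per record, plus ~10% headroom
# (1,000 * 200 KB is about 200 MB, so roughly 220 MB here)
#kafka.max.partition.fetch.bytes=220000000
----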
{url-confluent-install}/configuration/consumer-configs.html[Important Kafka Consumer Configuration Elements & Their Explanations]
(Use these with neo4j-streams by prepending `kafka.` in the config; for example, `max.poll.records` becomes `kafka.max.poll.records`.)
== Kafka Partitioning Strategy
A big factor can be how the Kafka topic is set up. See {url-confluent-blog}/how-choose-number-topics-partitions-kafka-cluster/[how to choose the number of topics/partitions in a Kafka Cluster].
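As an illustration, a topic can be created with an explicit partition count and replication factor up front. The broker address, topic name, and counts below are placeholders; size them using the guidance in that post.

[source,bash]
----
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic neo4j-ingest \
  --partitions 6 \
  --replication-factor 3
----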
doc/docs/modules/ROOT/pages/architecture/transformations.adoc

KSQL is the best available method of transforming streams from one format to another.
The downside to KSQL is that it may not work everywhere. Because it's a Confluent Enterprise feature, you won't find it in Amazon MSK or other open-source Kafka installations.
{url-confluent-ksql}[KSQL Documentation can be found here.]
== KStreams
KStreams is a Java API that allows for rich transformation of streams in any way you can design. It is more akin to the Neo4j Traversal API, in that you can do whatever you can imagine, but it requires custom code to do so. Typically KStreams programs are small apps which might read from one topic, transform, and write to another. And so for our purposes with graphs, KStreams serves the same architectural purpose as KSQL; it's just more powerful, and requires custom code.
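For example, here is a minimal KStreams sketch of that read-transform-write shape, assuming string-serialized messages and placeholder topic names (`raw-events`, `events-for-neo4j`):

[source,java]
----
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class ReshapeForNeo4j {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reshape-for-neo4j");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read from one topic, drop empty payloads, reshape each value,
        // and write to the topic the Neo4j sink consumes.
        KStream<String, String> source = builder.stream("raw-events");
        source.filter((key, value) -> value != null && !value.isBlank())
              .mapValues(value -> value.trim())   // stand-in for a real transformation
              .to("events-for-neo4j");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
----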
In contrast to KSQL, which is only available for Confluent Enterprise and Confluent Cloud, the KStreams API should be available with any open-source Kafka.
https://kafka.apache.org/documentation/#streamsapi[KStreams Documentation can be found here]
doc/docs/modules/ROOT/pages/consumer.adoc

The Neo4j Streams Plugin provides several means to handle processing errors.
It can fail fast or log errors with different detail levels.
Another way is to re-route any data and errors that it was for some reason unable to ingest to a `Dead Letter Queue`.
NOTE: It behaves by default like Kafka Connect; see this {url-confluent-blog}/kafka-connect-deep-dive-error-handling-dead-letter-queues/[blog post^]
* fail fast (abort) by default
* need to configure a dead-letter-queue topic to enable it (a sketch follows below)
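Here is a minimal sketch of what enabling the dead letter queue could look like, assuming the plugin mirrors the Kafka Connect error-handling property names under a `streams.sink.` prefix; verify the exact keys and topic name against the version you are running.

[source,properties]
----
# Tolerate bad records instead of aborting, log them,
# and route them (with context headers) to a DLQ topic
streams.sink.errors.tolerance=all
streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true
streams.sink.errors.deadletterqueue.topic.name=neo4j-dlq
streams.sink.errors.deadletterqueue.context.headers.enable=true
----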
0 commit comments