Commit 437e5f2

Fixes #464: Kafka sink ignores neo4j.batch.timeout.msecs when neo4j.batch.parallelize = false (#468)
1 parent 7091cd2 commit 437e5f2

8 files changed: +122 -18 lines changed

doc/docs/modules/ROOT/pages/architecture/throughput.adoc

Lines changed: 6 additions & 3 deletions
@@ -17,9 +17,12 @@ And so a critical factor is how many items are in the "events" array, per-transa

 * Heap sizing affects how big transactions coming from Kafka can be without starving other queries that are running
 * Page cache sizing affects how much data is hot and has a major impact on the cypher queries that neo4j-streams runs.
-* `neo4j.batch.parallelize` affects whether batches can be run in parallel. This is a potential massive benefit for throughput - but comes with a sharp edge that ordering cannot be guaranteed if you parallelize, and there is risk of locking errors. **This setting only applies to the kafka connect worker**.
-* `neo4j.batch.size` affects how many records get run in a Neo4j batch.
-* `neo4j.batch.timeout.msecs` affects how long batches are given to execute.
+* `neo4j.batch.parallelize` affects whether batches can be run in parallel. This is a potential massive benefit for throughput - but comes with a sharp edge that ordering cannot be guaranteed if you parallelize, and there is risk of locking errors. (`*`)
+* `neo4j.batch.size` affects how many records get run in a Neo4j batch. (`*`)
+* `neo4j.batch.timeout.msecs` affects how long batches are given to execute. (`*`)
+
+**The settings with `*` only applies to the kafka connect worker**.
+

 == Bolt Protocol overhead & Latency

doc/docs/modules/ROOT/pages/kafka-connect.adoc

Lines changed: 16 additions & 2 deletions
@@ -206,7 +206,7 @@ include::cud-file-format.adoc[]

 Neo4j 4.0 Enterprise has https://neo4j.com/docs/operations-manual/4.0/manage-databases/[multi-tenancy support],
 in order to support this feature, in order to support this feature with Kafka Connect plugin, creating the Sink
-instance we have to add the `database` property, which tells the Connector the database to use as default. If you
+instance we have to add the `neo4j.database` property, which tells the Connector the database to use as default. If you
 don't specify that property, the default database `neo4j` will be used.

 [NOTE]
@@ -303,7 +303,6 @@ Following a summary of all the configuration parameters you can use for the Kafk
 | Mandatory
 | Note

-| database | <DATABASE_NAME> | false | Specify a database name only if you want to use a non-default database. Default value is 'neo4j'
 | topics | <topicA,topicB> | true | A list of comma-separated topics
 | connector.class | streams.kafka.connect.sink.Neo4jSinkConnector | true |
 | key.converter | org.apache.kafka.connect.storage.StringConverter | false | Converter class for key Connect data
@@ -321,10 +320,25 @@ Following a summary of all the configuration parameters you can use for the Kafk
 | errors.deadletterqueue.context.headers.enable | false/true | false | enrich messages with metadata headers like exception, timestamp, org. topic, org.part, default:false
 | errors.deadletterqueue.context.headers.prefix | prefix-text | false | common prefix for header entries, e.g. `"__streams.errors."` , default: not set
 | errors.deadletterqueue.topic.replication.factor | 3/1 | false | replication factor, need to set to 1 for single partition, default:3
+| neo4j.database | "bolt://neo4j:7687" | true | Specify a database name only if you want to use a non-default database. Default value is 'neo4j'
 | neo4j.server.uri | "bolt://neo4j:7687" | true | Neo4j Server URI
 | neo4j.authentication.basic.username | your_neo4j_user | true | Neo4j username
 | neo4j.authentication.basic.password | your_neo4j_password | true | Neo4j password
+| neo4j.authentication.basic.realm | your_neo4j_auth_realm | false | The authentication realm
+| neo4j.authentication.kerberos.ticket | your_kerberos_ticket | false | The Kerberos ticket
+| neo4j.authentication.type | NONE/BASIC/KERBEROS | false | The authentication type (default: 'BASIC')
+| neo4j.batch.size | Integer | false | The max number of events processed by the Cypher query (default: 1000)
+| neo4j.batch.timeout.msecs | Integer | false | The execution timeout for the cypher query (default: 0, that is without timeout)
+| neo4j.batch.parallelize | boolean | false | If enabled messages are processed concurrently in the sink. Non concurrent execution supports in-order processing, e.g. for CDC
+| neo4j.connection.max.lifetime.msecs | Long | false | The max Neo4j connection lifetime (default: 1 hour)
+| neo4j.connection.acquisition.timeout.msecs | Long | false | The max Neo4j acquisition timeout (default 1 hour)
+| neo4j.connection.liveness.check.timeout.msecs | Long | false | The max Neo4j liveness check timeout (default 1 hour)
+| neo4j.connection.max.pool.size | Int | false | The max pool size (default: 100)
+| neo4j.encryption.ca.certificate.path | your_certificate_path | false | The path of the certificate
 | neo4j.encryption.enabled | true/false | false |
+| neo4j.encryption.trust.strategy | TRUST_ALL_CERTIFICATES/TRUST_CUSTOM_CA_SIGNED_CERTIFICATES/TRUST_SYSTEM_CA_SIGNED_CERTIFICATES | false | The Neo4j trust strategy (default: TRUST_ALL_CERTIFICATES)
+| neo4j.retry.backoff.msecs | Long | false | The time in milliseconds to wait following a transient error before a retry attempt is made (default: 30000).
+| neo4j.retry.max.attemps | Long | false | The maximum number of times to retry on transient errors (except for TimeoutException) before failing the task (default: 5).
 | neo4j.topic.cdc.sourceId | <list of topics separated by semicolon> | false |
 | neo4j.topic.cdc.sourceId.labelName | <the label attached to the node> | false | default value is *SourceEvent*
 | neo4j.topic.cdc.sourceId.idName | <the id name given to the CDC id field> | false | default value is *sourceId*
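
Note: as a rough illustration of how the documented keys fit together, the sketch below assembles a sink property map in Kotlin. The helper name `sinkProps` is hypothetical, the values are the placeholder examples from the table, and `neo4j.batch.parallelize` is set to `false` here only to show the in-order variant. Nothing in this block is taken from the connector's code.

```kotlin
// Hypothetical helper (not part of the connector): builds a sink property map
// from keys listed in the configuration table. Values are placeholders.
fun sinkProps(topic: String): Map<String, String> = mapOf(
    "topics" to topic,
    "connector.class" to "streams.kafka.connect.sink.Neo4jSinkConnector",
    "neo4j.server.uri" to "bolt://neo4j:7687",
    "neo4j.authentication.basic.username" to "your_neo4j_user",
    "neo4j.authentication.basic.password" to "your_neo4j_password",
    // Only needed for a non-default database; 'neo4j' is the documented default.
    "neo4j.database" to "neo4j",
    // Max number of events handed to one Cypher query (documented default: 1000).
    "neo4j.batch.size" to "1000",
    // 0 means "no timeout" after this commit.
    "neo4j.batch.timeout.msecs" to "0",
    // Non-concurrent execution keeps in-order processing, e.g. for CDC.
    "neo4j.batch.parallelize" to "false"
)

fun main() {
    // Print the properties in key=value form, one per line.
    sinkProps("my-topic").forEach { (key, value) -> println("$key=$value") }
}
```

The map mirrors the `mutableMapOf<String, String>()` style used by the test in this commit, so the same keys could be passed to a task's `start(props)` in a test setup.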

doc/docs/modules/ROOT/partials/docker-data/kafka-connect-docker-compose.yml

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ services:
       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
-      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
+      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
       CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
     command:
       # - bash

kafka-connect-neo4j/docker/docker-compose.yml

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ services:
       CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
       CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
       CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
-      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
+      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
       CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
     command:
       # - bash

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 20 additions & 6 deletions
@@ -4,6 +4,7 @@ import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.ExperimentalCoroutinesApi
 import kotlinx.coroutines.ObsoleteCoroutinesApi
 import kotlinx.coroutines.async
+import kotlinx.coroutines.awaitAll
 import kotlinx.coroutines.runBlocking
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.connect.errors.ConnectException
@@ -12,17 +13,18 @@ import org.neo4j.driver.Config
 import org.neo4j.driver.Driver
 import org.neo4j.driver.GraphDatabase
 import org.neo4j.driver.SessionConfig
+import org.neo4j.driver.TransactionConfig
 import org.neo4j.driver.exceptions.ClientException
 import org.neo4j.driver.exceptions.TransientException
 import org.neo4j.driver.net.ServerAddress
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
-import streams.extensions.awaitAll
 import streams.extensions.errors
 import streams.kafka.connect.utils.PropertiesUtil
 import streams.service.StreamsSinkEntity
 import streams.service.StreamsSinkService
 import streams.utils.retryForException
+import java.time.Duration
 import java.util.concurrent.TimeUnit
 import kotlin.streams.toList

@@ -34,6 +36,7 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):

     private val driver: Driver
     private val sessionConfig: SessionConfig
+    private val transactionConfig: TransactionConfig

     init {
         val configBuilder = Config.builder()
@@ -80,6 +83,15 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):
             sessionConfigBuilder.withDatabase(config.database)
         }
         this.sessionConfig = sessionConfigBuilder.build()
+
+        val batchTimeout = this.config.batchTimeout
+        this.transactionConfig = if (batchTimeout > 0) {
+            TransactionConfig.builder()
+                    .withTimeout(Duration.ofMillis(batchTimeout))
+                    .build()
+        } else {
+            TransactionConfig.empty()
+        }
     }

     fun close() {
@@ -91,14 +103,15 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):
         driver.session(sessionConfig).use { session ->
             try {
                 runBlocking {
-                    retryForException<Unit>(exceptions = arrayOf(ClientException::class.java, TransientException::class.java),
-                            retries = config.retryMaxAttempts, delayTime = 0) { // we use the delayTime = 0, because we delegate the retryBackoff to the Neo4j Java Driver
-                        session.writeTransaction {
+                    retryForException(exceptions = arrayOf(ClientException::class.java, TransientException::class.java),
+                            retries = config.retryMaxAttempts, delayTime = 0) { // we use the delayTime = 0, because we delegate the retryBackoff to the Neo4j Java Driver
+
+                        session.writeTransaction({
                             val summary = it.run(query, data).consume()
                             if (log.isDebugEnabled) {
                                 log.debug("Successfully executed query: `$query`. Summary: $summary")
                             }
-                        }
+                        }, transactionConfig)
                     }
                 }
             } catch (e: Exception) {
@@ -129,7 +142,8 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):
                 records.map { async(Dispatchers.IO) { writeForTopic(topic, it) } }
             }

-            jobs.awaitAll(config.batchTimeout)
+            // timeout starts in writeTransaction()
+            jobs.awaitAll()
             jobs.mapNotNull { it.errors() }
         }
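
Note: the core of this change is the rule "a positive `neo4j.batch.timeout.msecs` becomes a transaction timeout, anything else means no timeout". The sketch below restates that rule with only the Kotlin/Java standard library. `transactionTimeout` is a hypothetical name, and the nullable `Duration` stands in for the driver's `TransactionConfig`, which is deliberately left out so the block runs on its own.

```kotlin
import java.time.Duration

// Hypothetical stand-in for the TransactionConfig selection in Neo4jService:
// a positive value maps to a timeout, otherwise null ("no timeout").
fun transactionTimeout(batchTimeoutMsecs: Long): Duration? =
    if (batchTimeoutMsecs > 0) Duration.ofMillis(batchTimeoutMsecs) else null

fun main() {
    // The new default (0) disables the timeout.
    println(transactionTimeout(0))
    // A positive value is applied per write transaction.
    println(transactionTimeout(30000))
}
```

Because the timeout is attached to each `writeTransaction` call rather than to the `awaitAll` over the parallel jobs, the same rule applies whether `neo4j.batch.parallelize` is true or false, which appears to be the point of the fix for #464.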

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt

Lines changed: 2 additions & 2 deletions
@@ -169,7 +169,7 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
         const val TOPIC_CUD = "neo4j.topic.cud"

         const val CONNECTION_POOL_MAX_SIZE_DEFAULT = 100
-        val BATCH_TIMEOUT_DEFAULT = TimeUnit.SECONDS.toMillis(30L)
+        val BATCH_TIMEOUT_DEFAULT = TimeUnit.SECONDS.toMillis(0L)
         const val BATCH_SIZE_DEFAULT = 1000
         val RETRY_BACKOFF_DEFAULT = TimeUnit.SECONDS.toMillis(30L)
         const val RETRY_MAX_ATTEMPTS_DEFAULT = 5
@@ -298,7 +298,7 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
                             .importance(ConfigDef.Importance.LOW)
                             .defaultValue(BATCH_TIMEOUT_DEFAULT)
                             .group(ConfigGroup.BATCH)
-                            .validator(ConfigDef.Range.atLeast(1)).build())
+                            .validator(ConfigDef.Range.atLeast(0)).build())
                     .define(ConfigKeyBuilder
                             .of(RETRY_BACKOFF_MSECS, ConfigDef.Type.LONG)
                             .documentation(PropertiesUtil.getProperty(RETRY_BACKOFF_MSECS))
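
Note: the two edits in this file change the default to 0 and relax the validator from "at least 1" to "at least 0", so that 0 can be used to mean "no timeout". A minimal sketch of the resulting behaviour, written without Kafka's `ConfigDef`, might look like the following. `parseBatchTimeout` is a hypothetical function, and it throws `IllegalArgumentException` where the real connector would presumably surface a Kafka `ConfigException`.

```kotlin
// Hypothetical, stdlib-only model of the relaxed validation; not the connector's code.
const val BATCH_TIMEOUT_MSECS = "neo4j.batch.timeout.msecs"
const val BATCH_TIMEOUT_DEFAULT = 0L // new default: no timeout

fun parseBatchTimeout(props: Map<String, String>): Long {
    // Missing key: fall back to the default.
    val raw = props[BATCH_TIMEOUT_MSECS] ?: return BATCH_TIMEOUT_DEFAULT
    val value = raw.toLongOrNull()
        ?: throw IllegalArgumentException("$BATCH_TIMEOUT_MSECS must be a long, got '$raw'")
    // Mirrors Range.atLeast(0): zero is now accepted, negatives are rejected.
    require(value >= 0) { "$BATCH_TIMEOUT_MSECS must be at least 0, got $value" }
    return value
}

fun main() {
    println(parseBatchTimeout(emptyMap()))
    println(parseBatchTimeout(mapOf(BATCH_TIMEOUT_MSECS to "30000")))
}
```

Under the previous validator an explicit 0 would have been rejected, so the default and the validator have to change together.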

kafka-connect-neo4j/src/main/resources/kafka-connect-sink.properties

Lines changed: 3 additions & 3 deletions
@@ -17,21 +17,21 @@ neo4j.database=Type: String;\nDescription: The neo4j database instance name (def
 neo4j.server.uri=Type: String;\nDescription: The Bolt URI (default bolt://localhost:7687)
 neo4j.authentication.type=Type: enum[NONE, BASIC, KERBEROS];\nDescription: The authentication type (default BASIC)
 neo4j.batch.size=Type: Int;\nDescription: The max number of events processed by the Cypher query (default 1000)
-neo4j.batch.timeout.msecs=Type: Long;\nDescription: The execution timeout for the cypher query (default 30000)
+neo4j.batch.timeout.msecs=Type: Long;\nDescription: The execution timeout for the cypher query (default: 0, that is without timeout)
 neo4j.authentication.basic.username=Type: String;\nDescription: The authentication username
 neo4j.authentication.basic.password=Type: String;\nDescription: The authentication password
 neo4j.authentication.basic.realm=Type: String;\nDescription: The authentication realm
 neo4j.authentication.kerberos.ticket=Type: String;\nDescription: The Kerberos ticket
 neo4j.encryption.enabled=Type: Boolean;\nDescription: If the encryption is enabled (default false)
-neo4j.encryption.trust.strategy=Type: enum[TRUST_ALL_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES];\nDescription: The Neo4j trust strategy (default RUST_ALL_CERTIFICATES)
+neo4j.encryption.trust.strategy=Type: enum[TRUST_ALL_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES];\nDescription: The Neo4j trust strategy (default TRUST_ALL_CERTIFICATES)
 neo4j.encryption.ca.certificate.path=Type: String;\nDescription: The path of the certificate
 neo4j.connection.max.lifetime.msecs=Type: Long;\nDescription: The max Neo4j connection lifetime (default 1 hour)
 neo4j.connection.acquisition.timeout.msecs=Type: Long;\nDescription: The max Neo4j acquisition timeout (default 1 hour)
 neo4j.connection.liveness.check.timeout.msecs=Type: Long;\nDescription: The max Neo4j liveness check timeout (default 1 hour)
 neo4j.connection.max.pool.size=Type: Int;\nDescription: The max pool size (default 100)
 neo4j.load.balance.strategy=Type: enum[ROUND_ROBIN, LEAST_CONNECTED];\nDescription: The Neo4j load balance strategy (default LEAST_CONNECTED)
 neo4j.retry.backoff.msecs=Type: Long;\nDescription: The time in milliseconds to wait following a transient error before a retry attempt is made (default 30000).
-neo4j.retry.max.attemps=Type: Int;\nDescription: The maximum number of times to retry on transient errors before failing the task (default 5).
+neo4j.retry.max.attemps=Type: Int;\nDescription: The maximum number of times to retry on transient errors (except for TimeoutException) before failing the task (default 5).
 neo4j.topic.cdc.sourceId=Type: String;\nDescription: The topic list (separated by semicolon) that manages CDC events with the `SourceId` strategy
 neo4j.topic.cdc.sourceId.labelName=Type: String;\nDescription: The label name attached to the events with the `SourceId` strategy (default SourceEvent)
 neo4j.topic.cdc.sourceId.idName=Type: String;\nDescription: The id property name attached to the events with the `SourceId` strategy (default sourceId)

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

Lines changed: 73 additions & 0 deletions
@@ -7,6 +7,7 @@ import org.apache.kafka.connect.data.Timestamp
 import org.apache.kafka.connect.sink.SinkRecord
 import org.apache.kafka.connect.sink.SinkTask
 import org.apache.kafka.connect.sink.SinkTaskContext
+import org.junit.After
 import org.junit.Rule
 import org.junit.Test
 import org.mockito.Mockito.mock
@@ -38,6 +39,11 @@ class Neo4jSinkTaskTest {
             .withDisabledServer()
             .withConfig(GraphDatabaseSettings.auth_enabled, false)

+    @After
+    fun after() {
+        cleanAll()
+    }
+
     private val PERSON_SCHEMA = SchemaBuilder.struct().name("com.example.Person")
             .field("firstName", Schema.STRING_SCHEMA)
             .field("lastName", Schema.STRING_SCHEMA)
@@ -1276,5 +1282,72 @@ class Neo4jSinkTaskTest {
                     && errorData.exception!!.javaClass.name == "org.neo4j.driver.exceptions.ClientException")
         }
     }
+
+    @Test
+    fun `should stop the query and fails with small timeout and vice versa`() {
+        val myTopic = "foo"
+        val props = mutableMapOf<String, String>()
+        props[Neo4jSinkConnectorConfig.SERVER_URI] = db.boltURI().toString()
+        props["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}$myTopic"] = "CREATE (n:Person {name: event.name})"
+        props[Neo4jSinkConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
+        props[Neo4jSinkConnectorConfig.BATCH_PARALLELIZE] = true.toString()
+        val batchSize = 500000
+        props[Neo4jSinkConnectorConfig.BATCH_SIZE] = batchSize.toString()
+        props[Neo4jSinkConnectorConfig.BATCH_TIMEOUT_MSECS] = 1.toString()
+        props[SinkTask.TOPICS_CONFIG] = myTopic
+        val input = (1..batchSize).map {
+            SinkRecord(myTopic, 1, null, null, null, mapOf("name" to it.toString()), it.toLong())
+        }
+        // test timeout with parallel=true
+        assertFailsWithTimeout(props, input, batchSize)
+        countFooPersonEntities(0)
+
+        // test timeout with parallel=false
+        props[Neo4jSinkConnectorConfig.BATCH_PARALLELIZE] = false.toString()
+        assertFailsWithTimeout(props, input, batchSize)
+        countFooPersonEntities(0)
+
+        // test with large timeout
+        props[Neo4jSinkConnectorConfig.BATCH_TIMEOUT_MSECS] = 30000.toString()
+        val taskValidParallelFalse = Neo4jSinkTask()
+        taskValidParallelFalse.initialize(mock(SinkTaskContext::class.java))
+        taskValidParallelFalse.start(props)
+        taskValidParallelFalse.put(input)
+        countFooPersonEntities(batchSize)
+
+        props[Neo4jSinkConnectorConfig.BATCH_PARALLELIZE] = true.toString()
+        val taskValidParallelTrue = Neo4jSinkTask()
+        taskValidParallelTrue.initialize(mock(SinkTaskContext::class.java))
+        taskValidParallelTrue.start(props)
+        taskValidParallelTrue.put(input)
+        countFooPersonEntities(batchSize * 2)
+    }
+
+    private fun assertFailsWithTimeout(props: MutableMap<String, String>, input: List<SinkRecord>, expectedDataErrorSize: Int) {
+        try {
+            val taskInvalid = Neo4jSinkTask()
+            taskInvalid.initialize(mock(SinkTaskContext::class.java))
+            taskInvalid.start(props)
+            taskInvalid.put(input)
+            fail("Should fail because of TimeoutException")
+        } catch (e: ProcessingError) {
+            val errors = e.errorDatas
+            assertEquals(expectedDataErrorSize, errors.size)
+        }
+    }
+
+    private fun countFooPersonEntities(expected: Int) {
+        db.defaultDatabaseService().beginTx().use {
+            val personCount = it.execute("MATCH (p:Person) RETURN count(p) as count").columnAs<Long>("count").next()
+            assertEquals(expected, personCount.toInt())
+        }
+    }
+
+    private fun cleanAll() {
+        db.defaultDatabaseService().beginTx().use {
+            it.execute("MATCH (n) DETACH DELETE n")
+            it.commit()
+        }
+    }

 }
