Commit a4c752e

jexp authored and moxious committed
Fixes #227 Introduce batch parallelization control (#232)
* issue #227 add parallelizeBatches config option
* document neo4j.batch.parallelize
* New implementation of parallel sink processing, parallelization and error handling
* Resolve feedback from PR
* Update readme.adoc
1 parent 3fd6176 commit a4c752e

File tree

4 files changed, +72 -31 lines

* kafka-connect-neo4j/docker/readme.adoc
* kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt
* kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt
* kafka-connect-neo4j/src/main/resources/kafka-connect-sink.properties

kafka-connect-neo4j/docker/readme.adoc

Lines changed: 2 additions & 1 deletion
@@ -23,6 +23,7 @@ You can set the following configuration values via Confluent Connect UI, or via
 |neo4j.connection.liveness.check.timeout.msecs|Long| The max Neo4j liveness check timeout (default 1 hour)
 |neo4j.connection.max.pool.size|Int| The max pool size (default 100)
 |neo4j.load.balance.strategy|enum[ROUND_ROBIN, LEAST_CONNECTED]| The Neo4j load balance strategy (default LEAST_CONNECTED)
+|neo4j.batch.parallelize|Boolean| Process batches concurrently (default true). Concurrent batch processing improves throughput but may apply events out of order; set to `false` if messages must be applied in strict order, e.g. for change-data-capture (CDC) events.
 |===
 
 === Configuring the stack
@@ -530,7 +531,7 @@ CREATE INDEX ON :Person(name)
 
 [source,cypher]
 ----
-CREATE INDEX ON :Family(surname)
+CREATE INDEX ON :Family(name)
 ----
 
 Please type:
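
For sink deployments that need the strict ordering called out in the new table row, the flag is just another entry in the connector configuration. A minimal sketch follows; the connector name, topic, credentials, and Cypher mapping are illustrative, not part of this commit:

```properties
name=neo4j-sink-orders
connector.class=streams.kafka.connect.sink.Neo4jSinkConnector
topics=orders
neo4j.server.uri=bolt://localhost:7687
neo4j.authentication.basic.username=neo4j
neo4j.authentication.basic.password=password
# Illustrative topic-to-Cypher mapping
neo4j.topic.cypher.orders=MERGE (o:Order {id: event.id})
# Apply batches sequentially so events reach Neo4j in Kafka order
neo4j.batch.parallelize=false
```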

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 61 additions & 28 deletions
@@ -1,10 +1,7 @@
 package streams.kafka.connect.sink
 
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.async
+import kotlinx.coroutines.*
 import kotlinx.coroutines.channels.ticker
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.selects.whileSelect
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.connect.errors.ConnectException
@@ -24,7 +21,11 @@ import streams.service.TopicType
 import streams.service.TopicTypeGroup
 import streams.utils.StreamsUtils
 import streams.utils.retryForException
+import java.lang.RuntimeException
+import java.util.concurrent.CompletionException
+import java.util.concurrent.CopyOnWriteArraySet
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeoutException
 
 
 class Neo4jService(private val config: Neo4jSinkConnectorConfig):
@@ -117,35 +118,67 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):
         }
     }
 
-    // perhaps better? https://stackoverflow.com/questions/52192752/kotlin-how-to-run-n-coroutines-and-wait-for-first-m-results-or-timeout
-    suspend fun writeData(data: Map<String, List<List<StreamsSinkEntity>>>) = coroutineScope {
-        val timeout = config.batchTimeout
-        val ticker = ticker(timeout)
-        val deferredList = data
-                .flatMap { (topic, records) ->
-                    records.map { async(Dispatchers.IO) { writeForTopic(topic, it) } }
-                }
-        // loops while select<Boolean> returns true
+    // taken from https://stackoverflow.com/questions/52192752/kotlin-how-to-run-n-coroutines-and-wait-for-first-m-results-or-timeout
+    @ObsoleteCoroutinesApi
+    @ExperimentalCoroutinesApi
+    suspend fun <T> List<Deferred<T>>.awaitAll(timeoutMs: Long): List<T> {
+        val jobs = CopyOnWriteArraySet<Deferred<T>>(this)
+        val result = ArrayList<T>(size)
+        val timeout = ticker(timeoutMs)
+
         whileSelect {
-            ticker.onReceive {
-                if (log.isDebugEnabled) {
-                    log.debug("Timeout $timeout occurred while executing queries")
+            jobs.forEach { deferred ->
+                deferred.onAwait {
+                    jobs.remove(deferred)
+                    result.add(it)
+                    result.size != size
                 }
-                deferredList.forEach { deferred -> deferred.cancel() }
-                false // Stops the whileSelect
             }
-            val isAllCompleted = deferredList.all { it.isCompleted } // true when all are completed
-            // selects first that is done and returns !false==true for whileSelect until it's !true when all/last is done
-            deferredList.forEach {
-                it.onAwait { !isAllCompleted } // Stops the whileSelect
+
+            timeout.onReceive {
+                jobs.forEach { it.cancel() }
+                throw TimeoutException("Tasks $size cancelled after timeout of $timeoutMs ms.")
             }
         }
-        val exceptionMessages = deferredList
-                .mapNotNull { it.getCompletionExceptionOrNull() }
-                .map { it.message }
-                .joinToString("\n")
-        if (exceptionMessages.isNotBlank()) {
-            throw ConnectException(exceptionMessages)
+
+        return result
+    }
+
+    @ExperimentalCoroutinesApi
+    fun <T> Deferred<T>.errors() = when {
+        isCompleted -> getCompletionExceptionOrNull()
+        isCancelled -> getCompletionExceptionOrNull() // was getCancellationException()
+        isActive -> RuntimeException("Job $this still active")
+        else -> null
+    }
+
+    suspend fun writeData(data: Map<String, List<List<StreamsSinkEntity>>>) {
+        val errors = if (config.parallelBatches) writeDataAsync(data) else writeDataSync(data)
+
+        if (errors.isNotEmpty()) {
+            throw ConnectException(errors.map { it.message }.distinct().joinToString("\n", "Errors executing ${data.values.map { it.size }.sum()} jobs:\n"))
         }
     }
+
+    @ExperimentalCoroutinesApi
+    @ObsoleteCoroutinesApi
+    suspend fun writeDataAsync(data: Map<String, List<List<StreamsSinkEntity>>>) = coroutineScope {
+        val jobs = data
+                .flatMap { (topic, records) ->
+                    records.map { async(Dispatchers.IO) { writeForTopic(topic, it) } }
+                }
+
+        jobs.awaitAll(config.batchTimeout)
+        jobs.mapNotNull { it.errors() }
+    }
+
+    fun writeDataSync(data: Map<String, List<List<StreamsSinkEntity>>>) =
+            data.flatMap { (topic, records) ->
+                records.map {
+                    try {
+                        writeForTopic(topic, it)
+                    } catch (e: Exception) {
+                        e
+                    }
+                }.filterIsInstance<Throwable>()
+            }
 }
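
For readers unfamiliar with the select-based await pattern the new `awaitAll` extension relies on, here is a self-contained sketch of the same technique. Names such as `awaitAllWithTimeout` are illustrative, not the connector's own:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.selects.whileSelect
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.TimeoutException

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
suspend fun <T> List<Deferred<T>>.awaitAllWithTimeout(timeoutMs: Long): List<T> {
    val pending = CopyOnWriteArraySet<Deferred<T>>(this) // jobs still running
    val results = ArrayList<T>(size)
    val timeout = ticker(timeoutMs) // emits its first tick after timeoutMs

    // Each iteration selects whichever clause fires first; the loop
    // continues while the selected clause returns true.
    whileSelect {
        pending.forEach { deferred ->
            deferred.onAwait { value ->
                pending.remove(deferred)
                results.add(value)
                results.size != size // false (all done) stops the loop
            }
        }
        timeout.onReceive {
            pending.forEach { it.cancel() } // abandon the stragglers
            throw TimeoutException("$size tasks cancelled after $timeoutMs ms")
        }
    }
    timeout.cancel() // stop the ticker once all results are in
    return results
}

@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() = runBlocking {
    val jobs = (1..5).map { n ->
        async(Dispatchers.IO) { delay(n * 100L); n * n }
    }
    println(jobs.awaitAllWithTimeout(1_000)) // [1, 4, 9, 16, 25]
}
```

The trade-off against plain `kotlinx.coroutines.awaitAll` is the timeout clause: the ticker cancels whatever is still pending instead of waiting indefinitely, which is how `writeDataAsync` can honour `config.batchTimeout`.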

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt

Lines changed: 7 additions & 0 deletions
@@ -62,6 +62,7 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
 
     val batchTimeout: Long
     val batchSize: Int
+    val parallelBatches: Boolean
 
 //    val cdcTopics: Map<TopicType, Set<String>>
 //
@@ -107,6 +108,7 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
         topics = Topics.from(originals, "streams.sink.", "neo4j.")
         strategyMap = TopicUtils.toStrategyMap(topics, sourceIdStrategyConfig)
 
+        parallelBatches = getBoolean(BATCH_PARALLELIZE)
         validateAllTopics(originals)
     }
 
@@ -149,6 +151,7 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
 
         const val BATCH_SIZE = "neo4j.batch.size"
         const val BATCH_TIMEOUT_MSECS = "neo4j.batch.timeout.msecs"
+        const val BATCH_PARALLELIZE = "neo4j.batch.parallelize"
 
         const val RETRY_BACKOFF_MSECS = "neo4j.retry.backoff.msecs"
         const val RETRY_MAX_ATTEMPTS = "neo4j.retry.max.attemps"
@@ -331,6 +334,10 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
                         .documentation(PropertiesUtil.getProperty(TOPIC_CDC_SCHEMA)).importance(ConfigDef.Importance.HIGH)
                         .defaultValue("").group(ConfigGroup.TOPIC_CYPHER_MAPPING)
                         .build())
+                .define(ConfigKeyBuilder.of(BATCH_PARALLELIZE, ConfigDef.Type.BOOLEAN)
+                        .documentation(PropertiesUtil.getProperty(BATCH_PARALLELIZE)).importance(ConfigDef.Importance.MEDIUM)
+                        .defaultValue(true).group(ConfigGroup.BATCH)
+                        .build())
         }
     }
 }
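
For context, `ConfigKeyBuilder` is a project-local helper around Kafka's standard `ConfigDef` API. A minimal sketch of the equivalent plain wiring for a boolean key; the `SinkConfig` class and documentation string below are illustrative:

```kotlin
import org.apache.kafka.common.config.AbstractConfig
import org.apache.kafka.common.config.ConfigDef

const val BATCH_PARALLELIZE = "neo4j.batch.parallelize"

val CONFIG: ConfigDef = ConfigDef()
        .define(BATCH_PARALLELIZE,
                ConfigDef.Type.BOOLEAN,
                true, // default: concurrent batch processing on
                ConfigDef.Importance.MEDIUM,
                "Whether sink batches may be processed concurrently")

class SinkConfig(originals: Map<*, *>) : AbstractConfig(CONFIG, originals) {
    // getBoolean() coerces "true"/"false" strings and falls back to the default
    val parallelBatches: Boolean = getBoolean(BATCH_PARALLELIZE)
}

fun main() {
    println(SinkConfig(emptyMap<String, Any>()).parallelBatches)             // true (default)
    println(SinkConfig(mapOf(BATCH_PARALLELIZE to "false")).parallelBatches) // false
}
```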

kafka-connect-neo4j/src/main/resources/kafka-connect-sink.properties

Lines changed: 2 additions & 2 deletions
@@ -13,7 +13,6 @@
 # limitations under the License.
 ##
 
-error.reporting=Type: enum[logging, throwing, deadletter];\nDescription: Error Reporting Mode, one of: logging, throwing, deadletter
 neo4j.server.uri=Type: String;\nDescription: The Bolt URI (default bolt://localhost:7687)
 neo4j.authentication.type=Type: enum[NONE, BASIC, KERBEROS];\nDescription: The authentication type (default BASIC)
 neo4j.batch.size=Type: Int;\nDescription: The max number of events processed by the Cypher query (default 1000)
@@ -35,4 +34,5 @@ neo4j.retry.max.attemps=Type: Int;\nDescription: The maximum number of times to
 neo4j.topic.cdc.sourceId=Type: String;\nDescription: The topic that manages CDC events with the `SourceId` strategy
 neo4j.topic.cdc.sourceId.labelName=Type: String;\nDescription: The label name attached to the events with the `SourceId` strategy (default SourceEvent)
 neo4j.topic.cdc.sourceId.idName=Type: String;\nDescription: The id property name attached to the events with the `SourceId` strategy (default sourceId)
-neo4j.topic.cdc.schema=Type: String;\nDescription: The topic that manages CDC events with the `Schema` strategy
\ No newline at end of file
+neo4j.topic.cdc.schema=Type: String;\nDescription: The topic that manages CDC events with the `Schema` strategy
+neo4j.batch.parallelize=Type: Boolean;\nDescription: If enabled, messages are processed concurrently in the sink. Non-concurrent execution preserves in-order processing, e.g. for CDC (default true)
