Skip to content

Commit d7e1fa5

Browse files
moxiousjexp
authored andcommitted
issue #221 allow more than 1 URI to be given to connect serverUri, re… (#226)
* issue #221 allow more than 1 URI to be given to connect serverUri, resulting in a routingDriver * PR feedback from @jexp; update driver, use improved addr approach
1 parent 71c2523 commit d7e1fa5

File tree

4 files changed

+35
-7
lines changed

4 files changed

+35
-7
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import org.neo4j.driver.v1.Driver
1414
import org.neo4j.driver.v1.GraphDatabase
1515
import org.neo4j.driver.v1.exceptions.ClientException
1616
import org.neo4j.driver.v1.exceptions.TransientException
17+
import org.neo4j.driver.v1.net.ServerAddress
1718
import org.slf4j.Logger
1819
import org.slf4j.LoggerFactory
1920
import streams.kafka.connect.sink.converters.Neo4jValueConverter
@@ -66,8 +67,10 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):
6667
configBuilder.withConnectionAcquisitionTimeout(this.config.connectionAcquisitionTimeout, TimeUnit.MILLISECONDS)
6768
configBuilder.withLoadBalancingStrategy(this.config.loadBalancingStrategy)
6869
configBuilder.withMaxTransactionRetryTime(config.retryBackoff, TimeUnit.MILLISECONDS)
70+
configBuilder.withResolver { address -> this.config.serverUri.map { ServerAddress.of(it.host, it.port) }.toSet() }
6971
val neo4jConfig = configBuilder.toConfig()
70-
this.driver = GraphDatabase.driver(this.config.serverUri, authToken, neo4jConfig)
72+
73+
this.driver = GraphDatabase.driver(this.config.serverUri.get(0), authToken, neo4jConfig)
7174
}
7275

7376
fun close() {

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import streams.service.Topics
1919
import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
2020
import java.io.File
2121
import java.net.URI
22+
import java.net.URISyntaxException
2223
import java.util.concurrent.TimeUnit
2324

2425
enum class AuthenticationType {
@@ -46,7 +47,7 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
4647
val authenticationRealm: String
4748
val authenticationKerberosTicket: String
4849

49-
val serverUri: URI
50+
val serverUri: List<URI>
5051
val connectionMaxConnectionLifetime: Long
5152
val connectionLifenessCheckTimeout: Long
5253
val connectionPoolMaxSize: Int
@@ -85,7 +86,7 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
8586
authenticationPassword = getPassword(AUTHENTICATION_BASIC_PASSWORD).value()
8687
authenticationKerberosTicket = getPassword(AUTHENTICATION_KERBEROS_TICKET).value()
8788

88-
serverUri = ConfigUtils.uri(this, SERVER_URI)
89+
serverUri = listOfURIs(SERVER_URI, getString(SERVER_URI))
8990
connectionLifenessCheckTimeout = getLong(CONNECTION_LIVENESS_CHECK_TIMEOUT_MSECS)
9091
connectionMaxConnectionLifetime = getLong(CONNECTION_MAX_CONNECTION_LIFETIME_MSECS)
9192
connectionPoolMaxSize = getInt(CONNECTION_POOL_MAX_SIZE)
@@ -106,6 +107,10 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
106107
validateAllTopics(originals)
107108
}
108109

110+
private fun listOfURIs(key: String, value: String): List<URI> {
111+
return value.split(",").map { URI(it) }
112+
}
113+
109114
private fun validateAllTopics(originals: Map<*, *>) {
110115
TopicUtils.validate<ConfigException>(this.topics)
111116
val topics = if (originals.containsKey(SinkTask.TOPICS_CONFIG)) {

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfigTest.kt

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,21 @@ class Neo4jSinkConnectorConfigTest {
3939

4040
@Test
4141
fun `should return the configuration`() {
42+
val a = "bolt://neo4j:7687"
43+
val b = "bolt://neo4j2:7687"
44+
4245
val originals = mapOf(SinkConnector.TOPICS_CONFIG to "foo",
4346
"${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo" to "CREATE (p:Person{name: event.firstName})",
44-
Neo4jSinkConnectorConfig.SERVER_URI to "bolt://neo4j:7687",
47+
Neo4jSinkConnectorConfig.SERVER_URI to "$a,$b", // Check for string trimming
4548
Neo4jSinkConnectorConfig.BATCH_SIZE to 10,
4649
Neo4jSinkConnectorConfig.AUTHENTICATION_BASIC_USERNAME to "FOO",
4750
Neo4jSinkConnectorConfig.AUTHENTICATION_BASIC_PASSWORD to "BAR")
4851
val config = Neo4jSinkConnectorConfig(originals)
4952

5053
assertEquals(originals["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo"], config.topics.cypherTopics["foo"])
5154
assertFalse { config.encryptionEnabled }
52-
assertEquals(originals[Neo4jSinkConnectorConfig.SERVER_URI], config.serverUri.toString())
55+
assertEquals(a, config.serverUri.get(0).toString())
56+
assertEquals(b, config.serverUri.get(1).toString())
5357
assertEquals(originals[Neo4jSinkConnectorConfig.BATCH_SIZE], config.batchSize)
5458
assertEquals(Config.TrustStrategy.Strategy.TRUST_ALL_CERTIFICATES, config.encryptionTrustStrategy)
5559
assertEquals(AuthenticationType.BASIC, config.authenticationType)
@@ -67,6 +71,22 @@ class Neo4jSinkConnectorConfigTest {
6771
assertEquals(Neo4jSinkConnectorConfig.BATCH_TIMEOUT_DEFAULT, config.batchTimeout)
6872
}
6973

74+
@Test
75+
fun `should return valid configuration with multiple URIs`() {
76+
val a = "bolt://neo4j:7687"
77+
val b = "bolt://neo4j2:7687"
78+
val c = "bolt://neo4j3:7777"
79+
80+
val originals = mapOf(SinkConnector.TOPICS_CONFIG to "foo",
81+
"${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo" to "CREATE (p:Person{name: event.firstName})",
82+
Neo4jSinkConnectorConfig.SERVER_URI to "$a,$b,$c")
83+
val config = Neo4jSinkConnectorConfig(originals)
84+
85+
assertEquals(a, config.serverUri.get(0).toString())
86+
assertEquals(b, config.serverUri.get(1).toString())
87+
assertEquals(c, config.serverUri.get(2).toString())
88+
}
89+
7090
@Test
7191
fun `should return the configuration with shuffled topic order`() {
7292
val originals = mapOf(SinkConnector.TOPICS_CONFIG to "bar,foo",
@@ -80,7 +100,7 @@ class Neo4jSinkConnectorConfigTest {
80100

81101
assertEquals(originals["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo"], config.topics.cypherTopics["foo"])
82102
assertFalse { config.encryptionEnabled }
83-
assertEquals(originals[Neo4jSinkConnectorConfig.SERVER_URI], config.serverUri.toString())
103+
assertEquals(originals[Neo4jSinkConnectorConfig.SERVER_URI], config.serverUri.get(0).toString())
84104
assertEquals(originals[Neo4jSinkConnectorConfig.BATCH_SIZE], config.batchSize)
85105
assertEquals(Config.TrustStrategy.Strategy.TRUST_ALL_CERTIFICATES, config.encryptionTrustStrategy)
86106
assertEquals(AuthenticationType.BASIC, config.authenticationType)

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
<kafka.version>1.0.1</kafka.version>
4040
<jackson.version>2.9.7</jackson.version>
4141
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
42-
<neo4j.java.driver.version>1.6.3</neo4j.java.driver.version>
42+
<neo4j.java.driver.version>1.7.5</neo4j.java.driver.version>
4343
<testcontainers.version>1.9.1</testcontainers.version>
4444
</properties>
4545

0 commit comments

Comments
 (0)