Skip to content

Commit 1c9287b

Browse files
Added support for secured URI schemes (#333)
1 parent b87c73f commit 1c9287b

File tree

4 files changed

+23
-13
lines changed

4 files changed

+23
-13
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,5 @@ Thumbs.db
3333
.cache-main
3434
.cache-tests
3535
bin
36+
doc/node
37+
doc/node_modules

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,24 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):
4242

4343
init {
4444
val configBuilder = Config.builder()
45-
if (this.config.encryptionEnabled) {
46-
configBuilder.withEncryption()
47-
val trustStrategy: Config.TrustStrategy = when (this.config.encryptionTrustStrategy) {
48-
Config.TrustStrategy.Strategy.TRUST_ALL_CERTIFICATES -> Config.TrustStrategy.trustAllCertificates()
49-
Config.TrustStrategy.Strategy.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES -> Config.TrustStrategy.trustSystemCertificates()
50-
Config.TrustStrategy.Strategy.TRUST_CUSTOM_CA_SIGNED_CERTIFICATES -> Config.TrustStrategy.trustCustomCertificateSignedBy(this.config.encryptionCACertificateFile)
51-
else -> {
52-
throw ConfigException(Neo4jSinkConnectorConfig.ENCRYPTION_TRUST_STRATEGY, this.config.encryptionTrustStrategy.toString(), "Encryption Trust Strategy is not supported.")
45+
46+
if (!this.config.hasSecuredURI()) {
47+
if (this.config.encryptionEnabled) {
48+
configBuilder.withEncryption()
49+
val trustStrategy: Config.TrustStrategy = when (this.config.encryptionTrustStrategy) {
50+
Config.TrustStrategy.Strategy.TRUST_ALL_CERTIFICATES -> Config.TrustStrategy.trustAllCertificates()
51+
Config.TrustStrategy.Strategy.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES -> Config.TrustStrategy.trustSystemCertificates()
52+
Config.TrustStrategy.Strategy.TRUST_CUSTOM_CA_SIGNED_CERTIFICATES -> Config.TrustStrategy.trustCustomCertificateSignedBy(this.config.encryptionCACertificateFile)
53+
else -> {
54+
throw ConfigException(Neo4jSinkConnectorConfig.ENCRYPTION_TRUST_STRATEGY, this.config.encryptionTrustStrategy.toString(), "Encryption Trust Strategy is not supported.")
55+
}
5356
}
57+
configBuilder.withTrustStrategy(trustStrategy)
58+
} else {
59+
configBuilder.withoutEncryption()
5460
}
55-
configBuilder.withTrustStrategy(trustStrategy)
56-
} else {
57-
configBuilder.withoutEncryption()
5861
}
62+
5963
val authToken = when (this.config.authenticationType) {
6064
AuthenticationType.NONE -> AuthTokens.none()
6165
AuthenticationType.BASIC -> {

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.apache.kafka.common.config.AbstractConfig
1010
import org.apache.kafka.common.config.ConfigDef
1111
import org.apache.kafka.common.config.ConfigException
1212
import org.apache.kafka.connect.sink.SinkTask
13+
import org.neo4j.cypher.internal.logical.plans.`Optional$`
1314
import org.neo4j.driver.internal.async.pool.PoolSettings
1415
import org.neo4j.driver.Config
1516
import streams.kafka.connect.utils.PropertiesUtil
@@ -19,6 +20,7 @@ import streams.service.Topics
1920
import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
2021
import java.io.File
2122
import java.net.URI
23+
import java.util.*
2224
import java.util.concurrent.TimeUnit
2325

2426
enum class AuthenticationType {
@@ -129,6 +131,8 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
129131
}
130132
}
131133

134+
fun hasSecuredURI() = serverUri.any { it.scheme.endsWith("+s", true) || it.scheme.endsWith("+ssc", true) }
135+
132136
companion object {
133137
const val SERVER_URI = "neo4j.server.uri"
134138
const val DATABASE = "neo4j.database"
@@ -220,7 +224,7 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
220224
.importance(ConfigDef.Importance.HIGH)
221225
.defaultValue("bolt://localhost:7687")
222226
.group(ConfigGroup.CONNECTION)
223-
.validator(Validators.validURI("bolt", "bolt+routing", "neo4j"))
227+
.validator(Validators.validURI("bolt", "bolt+routing", "bolt+s", "bolt+ssc","neo4j", "neo4j+s", "neo4j+ssc"))
224228
.build())
225229
.define(ConfigKeyBuilder
226230
.of(CONNECTION_POOL_MAX_SIZE, ConfigDef.Type.INT)

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
<kafka.version>2.3.0</kafka.version>
5555
<jackson.version>2.9.7</jackson.version>
5656
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
57-
<neo4j.java.driver.version>4.0.1</neo4j.java.driver.version>
57+
<neo4j.java.driver.version>4.1.0</neo4j.java.driver.version>
5858
<testcontainers.version>1.11.0</testcontainers.version>
5959
<avro.version>1.8.2</avro.version>
6060
<mokito.version>3.2.0</mokito.version>

0 commit comments

Comments
 (0)