Skip to content

Commit e98c0a6

Browse files
committed
fixes #208: Kafka Connect sink Topics Validation must not take count of topic order definition
1 parent 4b96362 commit e98c0a6

File tree

2 files changed

+50
-14
lines changed

2 files changed

+50
-14
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,18 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
109109
private fun validateAllTopics(originals: Map<*, *>) {
110110
TopicUtils.validate<ConfigException>(this.topics)
111111
val topics = if (originals.containsKey(SinkTask.TOPICS_CONFIG)) {
112-
originals["topics"].toString().split(",").map { it.trim() }
112+
originals["topics"].toString()
113+
.split(",")
114+
.map { it.trim() }
115+
.sorted()
113116
} else { // TODO manage regexp
114117
emptyList()
115118
}
116-
val allTopics = this.topics.allTopics()
119+
val allTopics = this.topics
120+
.allTopics()
121+
.sorted()
117122
if (topics != allTopics) {
118-
throw ConfigException("There is a mismatch between topics defined into the property `${SinkTask.TOPICS_CONFIG}` and configured topics ($topics)")
123+
throw ConfigException("There is a mismatch between topics defined into the property `${SinkTask.TOPICS_CONFIG}` ($topics) and configured topics ($allTopics)")
119124
}
120125
}
121126

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfigTest.kt

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,17 @@ import kotlin.test.assertNull
1111

1212
class Neo4jSinkConnectorConfigTest {
1313

14-
@Test(expected = ConfigException::class)
15-
fun `should throw a ConfigException because of mismatch`() {
16-
try {
17-
val originals = mapOf(SinkConnector.TOPICS_CONFIG to "foo, bar",
18-
"${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo" to "CREATE (p:Person{name: event.firstName})")
19-
Neo4jSinkConnectorConfig(originals)
20-
} catch (e: ConfigException) {
21-
assertEquals("There is a mismatch between topics defined into the property `topics` and configured topics ([foo, bar])", e.message)
22-
throw e
23-
}
24-
}
14+
@Test(expected = ConfigException::class)
15+
fun `should throw a ConfigException because of mismatch`() {
16+
try {
17+
val originals = mapOf(SinkConnector.TOPICS_CONFIG to "foo, bar",
18+
"${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo" to "CREATE (p:Person{name: event.firstName})")
19+
Neo4jSinkConnectorConfig(originals)
20+
} catch (e: ConfigException) {
21+
assertEquals("There is a mismatch between topics defined into the property `topics` ([bar, foo]) and configured topics ([foo])", e.message)
22+
throw e
23+
}
24+
}
2525

2626
@Test(expected = ConfigException::class)
2727
fun `should throw a ConfigException because of cross defined topics`() {
@@ -67,4 +67,35 @@ class Neo4jSinkConnectorConfigTest {
6767
assertEquals(Neo4jSinkConnectorConfig.BATCH_TIMEOUT_DEFAULT, config.batchTimeout)
6868
}
6969

70+
@Test
71+
fun `should return the configuration with shuffled topic order`() {
72+
val originals = mapOf(SinkConnector.TOPICS_CONFIG to "bar,foo",
73+
"${Neo4jSinkConnectorConfig.TOPIC_PATTERN_NODE_PREFIX}foo" to "(:Foo{!fooId,fooName})",
74+
"${Neo4jSinkConnectorConfig.TOPIC_PATTERN_NODE_PREFIX}bar" to "(:Bar{!barId,barName})",
75+
Neo4jSinkConnectorConfig.SERVER_URI to "bolt://neo4j:7687",
76+
Neo4jSinkConnectorConfig.BATCH_SIZE to 10,
77+
Neo4jSinkConnectorConfig.AUTHENTICATION_BASIC_USERNAME to "FOO",
78+
Neo4jSinkConnectorConfig.AUTHENTICATION_BASIC_PASSWORD to "BAR")
79+
val config = Neo4jSinkConnectorConfig(originals)
80+
81+
assertEquals(originals["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}foo"], config.topics.cypherTopics["foo"])
82+
assertFalse { config.encryptionEnabled }
83+
assertEquals(originals[Neo4jSinkConnectorConfig.SERVER_URI], config.serverUri.toString())
84+
assertEquals(originals[Neo4jSinkConnectorConfig.BATCH_SIZE], config.batchSize)
85+
assertEquals(Config.TrustStrategy.Strategy.TRUST_ALL_CERTIFICATES, config.encryptionTrustStrategy)
86+
assertEquals(AuthenticationType.BASIC, config.authenticationType)
87+
assertEquals(originals[Neo4jSinkConnectorConfig.AUTHENTICATION_BASIC_USERNAME], config.authenticationUsername)
88+
assertEquals(originals[Neo4jSinkConnectorConfig.AUTHENTICATION_BASIC_PASSWORD], config.authenticationPassword)
89+
assertEquals(originals[Neo4jSinkConnectorConfig.AUTHENTICATION_BASIC_PASSWORD], config.authenticationPassword)
90+
assertEquals("", config.authenticationKerberosTicket)
91+
assertNull(config.encryptionCACertificateFile, "encryptionCACertificateFile should be null")
92+
93+
assertEquals(PoolSettings.DEFAULT_MAX_CONNECTION_LIFETIME, config.connectionMaxConnectionLifetime)
94+
assertEquals(PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, config.connectionLifenessCheckTimeout)
95+
assertEquals(Neo4jSinkConnectorConfig.CONNECTION_POOL_MAX_SIZE_DEFAULT, config.connectionPoolMaxSize)
96+
assertEquals(PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, config.connectionAcquisitionTimeout)
97+
assertEquals(Config.LoadBalancingStrategy.LEAST_CONNECTED, config.loadBalancingStrategy)
98+
assertEquals(Neo4jSinkConnectorConfig.BATCH_TIMEOUT_DEFAULT, config.batchTimeout)
99+
}
100+
70101
}

0 commit comments

Comments
 (0)