Skip to content

Commit ff9fdee

Browse files
authored
Fixes #454: remove references to ZooKeeper (#463)
* Fixes #454: remove references to ZooKeeper * clean * removed yml conf * restored adoc row * fix assertion
1 parent 85eb844 commit ff9fdee

File tree

22 files changed

+8
-63
lines changed

22 files changed

+8
-63
lines changed

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ class KafkaEventSink(private val config: Map<String, String>,
4848
dbName = db.databaseName(), isDefaultDb = db.isDefaultDb())
4949

5050
override val mappingKeys = mapOf(
51-
"zookeeper" to "kafka.zookeeper.connect",
5251
"broker" to "kafka.${ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG}",
5352
"from" to "kafka.${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}",
5453
"autoCommit" to "kafka.${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG}",

consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ private fun validateDeserializers(config: KafkaSinkConfiguration) {
3030
}
3131
}
3232

33-
data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181",
34-
val bootstrapServers: String = "localhost:9092",
33+
data class KafkaSinkConfiguration(val bootstrapServers: String = "localhost:9092",
3534
val keyDeserializer: String = "org.apache.kafka.common.serialization.ByteArrayDeserializer",
3635
val valueDeserializer: String = "org.apache.kafka.common.serialization.ByteArrayDeserializer",
3736
val groupId: String = "neo4j",
@@ -67,8 +66,7 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
6766
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configMap = cfg, dbName = dbName, isDefaultDb = isDefaultDb)
6867

6968

70-
return default.copy(zookeeperConnect = config.getOrDefault("zookeeper.connect",default.zookeeperConnect),
71-
keyDeserializer = config.getOrDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, default.keyDeserializer),
69+
return default.copy(keyDeserializer = config.getOrDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, default.keyDeserializer),
7270
valueDeserializer = config.getOrDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, default.valueDeserializer),
7371
bootstrapServers = config.getOrDefault(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, default.bootstrapServers),
7472
autoOffsetReset = config.getOrDefault(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, default.autoOffsetReset),
@@ -81,7 +79,6 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
8179
}
8280

8381
private fun validate(config: KafkaSinkConfiguration) {
84-
validateConnection(config.zookeeperConnect, "zookeeper.connect", false)
8582
validateConnection(config.bootstrapServers, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, false)
8683
val schemaRegistryUrlKey = "schema.registry.url"
8784
if (config.extraProperties.containsKey(schemaRegistryUrlKey)) {

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkPatternTSE.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ class KafkaEventSinkPatternTSE : KafkaEventSinkBaseTSE() {
138138

139139
val kafkaProperties = Properties()
140140
kafkaProperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEventSinkSuiteIT.kafka.bootstrapServers
141-
kafkaProperties["zookeeper.connect"] = KafkaEventSinkSuiteIT. kafka.envMap["KAFKA_ZOOKEEPER_CONNECT"] ?: "localhost:2181"
142141
kafkaProperties["group.id"] = "neo4j"
143142
kafkaProperties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
144143
kafkaProperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java

consumer/src/test/kotlin/integrations/kafka/SchemaRegistryContainer.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import org.testcontainers.containers.GenericContainer
44
import org.testcontainers.containers.wait.strategy.Wait
55
import org.testcontainers.containers.KafkaContainer
66
import org.testcontainers.containers.KafkaContainer.KAFKA_PORT
7-
import org.testcontainers.containers.KafkaContainer.ZOOKEEPER_PORT
87
import org.testcontainers.containers.Network
98
import org.testcontainers.containers.SocatContainer
109
import java.util.stream.Stream

consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ class KafkaSinkConfigurationTest {
2929
val default = KafkaSinkConfiguration()
3030
StreamsSinkConfigurationTest.testDefaultConf(default.streamsSinkConfiguration)
3131

32-
assertEquals("localhost:2181", default.zookeeperConnect)
3332
assertEquals("localhost:9092", default.bootstrapServers)
3433
assertEquals("neo4j", default.groupId)
3534
assertEquals("earliest", default.autoOffsetReset)
@@ -45,21 +44,19 @@ class KafkaSinkConfigurationTest {
4544
val topic = "topic-neo"
4645
val topicKey = "streams.sink.topic.cypher.$topic"
4746
val topicValue = "MERGE (n:Label{ id: event.id }) "
48-
val zookeeper = "zookeeper:2181"
4947
val bootstrap = "bootstrap:9092"
5048
val group = "foo"
5149
val autoOffsetReset = "latest"
5250
val autoCommit = "false"
5351
val config = mapOf(topicKey to topicValue,
54-
"kafka.zookeeper.connect" to zookeeper,
5552
"kafka.bootstrap.servers" to bootstrap,
5653
"kafka.auto.offset.reset" to autoOffsetReset,
5754
"kafka.enable.auto.commit" to autoCommit,
5855
"kafka.group.id" to group,
5956
"kafka.streams.async.commit" to "true",
6057
"kafka.key.deserializer" to ByteArrayDeserializer::class.java.name,
6158
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name)
62-
val expectedMap = mapOf("zookeeper.connect" to zookeeper, "bootstrap.servers" to bootstrap,
59+
val expectedMap = mapOf("bootstrap.servers" to bootstrap,
6360
"auto.offset.reset" to autoOffsetReset, "enable.auto.commit" to autoCommit, "group.id" to group,
6461
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
6562
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
@@ -70,7 +67,6 @@ class KafkaSinkConfigurationTest {
7067
val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config, defaultDbName, isDefaultDb = true)
7168
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValue)
7269
assertEquals(emptyMap(), kafkaSinkConfiguration.extraProperties)
73-
assertEquals(zookeeper, kafkaSinkConfiguration.zookeeperConnect)
7470
assertEquals(bootstrap, kafkaSinkConfiguration.bootstrapServers)
7571
assertEquals(autoOffsetReset, kafkaSinkConfiguration.autoOffsetReset)
7672
assertEquals(group, kafkaSinkConfiguration.groupId)
@@ -93,23 +89,21 @@ class KafkaSinkConfigurationTest {
9389
val topicValue = "MERGE (n:Label{ id: event.id })"
9490
val topicKeyFoo = "streams.sink.topic.cypher.$topic.to.foo"
9591
val topicValueFoo = "MERGE (n:Foo{ id: event.id })"
96-
val zookeeper = "zookeeper:2181"
9792
val bootstrap = "bootstrap:9092"
9893
val group = "mygroup"
9994
val autoOffsetReset = "latest"
10095
val autoCommit = "false"
10196
val asyncCommit = "true"
10297
val config = mapOf(topicKey to topicValue,
10398
topicKeyFoo to topicValueFoo,
104-
"kafka.zookeeper.connect" to zookeeper,
10599
"kafka.bootstrap.servers" to bootstrap,
106100
"kafka.auto.offset.reset" to autoOffsetReset,
107101
"kafka.enable.auto.commit" to autoCommit,
108102
"kafka.group.id" to group,
109103
"kafka.streams.async.commit" to asyncCommit,
110104
"kafka.key.deserializer" to ByteArrayDeserializer::class.java.name,
111105
"kafka.value.deserializer" to KafkaAvroDeserializer::class.java.name)
112-
val expectedMap = mapOf("zookeeper.connect" to zookeeper, "bootstrap.servers" to bootstrap,
106+
val expectedMap = mapOf("bootstrap.servers" to bootstrap,
113107
"auto.offset.reset" to autoOffsetReset, "enable.auto.commit" to autoCommit, "group.id" to "$group-$dbName",
114108
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
115109
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java.toString(),
@@ -120,7 +114,6 @@ class KafkaSinkConfigurationTest {
120114
val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config, dbName, isDefaultDb = false)
121115
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValueFoo)
122116
assertEquals(emptyMap(), kafkaSinkConfiguration.extraProperties)
123-
assertEquals(zookeeper, kafkaSinkConfiguration.zookeeperConnect)
124117
assertEquals(bootstrap, kafkaSinkConfiguration.bootstrapServers)
125118
assertEquals(autoOffsetReset, kafkaSinkConfiguration.autoOffsetReset)
126119
assertTrue { kafkaSinkConfiguration.streamsAsyncCommit }
@@ -140,7 +133,6 @@ class KafkaSinkConfigurationTest {
140133
@Test(expected = TopicValidationException::class)
141134
@Ignore("Disabled, use Kafka to deal with availability of the configured services")
142135
fun `should not validate the configuration because of unreachable kafka bootstrap server`() {
143-
val zookeeper = "zookeeper:2181"
144136
val bootstrap = "bootstrap:9092"
145137
try {
146138
val topic = "topic-neo"
@@ -151,7 +143,6 @@ class KafkaSinkConfigurationTest {
151143
val autoCommit = "false"
152144
val config = mapOf(topicKey to topicValue,
153145
"$topicKey.to.foo" to "$topicValue SET n += event.properties",
154-
"kafka.zookeeper.connect" to zookeeper,
155146
"kafka.bootstrap.servers" to bootstrap,
156147
"kafka.auto.offset.reset" to autoOffsetReset,
157148
"kafka.enable.auto.commit" to autoCommit,
@@ -167,7 +158,6 @@ class KafkaSinkConfigurationTest {
167158

168159
@Test(expected = RuntimeException::class)
169160
fun `should not validate the configuration because of empty kafka bootstrap server`() {
170-
val zookeeper = "zookeeper:2181"
171161
val bootstrap = ""
172162
try {
173163
val topic = "topic-neo"
@@ -177,7 +167,6 @@ class KafkaSinkConfigurationTest {
177167
val autoOffsetReset = "latest"
178168
val autoCommit = "false"
179169
val config = mapOf(topicKey to topicValue,
180-
"kafka.zookeeper.connect" to zookeeper,
181170
"kafka.bootstrap.servers" to bootstrap,
182171
"kafka.auto.offset.reset" to autoOffsetReset,
183172
"kafka.enable.auto.commit" to autoCommit,

doc/docs/modules/ROOT/pages/configuration.adoc

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,6 @@ You'll have this following output (it's related to *your* configuration 😄)
106106
|"streams.sink.errors.log.enable"
107107
|"true"
108108

109-
|"kafka.zookeeper.connect"
110-
|"zookeeper:2181"
111-
112109
|===
113110

114111
==== `streams.configuration.set`
@@ -195,9 +192,6 @@ You'll have this following output (it's related to *your* configuration 😄)
195192
|"streams.sink.errors.log.enable"
196193
|"true"
197194

198-
|"kafka.zookeeper.connect"
199-
|"zookeeper:2181"
200-
201195
|===
202196

203197
==== `streams.configuration.remove`
@@ -252,7 +246,7 @@ So given the following procedure call:
252246

253247
[source,cypher]
254248
----
255-
CALL streams.configuration.remove([`kafka.zookeeper.connect`], {save: false}) YIELD name, value
249+
CALL streams.configuration.remove([`kafka.acks`], {save: false}) YIELD name, value
256250
RETURN name, value
257251
----
258252

doc/docs/modules/ROOT/pages/consumer-configuration.adoc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ You can set the following Kafka configuration values in your `neo4j.conf`, here
55
.neo4j.conf
66
[source,subs="verbatim,attributes"]
77
----
8-
kafka.zookeeper.connect=localhost:2181
98
kafka.bootstrap.servers=localhost:9092
109
kafka.auto.offset.reset=earliest
1110
kafka.group.id=neo4j

doc/docs/modules/ROOT/pages/examples.adoc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ control-center is [UP]
6161
.neo4j.conf
6262
[source, properties]
6363
----
64-
kafka.zookeeper.connect=localhost:2181
6564
kafka.bootstrap.servers=localhost:9092
6665
kafka.auto.offset.reset=earliest
6766
kafka.group.id=neo4j

doc/docs/modules/ROOT/pages/kafka-ssl.adoc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ Note that the passwords are stored in plaintext so limit access to this `neo4j.c
8282

8383
[source, properties]
8484
----
85-
kafka.zookeeper.connect=xxx.xxx.xxx.xxx:2181
8685
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9094
8786
streams.sink.enabled=false
8887

doc/docs/modules/ROOT/pages/procedures.adoc

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,6 @@ If not specified it inherits the underlying `kafka.enable.auto.commit` value
240240
|Boolean (default `true`)
241241
|In case of `autoCommit` is set to `false` you can decide if you want to commit the data.
242242

243-
|`zookeeper`
244-
|String
245-
|The comma separated string of Zookeeper nodes url.
246-
If not specified it inherits the underlying `kafka.zookeeper.connect` value
247-
248243
|`broker`
249244
|String
250245
|The comma separated string of Kafka nodes url.

0 commit comments

Comments
 (0)