Skip to content

Commit 3d03279

Browse files
Fixes #293: streams.sink.polling.interval is possibly confusing and may not be used anyway (#299)
1 parent 0df75fa commit 3d03279

File tree

6 files changed

+17
-41
lines changed

6 files changed

+17
-41
lines changed

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import streams.service.sink.strategy.SourceIdIngestionStrategyConfig
88

99
data class StreamsSinkConfiguration(val enabled: Boolean = StreamsConfig.SINK_ENABLED_VALUE,
1010
val proceduresEnabled: Boolean = StreamsConfig.PROCEDURES_ENABLED_VALUE,
11-
val sinkPollingInterval: Long = 10000,
1211
val topics: Topics = Topics(),
1312
val errorConfig: Map<String,Any?> = emptyMap(),
1413
val sourceIdStrategyConfig: SourceIdIngestionStrategyConfig = SourceIdIngestionStrategyConfig()) {
@@ -42,7 +41,6 @@ data class StreamsSinkConfiguration(val enabled: Boolean = StreamsConfig.SINK_EN
4241

4342
return default.copy(enabled = cfg.isSinkEnabled(dbName),
4443
proceduresEnabled = cfg.hasProceduresEnabled(dbName),
45-
sinkPollingInterval = cfg.config.getOrDefault("streams.sink.polling.interval", default.sinkPollingInterval).toString().toLong(),
4644
topics = topics,
4745
errorConfig = errorHandler,
4846
sourceIdStrategyConfig = sourceIdStrategyConfig)

consumer/src/test/kotlin/streams/StreamsSinkConfigurationTest.kt

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,13 @@ class StreamsSinkConfigurationTest {
3838

3939
@Test
4040
fun `should manage only topics for default db`() {
41-
val pollingInterval = "10"
4241
val topicKey = "streams.sink.topic.cypher.myTopic"
4342
val topicValue = "MERGE (n:Label{ id: event.id })"
4443
val topicKeyNeo = "streams.sink.topic.cypher.myTopicNeo.to.neo4j"
4544
val topicValueNeo = "MERGE (n:Neo4j{ id: event.id })"
4645
val topicKeyFoo = "streams.sink.topic.cypher.myTopicFoo.to.foo"
4746
val topicValueFoo = "MERGE (n:Foo{ id: event.id })"
48-
val config = mapOf("streams.sink.polling.interval" to pollingInterval,
49-
topicKey to topicValue,
47+
val config = mapOf(topicKey to topicValue,
5048
topicKeyNeo to topicValueNeo,
5149
topicKeyFoo to topicValueFoo)
5250
streamsConfig.config.putAll(config)
@@ -57,15 +55,13 @@ class StreamsSinkConfigurationTest {
5755

5856
@Test
5957
fun `should manage only topics for non default db`() {
60-
val pollingInterval = "10"
6158
val topicKey = "streams.sink.topic.cypher.myTopic"
6259
val topicValue = "MERGE (n:Label{ id: event.id })"
6360
val topicKeyNeo = "streams.sink.topic.cypher.myTopicNeo.to.neo4j"
6461
val topicValueNeo = "MERGE (n:Neo4j{ id: event.id })"
6562
val topicKeyFoo = "streams.sink.topic.cypher.myTopicFoo.to.foo"
6663
val topicValueFoo = "MERGE (n:Foo{ id: event.id })"
67-
val config = mapOf("streams.sink.polling.interval" to pollingInterval,
68-
topicKey to topicValue,
64+
val config = mapOf(topicKey to topicValue,
6965
topicKeyNeo to topicValueNeo,
7066
topicKeyFoo to topicValueFoo)
7167
streamsConfig.config.putAll(config)
@@ -82,23 +78,20 @@ class StreamsSinkConfigurationTest {
8278

8379
@Test
8480
fun shouldReturnConfigurationFromMap() {
85-
val pollingInterval = "10"
8681
val topic = "topic-neo"
8782
val cdctopic = "cdctopic"
8883
val topicKey = "streams.sink.topic.cypher.$topic"
8984
val topicValue = "MERGE (n:Label{ id: event.id }) "
9085
val customLabel = "CustomLabel"
9186
val customId = "customId"
92-
val config = mapOf(
93-
"streams.sink.polling.interval" to pollingInterval,
94-
topicKey to topicValue,
87+
val config = mapOf(topicKey to topicValue,
9588
"streams.sink.enabled" to "false",
9689
"streams.sink.topic.cdc.sourceId" to cdctopic,
9790
"streams.sink.topic.cdc.sourceId.labelName" to customLabel,
9891
"streams.sink.topic.cdc.sourceId.idName" to customId)
9992
streamsConfig.config.putAll(config)
10093
val streamsSinkConf = StreamsSinkConfiguration.from(streamsConfig, defalutDbName)
101-
testFromConf(streamsSinkConf, pollingInterval, topic, topicValue)
94+
testFromConf(streamsSinkConf, topic, topicValue)
10295
assertFalse { streamsSinkConf.enabled }
10396
assertEquals(setOf(cdctopic), streamsSinkConf.topics.asMap()[TopicType.CDC_SOURCE_ID])
10497
assertEquals(customLabel, streamsSinkConf.sourceIdStrategyConfig.labelName)
@@ -107,12 +100,10 @@ class StreamsSinkConfigurationTest {
107100

108101
@Test(expected = TopicValidationException::class)
109102
fun shouldFailWithCrossDefinedTopics() {
110-
val pollingInterval = "10"
111103
val topic = "topic-neo"
112104
val topicKey = "streams.sink.topic.cypher.$topic"
113105
val topicValue = "MERGE (n:Label{ id: event.id }) "
114-
val config = mapOf("streams.sink.polling.interval" to pollingInterval,
115-
topicKey to topicValue,
106+
val config = mapOf(topicKey to topicValue,
116107
"streams.sink.topic.pattern.node.nodePatternTopic" to "User{!userId,name,surname,address.city}",
117108
"streams.sink.enabled" to "false",
118109
"streams.sink.topic.cdc.sourceId" to topic)
@@ -122,10 +113,8 @@ class StreamsSinkConfigurationTest {
122113

123114
@Test(expected = TopicValidationException::class)
124115
fun shouldFailWithCrossDefinedCDCTopics() {
125-
val pollingInterval = "10"
126116
val topic = "topic-neo"
127-
val config = mapOf("streams.sink.polling.interval" to pollingInterval,
128-
"streams.sink.enabled" to "false",
117+
val config = mapOf("streams.sink.enabled" to "false",
129118
"streams.sink.topic.cdc.sourceId" to topic,
130119
"streams.sink.topic.cdc.schema" to topic)
131120
streamsConfig.config.putAll(config)
@@ -135,10 +124,8 @@ class StreamsSinkConfigurationTest {
135124
companion object {
136125
fun testDefaultConf(default: StreamsSinkConfiguration) {
137126
assertEquals(emptyMap(), default.topics.cypherTopics)
138-
assertEquals(10000, default.sinkPollingInterval)
139127
}
140-
fun testFromConf(streamsConfig: StreamsSinkConfiguration, pollingInterval: String, topic: String, topicValue: String) {
141-
assertEquals(pollingInterval.toLong(), streamsConfig.sinkPollingInterval)
128+
fun testFromConf(streamsConfig: StreamsSinkConfiguration, topic: String, topicValue: String) {
142129
assertEquals(1, streamsConfig.topics.cypherTopics.size)
143130
assertEquals(topicValue, streamsConfig.topics.cypherTopics[topic])
144131
}

consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ class KafkaSinkConfigurationTest {
5757

5858
@Test
5959
fun `should return configuration from map`() {
60-
val pollingInterval = "10"
6160
val topic = "topic-neo"
6261
val topicKey = "streams.sink.topic.cypher.$topic"
6362
val topicValue = "MERGE (n:Label{ id: event.id }) "
@@ -66,8 +65,7 @@ class KafkaSinkConfigurationTest {
6665
val group = "foo"
6766
val autoOffsetReset = "latest"
6867
val autoCommit = "false"
69-
val config = mapOf("streams.sink.polling.interval" to pollingInterval,
70-
topicKey to topicValue,
68+
val config = mapOf(topicKey to topicValue,
7169
"kafka.zookeeper.connect" to zookeeper,
7270
"kafka.bootstrap.servers" to bootstrap,
7371
"kafka.auto.offset.reset" to autoOffsetReset,
@@ -84,7 +82,7 @@ class KafkaSinkConfigurationTest {
8482

8583
streamsConfig.config.putAll(config)
8684
val kafkaSinkConfiguration = KafkaSinkConfiguration.create(streamsConfig, defaultDbName)
87-
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, pollingInterval, topic, topicValue)
85+
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValue)
8886
assertEquals(emptyMap(), kafkaSinkConfiguration.extraProperties)
8987
assertEquals(zookeeper, kafkaSinkConfiguration.zookeeperConnect)
9088
assertEquals(bootstrap, kafkaSinkConfiguration.bootstrapServers)
@@ -97,15 +95,13 @@ class KafkaSinkConfigurationTest {
9795
assertEquals(expectedMap, resultMap)
9896

9997
val streamsConfig = StreamsSinkConfiguration.from(streamsConfig, defaultDbName)
100-
assertEquals(pollingInterval.toLong(), streamsConfig.sinkPollingInterval)
10198
assertTrue { streamsConfig.topics.cypherTopics.containsKey(topic) }
10299
assertEquals(topicValue, streamsConfig.topics.cypherTopics[topic])
103100
}
104101

105102
@Test
106103
fun `should return configuration from map for non default DB`() {
107104
val dbName = "foo"
108-
val pollingInterval = "10"
109105
val topic = "topic-neo"
110106
val topicKey = "streams.sink.topic.cypher.$topic"
111107
val topicValue = "MERGE (n:Label{ id: event.id })"
@@ -116,8 +112,7 @@ class KafkaSinkConfigurationTest {
116112
val group = "mygroup"
117113
val autoOffsetReset = "latest"
118114
val autoCommit = "false"
119-
val config = mapOf("streams.sink.polling.interval" to pollingInterval,
120-
topicKey to topicValue,
115+
val config = mapOf(topicKey to topicValue,
121116
topicKeyFoo to topicValueFoo,
122117
"kafka.zookeeper.connect" to zookeeper,
123118
"kafka.bootstrap.servers" to bootstrap,
@@ -135,7 +130,7 @@ class KafkaSinkConfigurationTest {
135130

136131
streamsConfig.config.putAll(config)
137132
val kafkaSinkConfiguration = KafkaSinkConfiguration.create(streamsConfig, dbName)
138-
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, pollingInterval, topic, topicValueFoo)
133+
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, topic, topicValueFoo)
139134
assertEquals(emptyMap(), kafkaSinkConfiguration.extraProperties)
140135
assertEquals(zookeeper, kafkaSinkConfiguration.zookeeperConnect)
141136
assertEquals(bootstrap, kafkaSinkConfiguration.bootstrapServers)
@@ -148,7 +143,6 @@ class KafkaSinkConfigurationTest {
148143
assertEquals(expectedMap, resultMap)
149144

150145
val streamsConfig = StreamsSinkConfiguration.from(streamsConfig, dbName)
151-
assertEquals(pollingInterval.toLong(), streamsConfig.sinkPollingInterval)
152146
assertEquals(1, streamsConfig.topics.cypherTopics.size)
153147
assertTrue { streamsConfig.topics.cypherTopics.containsKey(topic) }
154148
assertEquals(topicValueFoo, streamsConfig.topics.cypherTopics[topic])
@@ -160,15 +154,13 @@ class KafkaSinkConfigurationTest {
160154
val zookeeper = "zookeeper:2181"
161155
val bootstrap = "bootstrap:9092"
162156
try {
163-
val pollingInterval = "10"
164157
val topic = "topic-neo"
165158
val topicKey = "streams.sink.topic.cypher.$topic"
166159
val topicValue = "MERGE (n:Label{ id: event.id }) "
167160
val group = "foo"
168161
val autoOffsetReset = "latest"
169162
val autoCommit = "false"
170-
val config = mapOf("streams.sink.polling.interval" to pollingInterval,
171-
topicKey to topicValue,
163+
val config = mapOf(topicKey to topicValue,
172164
"$topicKey.to.foo" to "$topicValue SET n += event.properties",
173165
"kafka.zookeeper.connect" to zookeeper,
174166
"kafka.bootstrap.servers" to bootstrap,
@@ -190,15 +182,13 @@ class KafkaSinkConfigurationTest {
190182
val zookeeper = "zookeeper:2181"
191183
val bootstrap = ""
192184
try {
193-
val pollingInterval = "10"
194185
val topic = "topic-neo"
195186
val topicKey = "streams.sink.topic.cypher.$topic"
196187
val topicValue = "MERGE (n:Label{ id: event.id }) "
197188
val group = "foo"
198189
val autoOffsetReset = "latest"
199190
val autoCommit = "false"
200-
val config = mapOf("streams.sink.polling.interval" to pollingInterval,
201-
topicKey to topicValue,
191+
val config = mapOf(topicKey to topicValue,
202192
"kafka.zookeeper.connect" to zookeeper,
203193
"kafka.bootstrap.servers" to bootstrap,
204194
"kafka.auto.offset.reset" to autoOffsetReset,

doc/asciidoc/docker/index.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ NEO4J_streams_source_schema_polling_interval: 10000
222222
----
223223

224224
this means that every 10 seconds the Streams plugin polls the DB in order to retrieve schema changes and store them.
225-
So after you created the indexes you need almost to wait 10 seconds before the next step, otherwise the
225+
So after you created the indexes you need almost to wait 10 seconds before the next step.
226226

227227
Now lets go to the `Source` and, in order to import the Stackoverflow dataset, execute the following query:
228228

doc/asciidoc/faq/index.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ If you decide to use the Neo4j Streams plugin, then the Neo4j source instance wi
3030
[source, ini]
3131
----
3232
streams.sink.enabled=false
33-
streams.source.schema.polling.interval=10000
33+
streams.source.enabled=true
34+
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
35+
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
3436
----
3537

3638
and the Neo4j sink instance will be configured as follow:

doc/asciidoc/kafka-ssl/index.adoc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ Note that the passwords are stored in plaintext so limit access to this `neo4j.c
8383
kafka.zookeeper.connect=xxx.xxx.xxx.xxx:2181
8484
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9094
8585
streams.sink.enabled=false
86-
streams.sink.polling.interval=1000
8786
8887
streams.source.topic.nodes.neoTest=Person{*}
8988

0 commit comments

Comments
 (0)