
Commit a7b73bd

conker84 authored and mneedham committed
fixes 160: change the streams.sink.enabled to false (#176)
1 parent 54138b1 commit a7b73bd

6 files changed: +7 additions, -5 deletions

consumer/src/main/kotlin/streams/StreamsSinkConfiguration.kt

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ private object StreamsSinkConfigurationConstants {
     const val PROCEDURES_ENABLED = "procedures.enabled"
 }
 
-data class StreamsSinkConfiguration(val enabled: Boolean = true,
+data class StreamsSinkConfiguration(val enabled: Boolean = false,
                                     val proceduresEnabled: Boolean = true,
                                     val sinkPollingInterval: Long = 10000,
                                     val topics: Map<String, String> = emptyMap()) {
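
With this change the sink becomes opt-in: a StreamsSinkConfiguration built without overrides now comes up disabled. A minimal sketch of the new defaults, assuming it runs in (or imports from) the streams package; the check calls are illustrative only and not part of the commit:

fun main() {
    val defaults = StreamsSinkConfiguration()         // no overrides supplied
    check(!defaults.enabled)                          // the sink is now off by default
    check(defaults.proceduresEnabled)                 // procedures remain enabled
    check(defaults.sinkPollingInterval == 10000L)     // polling interval is unchanged

    // Explicit opt-in, as a user would do via streams.sink.enabled=true
    val optIn = defaults.copy(enabled = true)
    check(optIn.enabled)
}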

consumer/src/test/kotlin/integrations/KafkaEventSinkIT.kt

Lines changed: 1 addition & 0 deletions
@@ -84,6 +84,7 @@ class KafkaEventSinkIT {
         var graphDatabaseBuilder = TestGraphDatabaseFactory()
                 .newImpermanentDatabaseBuilder()
                 .setConfig("kafka.bootstrap.servers", kafka.bootstrapServers)
+                .setConfig("streams.sink.enabled", "true")
         if (!testName.methodName.endsWith(EXCLUDE_LOAD_TOPIC_METHOD_SUFFIX)) {
             graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.shouldWriteCypherQuery", cypherQueryTemplate)
         }

consumer/src/test/kotlin/streams/StreamsSinkConfigurationTest.kt

Lines changed: 2 additions & 2 deletions
@@ -23,11 +23,11 @@ class StreamsSinkConfigurationTest {
         val config = Config.builder()
                 .withSetting("streams.sink.polling.interval", pollingInterval)
                 .withSetting(topicKey, topicValue)
-                .withSetting("streams.sink.enabled", "false")
+                .withSetting("streams.sink.enabled", "true")
                 .build()
         val streamsConfig = StreamsSinkConfiguration.from(config)
         testFromConf(streamsConfig, pollingInterval, topic, topicValue)
-        assertFalse { streamsConfig.enabled }
+        assertTrue { streamsConfig.enabled }
     }
 
     companion object {
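
The updated test now exercises the explicit opt-in path only; a companion test for the new default could look roughly like the sketch below, reusing the Config.builder() / StreamsSinkConfiguration.from(...) API shown above (the test name and the kotlin.test.assertFalse import are assumptions, not part of the commit):

@Test
fun `should keep the sink disabled when streams.sink.enabled is not set`() {
    val config = Config.builder().build()                      // no streams.* settings at all
    val streamsConfig = StreamsSinkConfiguration.from(config)
    assertFalse { streamsConfig.enabled }                      // new default: sink stays off
}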

doc/asciidoc/consumer/configuration.adoc

Lines changed: 1 addition & 2 deletions
@@ -9,7 +9,6 @@ kafka.group.id=neo4j
 
 streams.sink.polling.interval=<The time, in milliseconds, spent waiting in poll if data is not available in the buffer. default=Long.MAX_VALUE>
 streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
-streams.sink.enabled=<true/false, default=true>
-----
+streams.sink.enabled=<true/false, default=false>
 
 See the https://kafka.apache.org/documentation/#brokerconfigs[Apache Kafka documentation] for details on these settings.
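
In practice this means an existing sink deployment has to opt in explicitly after upgrading. A minimal neo4j.conf sketch under that assumption (the broker address, topic name and Cypher query are placeholders, not taken from the commit):

kafka.bootstrap.servers=localhost:9092
kafka.group.id=neo4j
streams.sink.enabled=true
streams.sink.topic.cypher.my-topic=MERGE (p:Person {id: event.id})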

doc/asciidoc/docker/index.adoc

Lines changed: 1 addition & 0 deletions
@@ -68,6 +68,7 @@ In case of you are using the Sink you can define your topic/cypher-query combina
 [source,yaml]
 ----
 environment:
+  NEO4J_streams_sink_enabled: "true"
   NEO4J_streams_sink_topic_neo4j:
     "WITH event.value.payload AS payload, event.value.meta AS meta
      FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
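
For Docker deployments the same opt-in applies: the Neo4j image translates environment variables such as NEO4J_streams_sink_enabled into the streams.sink.enabled setting (dots in the setting name become underscores in the variable), so compose files that rely on the sink now need the extra variable. A trimmed environment sketch, with the broker address and the Cypher query as placeholders:

environment:
  NEO4J_kafka_bootstrap_servers: "broker:9093"
  NEO4J_streams_sink_enabled: "true"
  NEO4J_streams_sink_topic_cypher_neo4j: "MERGE (q:Question {id: event.id})"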

performance/docker-compose.yml

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ services:
       NEO4J_dbms_memory_heap_max__size: 2G
       NEO4J_kafka_max_poll_records: 16384
       NEO4J_streams_source_enabled: "false"
+      NEO4J_streams_sink_enabled: "true"
       NEO4J_streams_sink_topic_cypher_neo4j: "WITH event.payload AS payload, event.meta AS meta CALL apoc.do.case( [
         payload.type = 'node' AND meta.operation = 'created', \
         'CREATE (x:Performance {received_time: apoc.date.currentTimestamp()}) SET x+=props RETURN count(x)']

0 commit comments
