Skip to content

Commit 614a7ab

Browse files
authored
fixes #342: 4.1.(0, 1) cluster does not form in the presence of neo4j-streams-4.0.2.jar (#347)
* first commit * improvements * fixes #342: 4.1.(0, 1) cluster does not form in the presence of neo4j-streams-4.0.2.jar * moved Assert class into test-support
1 parent 499d884 commit 614a7ab

File tree

53 files changed

+1947
-514
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1947
-514
lines changed

common/src/main/kotlin/streams/config/StreamsConfig.kt

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagem
1313

1414
val config = ConcurrentHashMap<String, String>()
1515

16-
1716
private lateinit var neo4jConfFolder: String
1817

1918
companion object {
@@ -28,8 +27,12 @@ data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagem
2827
const val SINK_ENABLED = "streams.sink.enabled"
2928
const val SINK_ENABLED_VALUE = false
3029
const val DEFAULT_PATH = "."
31-
const val CHECK_APOC_TIMEOUT = "check.apoc.timeout"
32-
const val CHECK_APOC_INTERVAL = "check.apoc.interval"
30+
const val CHECK_APOC_TIMEOUT = "streams.check.apoc.timeout"
31+
const val CHECK_APOC_INTERVAL = "streams.check.apoc.interval"
32+
const val CLUSTER_ONLY = "streams.cluster.only"
33+
const val CHECK_WRITEABLE_INSTANCE_INTERVAL = "streams.check.writeable.instance.interval"
34+
const val SYSTEM_DB_WAIT_TIMEOUT = "streams.systemdb.wait.timeout"
35+
const val SYSTEM_DB_WAIT_TIMEOUT_VALUE = 10000L
3336
private var afterInitListeners = mutableListOf<((MutableMap<String, String>) -> Unit)>()
3437

3538
fun registerListener(after: (MutableMap<String, String>) -> Unit) {
@@ -80,8 +83,8 @@ data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagem
8083
fun loadStreamsConfiguration() {
8184
val properties = neo4jConfAsProperties()
8285

83-
val filteredValues = filterProperties(properties,
84-
{ key -> key.toString().startsWith("streams.") })
86+
val filteredValues = filterProperties(properties)
87+
{ key -> key.toString().startsWith("streams.") }
8588

8689
if (log.isDebugEnabled) {
8790
log.debug("Neo4j Streams configuration reloaded from neo4j.conf file: $filteredValues")
@@ -95,7 +98,9 @@ data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagem
9598

9699
val properties = Properties()
97100
try {
98-
log.info("The retrieved NEO4J_CONF dir is $neo4jConfFolder")
101+
if (log.isDebugEnabled) {
102+
log.debug("The retrieved NEO4J_CONF dir is $neo4jConfFolder")
103+
}
99104
properties.load(FileInputStream("$neo4jConfFolder/neo4j.conf"))
100105
} catch (e: FileNotFoundException) {
101106
log.error("The neo4j.conf file is not under the directory defined into the directory $neo4jConfFolder, please set the NEO4J_CONF env correctly")
@@ -136,4 +141,7 @@ data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagem
136141

137142
fun isSinkEnabled(dbName: String) = this.config.getOrDefault("${SINK_ENABLED}.to.$dbName", isSinkGloballyEnabled()).toString().toBoolean()
138143

144+
fun getSystemDbWaitTimeout() = this.config.getOrDefault(SYSTEM_DB_WAIT_TIMEOUT, SYSTEM_DB_WAIT_TIMEOUT_VALUE)
145+
.toString().toLong()
146+
139147
}

common/src/main/kotlin/streams/extensions/DatabaseManagementServiceExtensions.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package streams.extensions
22

33
import org.neo4j.dbms.api.DatabaseManagementService
4-
import org.neo4j.dbms.api.DatabaseNotFoundException
54
import org.neo4j.kernel.internal.GraphDatabaseAPI
6-
import streams.utils.Neo4jUtils
5+
import streams.utils.StreamsUtils
76

8-
fun DatabaseManagementService.getSystemDb() = this.database(Neo4jUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI
7+
fun DatabaseManagementService.getSystemDb() = this.database(StreamsUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI
98

109
fun DatabaseManagementService.getDefaultDbName() = getSystemDb().let {
1110
it.beginTx().use {

common/src/main/kotlin/streams/extensions/GraphDatabaseServerExtensions.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package streams.extensions
22

33
import org.neo4j.graphdb.GraphDatabaseService
44
import org.neo4j.graphdb.Result
5-
import streams.utils.Neo4jUtils
5+
import streams.utils.StreamsUtils
66

77
fun GraphDatabaseService.execute(cypher: String) = this.execute(cypher, emptyMap())
88
fun GraphDatabaseService.execute(cypher: String, params: Map<String, Any>) = this.executeTransactionally(cypher, params)
@@ -12,4 +12,4 @@ fun <T> GraphDatabaseService.execute(cypher: String,
1212
params: Map<String, Any>,
1313
lambda: ((Result) -> T)) = this.executeTransactionally(cypher, params, lambda)
1414

15-
fun GraphDatabaseService.isSystemDb() = this.databaseName() == Neo4jUtils.SYSTEM_DATABASE_NAME
15+
fun GraphDatabaseService.isSystemDb() = this.databaseName() == StreamsUtils.SYSTEM_DATABASE_NAME

common/src/main/kotlin/streams/service/StreamsSinkService.kt

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@ enum class TopicType(val group: TopicTypeGroup, val key: String) {
1818

1919
data class StreamsSinkEntity(val key: Any?, val value: Any?)
2020

21-
abstract class StreamsSinkService(private val strategyMap: Map<TopicType, Any>) {
22-
21+
abstract class StreamsStrategyStorage {
2322
abstract fun getTopicType(topic: String): TopicType?
24-
abstract fun getCypherTemplate(topic: String): String?
23+
24+
abstract fun getStrategy(topic: String): IngestionStrategy
25+
}
26+
27+
abstract class StreamsSinkService(private val streamsStrategyStorage: StreamsStrategyStorage) {
28+
2529
abstract fun write(query: String, events: Collection<Any>)
2630

2731
private fun writeWithStrategy(data: Collection<StreamsSinkEntity>, strategy: IngestionStrategy) {
@@ -32,20 +36,7 @@ abstract class StreamsSinkService(private val strategyMap: Map<TopicType, Any>)
3236
strategy.deleteRelationshipEvents(data).forEach { write(it.query, it.events) }
3337
}
3438

35-
private fun writeWithCypherTemplate(topic: String, params: Collection<StreamsSinkEntity>) {
36-
val query = getCypherTemplate(topic) ?: return
37-
write(query, params.mapNotNull { it.value })
38-
}
39-
4039
fun writeForTopic(topic: String, params: Collection<StreamsSinkEntity>) {
41-
val topicType = getTopicType(topic) ?: return
42-
when (topicType.group) {
43-
TopicTypeGroup.CYPHER -> writeWithCypherTemplate(topic, params)
44-
TopicTypeGroup.CDC, TopicTypeGroup.CUD -> writeWithStrategy(params, strategyMap.getValue(topicType) as IngestionStrategy)
45-
TopicTypeGroup.PATTERN -> {
46-
val strategyMap = strategyMap[topicType] as Map<String, IngestionStrategy>
47-
writeWithStrategy(params, strategyMap.getValue(topic))
48-
}
49-
}
40+
writeWithStrategy(params, streamsStrategyStorage.getStrategy(topic))
5041
}
5142
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package streams.service.sink.strategy
2+
3+
import streams.service.StreamsSinkEntity
4+
import streams.utils.StreamsUtils
5+
6+
class CypherTemplateStrategy(query: String): IngestionStrategy {
7+
private val fullQuery = "${StreamsUtils.UNWIND} $query"
8+
override fun mergeNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
9+
return listOf(QueryEvents(fullQuery, events.mapNotNull { it.value as? Map<String, Any> }))
10+
}
11+
12+
override fun deleteNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> = emptyList()
13+
14+
override fun mergeRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> = emptyList()
15+
16+
override fun deleteRelationshipEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> = emptyList()
17+
18+
}

common/src/main/kotlin/streams/service/sink/strategy/IngestionStrategy.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package streams.service.sink.strategy
22

3-
import streams.events.*
3+
import streams.events.Constraint
4+
import streams.events.RelationshipPayload
45
import streams.service.StreamsSinkEntity
5-
import streams.utils.Neo4jUtils
6-
import streams.utils.StreamsUtils
76

87

98
data class QueryEvents(val query: String, val events: List<Map<String, Any?>>)

common/src/main/kotlin/streams/utils/KafkaValidationUtils.kt

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,45 @@ package streams.utils
22

33
import org.apache.kafka.clients.admin.AdminClient
44
import org.apache.kafka.common.config.ConfigResource
5-
import java.util.*
5+
import java.util.Properties
66

77
object KafkaValidationUtils {
88
fun getInvalidTopicsError(invalidTopics: List<String>) = "The BROKER config `auto.create.topics.enable` is false, the following topics need to be created into the Kafka cluster otherwise the messages will be discarded: $invalidTopics"
99

10-
fun getInvalidTopics(kafkaProps: Properties, allTopics: List<String>) = getInvalidTopics(AdminClient.create(kafkaProps), allTopics)
10+
fun getInvalidTopics(kafkaProps: Properties, allTopics: List<String>): List<String> = try {
11+
getInvalidTopics(AdminClient.create(kafkaProps), allTopics)
12+
} catch (e: Exception) {
13+
emptyList()
14+
}
1115

12-
fun getInvalidTopics(client: AdminClient, allTopics: List<String>): List<String> {
16+
fun getInvalidTopics(client: AdminClient, allTopics: List<String>): List<String> = try {
1317
val kafkaTopics = client.listTopics().names().get()
1418
val invalidTopics = allTopics.filter { !kafkaTopics.contains(it) }
15-
return if (invalidTopics.isNotEmpty()) {
16-
if (isAutoCreateTopicsEnabled(client)) {
17-
emptyList()
18-
} else {
19-
invalidTopics
20-
}
19+
if (invalidTopics.isNotEmpty() && isAutoCreateTopicsEnabled(client)) {
20+
emptyList()
2121
} else {
2222
invalidTopics
2323
}
24+
} catch (e: Exception) {
25+
emptyList()
2426
}
2527

26-
fun isAutoCreateTopicsEnabled(kafkaProps: Properties) = isAutoCreateTopicsEnabled(AdminClient.create(kafkaProps))
28+
fun isAutoCreateTopicsEnabled(kafkaProps: Properties):Boolean = try {
29+
isAutoCreateTopicsEnabled(AdminClient.create(kafkaProps))
30+
} catch (e: Exception) {
31+
false
32+
}
2733

28-
fun isAutoCreateTopicsEnabled(client: AdminClient): Boolean {
34+
fun isAutoCreateTopicsEnabled(client: AdminClient): Boolean = try {
2935
val firstNodeId = client.describeCluster().nodes().get().first().id()
30-
val configs = client.describeConfigs(listOf(ConfigResource(ConfigResource.Type.BROKER, firstNodeId.toString()))).all().get()
31-
return configs.values
36+
val configResources = listOf(ConfigResource(ConfigResource.Type.BROKER, firstNodeId.toString()))
37+
val configs = client.describeConfigs(configResources).all().get()
38+
configs.values
3239
.flatMap { it.entries() }
3340
.find { it.name() == "auto.create.topics.enable" }
3441
?.value()
3542
?.toBoolean() ?: false
43+
} catch (e: Exception) {
44+
false
3645
}
3746
}

common/src/main/kotlin/streams/utils/StreamsUtils.kt

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,25 @@
11
package streams.utils
22

3+
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.GlobalScope
35
import kotlinx.coroutines.runBlocking
46
import kotlinx.coroutines.delay
7+
import kotlinx.coroutines.launch
8+
import org.neo4j.dbms.api.DatabaseManagementService
9+
import streams.config.StreamsConfig
10+
import streams.extensions.getSystemDb
511

612
object StreamsUtils {
713

8-
const val UNWIND: String = "UNWIND \$events AS event"
14+
@JvmStatic val UNWIND: String = "UNWIND \$events AS event"
915

10-
const val STREAMS_CONFIG_PREFIX = "streams."
16+
@JvmStatic val STREAMS_CONFIG_PREFIX = "streams."
1117

12-
const val STREAMS_SINK_TOPIC_PREFIX = "sink.topic.cypher."
18+
@JvmStatic val STREAMS_SINK_TOPIC_PREFIX = "sink.topic.cypher."
19+
20+
@JvmStatic val LEADER = "LEADER"
21+
22+
@JvmStatic val SYSTEM_DATABASE_NAME = "system"
1323

1424
fun <T> ignoreExceptions(action: () -> T, vararg toIgnore: Class<out Throwable>): T? {
1525
return try {
@@ -35,4 +45,19 @@ object StreamsUtils {
3545
success
3646
}
3747

48+
fun executeWhenSystemDbIsAvailable(databaseManagementService: DatabaseManagementService,
49+
configuration: StreamsConfig,
50+
actionIfAvailable: () -> Unit,
51+
actionIfNotAvailable: (() -> Unit)?) {
52+
val systemDb = databaseManagementService.getSystemDb()
53+
val systemDbWaitTimeout = configuration.getSystemDbWaitTimeout()
54+
GlobalScope.launch(Dispatchers.IO) {
55+
if (systemDb.isAvailable(systemDbWaitTimeout)) {
56+
actionIfAvailable()
57+
} else if (actionIfNotAvailable != null) {
58+
actionIfNotAvailable()
59+
}
60+
}
61+
}
62+
3863
}

consumer/pom.xml

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,86 @@
5252
<version>${project.version}</version>
5353
<scope>test</scope>
5454
</dependency>
55+
<dependency>
56+
<groupId>org.neo4j</groupId>
57+
<artifactId>log-test-utils</artifactId>
58+
<scope>test</scope>
59+
</dependency>
60+
5561
</dependencies>
5662

63+
<profiles>
64+
<profile>
65+
<id>default</id>
66+
<dependencies>
67+
<dependency>
68+
<groupId>com.neo4j</groupId>
69+
<artifactId>neo4j-causal-clustering</artifactId>
70+
<scope>test</scope>
71+
<exclusions>
72+
<exclusion>
73+
<groupId>com.fasterxml.jackson.core</groupId>
74+
<artifactId>jackson-databind</artifactId>
75+
</exclusion>
76+
</exclusions>
77+
</dependency>
78+
<dependency>
79+
<groupId>com.neo4j</groupId>
80+
<artifactId>neo4j-causal-clustering</artifactId>
81+
<type>test-jar</type>
82+
<scope>test</scope>
83+
<exclusions>
84+
<exclusion>
85+
<groupId>com.fasterxml.jackson.core</groupId>
86+
<artifactId>jackson-databind</artifactId>
87+
</exclusion>
88+
</exclusions>
89+
</dependency>
90+
</dependencies>
91+
<activation>
92+
<property>
93+
<name>!env.TRAVIS</name>
94+
</property>
95+
<activeByDefault>true</activeByDefault>
96+
</activation>
97+
</profile>
98+
<profile>
99+
<id>travis</id>
100+
<activation>
101+
<property>
102+
<name>env.TRAVIS</name>
103+
<value>true</value>
104+
</property>
105+
</activation>
106+
<build>
107+
<plugins>
108+
<plugin>
109+
<groupId>org.jetbrains.kotlin</groupId>
110+
<artifactId>kotlin-maven-plugin</artifactId>
111+
<version>${kotlin.version}</version>
112+
<executions>
113+
<execution>
114+
<id>test-compile</id>
115+
<goals>
116+
<goal>test-compile</goal>
117+
</goals>
118+
<configuration>
119+
<!--
120+
As Kotlin Maven Compiler does not provide testExcludes option
121+
we need to workaround specifying the test source dirs and excluding
122+
by hand the `enterprise` directory
123+
-->
124+
<sourceDirs>
125+
<sourceDir>${project.basedir}/src/test/kotlin/integrations</sourceDir>
126+
<sourceDir>${project.basedir}/src/test/kotlin/streams</sourceDir>
127+
</sourceDirs>
128+
</configuration>
129+
</execution>
130+
</executions>
131+
</plugin>
132+
</plugins>
133+
</build>
134+
</profile>
135+
</profiles>
136+
57137
</project>

0 commit comments

Comments
 (0)