Skip to content

Commit 44fbf09

Browse files
authored
Improved Lifecycle management (#344)
* Improved Lifecycle management * sink lifecycle procedures
1 parent c964879 commit 44fbf09

28 files changed

+1583
-291
lines changed

common/src/main/kotlin/streams/service/sink/strategy/IngestionStrategy.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package streams.service.sink.strategy
22

3-
import streams.events.*
3+
import streams.events.Constraint
4+
import streams.events.RelationshipPayload
45
import streams.service.StreamsSinkEntity
5-
import streams.utils.Neo4jUtils
6-
import streams.utils.StreamsUtils
76

87

98
data class QueryEvents(val query: String, val events: List<Map<String, Any?>>)

common/src/main/kotlin/streams/utils/KafkaValidationUtils.kt

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,37 @@ package streams.utils
22

33
import org.apache.kafka.clients.admin.AdminClient
44
import org.apache.kafka.common.config.ConfigResource
5-
import java.util.*
5+
import java.util.Properties
66

77
object KafkaValidationUtils {
88
fun getInvalidTopicsError(invalidTopics: List<String>) = "The BROKER config `auto.create.topics.enable` is false, the following topics need to be created into the Kafka cluster otherwise the messages will be discarded: $invalidTopics"
99

1010
fun getInvalidTopics(kafkaProps: Properties, allTopics: List<String>) = getInvalidTopics(AdminClient.create(kafkaProps), allTopics)
1111

12-
fun getInvalidTopics(client: AdminClient, allTopics: List<String>): List<String> {
12+
fun getInvalidTopics(client: AdminClient, allTopics: List<String>): List<String> = try {
1313
val kafkaTopics = client.listTopics().names().get()
1414
val invalidTopics = allTopics.filter { !kafkaTopics.contains(it) }
15-
return if (invalidTopics.isNotEmpty()) {
16-
if (isAutoCreateTopicsEnabled(client)) {
17-
emptyList()
18-
} else {
19-
invalidTopics
20-
}
15+
if (invalidTopics.isNotEmpty() && isAutoCreateTopicsEnabled(client)) {
16+
emptyList()
2117
} else {
2218
invalidTopics
2319
}
20+
} catch (e: Exception) {
21+
emptyList()
2422
}
2523

2624
fun isAutoCreateTopicsEnabled(kafkaProps: Properties) = isAutoCreateTopicsEnabled(AdminClient.create(kafkaProps))
2725

28-
fun isAutoCreateTopicsEnabled(client: AdminClient): Boolean {
26+
fun isAutoCreateTopicsEnabled(client: AdminClient): Boolean = try {
2927
val firstNodeId = client.describeCluster().nodes().get().first().id()
30-
val configs = client.describeConfigs(listOf(ConfigResource(ConfigResource.Type.BROKER, firstNodeId.toString()))).all().get()
31-
return configs.values
28+
val configResources = listOf(ConfigResource(ConfigResource.Type.BROKER, firstNodeId.toString()))
29+
val configs = client.describeConfigs(configResources).all().get()
30+
configs.values
3231
.flatMap { it.entries() }
3332
.find { it.name() == "auto.create.topics.enable" }
3433
?.value()
3534
?.toBoolean() ?: false
35+
} catch (e: Exception) {
36+
false
3637
}
3738
}

consumer/pom.xml

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,91 @@
4545
<groupId>io.confluent</groupId>
4646
<artifactId>kafka-avro-serializer</artifactId>
4747
</dependency>
48+
<dependency>
49+
<groupId>org.neo4j</groupId>
50+
<artifactId>neo4j-logging</artifactId>
51+
<type>test-jar</type>
52+
<scope>test</scope>
53+
</dependency>
54+
<dependency>
55+
<groupId>org.neo4j</groupId>
56+
<artifactId>neo4j-random-values</artifactId>
57+
<scope>test</scope>
58+
</dependency>
4859
</dependencies>
4960

61+
<profiles>
62+
<profile>
63+
<id>default</id>
64+
<dependencies>
65+
<dependency>
66+
<groupId>com.neo4j</groupId>
67+
<artifactId>neo4j-causal-clustering</artifactId>
68+
<scope>test</scope>
69+
<exclusions>
70+
<exclusion>
71+
<groupId>com.fasterxml.jackson.core</groupId>
72+
<artifactId>jackson-databind</artifactId>
73+
</exclusion>
74+
</exclusions>
75+
</dependency>
76+
<dependency>
77+
<groupId>com.neo4j</groupId>
78+
<artifactId>neo4j-causal-clustering</artifactId>
79+
<type>test-jar</type>
80+
<scope>test</scope>
81+
<exclusions>
82+
<exclusion>
83+
<groupId>com.fasterxml.jackson.core</groupId>
84+
<artifactId>jackson-databind</artifactId>
85+
</exclusion>
86+
</exclusions>
87+
</dependency>
88+
</dependencies>
89+
<activation>
90+
<property>
91+
<name>!env.TRAVIS</name>
92+
</property>
93+
<activeByDefault>true</activeByDefault>
94+
</activation>
95+
</profile>
96+
<profile>
97+
<id>travis</id>
98+
<activation>
99+
<property>
100+
<name>env.TRAVIS</name>
101+
<value>true</value>
102+
</property>
103+
</activation>
104+
<build>
105+
<plugins>
106+
<plugin>
107+
<groupId>org.jetbrains.kotlin</groupId>
108+
<artifactId>kotlin-maven-plugin</artifactId>
109+
<version>${kotlin.version}</version>
110+
<executions>
111+
<execution>
112+
<id>test-compile</id>
113+
<goals>
114+
<goal>test-compile</goal>
115+
</goals>
116+
<configuration>
117+
<!--
118+
As Kotlin Maven Compiler does not provide testExcludes option
119+
we need to workaround specifying the test source dirs and excluding
120+
by hand the `enterprise` directory
121+
-->
122+
<sourceDirs>
123+
<sourceDir>${project.basedir}/src/test/kotlin/integrations</sourceDir>
124+
<sourceDir>${project.basedir}/src/test/kotlin/streams</sourceDir>
125+
</sourceDirs>
126+
</configuration>
127+
</execution>
128+
</executions>
129+
</plugin>
130+
</plugins>
131+
</build>
132+
</profile>
133+
</profiles>
134+
50135
</project>

consumer/src/main/kotlin/streams/StreamsEventConsumer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import streams.service.StreamsSinkEntity
55
import streams.service.errors.ErrorService
66

77

8-
abstract class StreamsEventConsumer(private val log: Log, private val dlqService: ErrorService) {
8+
abstract class StreamsEventConsumer(private val log: Log) {
99

1010
abstract fun stop()
1111

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package streams
2+
3+
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.GlobalScope
5+
import kotlinx.coroutines.launch
6+
import kotlinx.coroutines.runBlocking
7+
import kotlinx.coroutines.sync.Mutex
8+
import kotlinx.coroutines.sync.withLock
9+
import org.neo4j.kernel.availability.AvailabilityListener
10+
import org.neo4j.kernel.internal.GraphDatabaseAPI
11+
import streams.procedures.StreamsSinkProcedures
12+
import streams.service.TopicUtils
13+
import streams.utils.Neo4jUtils
14+
import streams.utils.StreamsUtils
15+
import java.util.concurrent.ConcurrentHashMap
16+
17+
class StreamsEventSinkAvailabilityListener(dependencies: StreamsEventSinkExtensionFactory.Dependencies): AvailabilityListener {
18+
private val db = dependencies.graphdatabaseAPI()
19+
private val logService = dependencies.log()
20+
private val configuration = dependencies.config()
21+
private val log = logService.getUserLog(StreamsEventSinkAvailabilityListener::class.java)
22+
23+
private var eventSink: StreamsEventSink? = null
24+
private val streamsSinkConfiguration: StreamsSinkConfiguration
25+
private val streamsTopicService: StreamsTopicService
26+
private val streamsQueryExecution: StreamsEventSinkQueryExecution
27+
28+
private val mutex = Mutex()
29+
30+
init {
31+
streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration)
32+
streamsTopicService = StreamsTopicService(db)
33+
streamsTopicService.setAll(streamsSinkConfiguration.topics)
34+
val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
35+
streamsSinkConfiguration.sourceIdStrategyConfig)
36+
streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db,
37+
logService.getUserLog(StreamsEventSinkQueryExecution::class.java),
38+
strategyMap)
39+
}
40+
41+
42+
override fun available() {
43+
runBlocking {
44+
mutex.withLock {
45+
setAvailable(db, true)
46+
if (eventSink == null) {
47+
// Create the Sink if not exists
48+
eventSink = StreamsEventSinkFactory
49+
.getStreamsEventSink(configuration,
50+
streamsQueryExecution,
51+
streamsTopicService,
52+
log,
53+
db)
54+
}
55+
}
56+
}
57+
try {
58+
log.info("Initialising the Streams Sink module")
59+
60+
// start the Sink
61+
if (Neo4jUtils.isCluster(db, log)) {
62+
log.info("The Sink module is running in a cluster, checking for the ${Neo4jUtils.LEADER}")
63+
Neo4jUtils.waitForTheLeader(db, log) { initSinkModule() }
64+
} else {
65+
runInASingleInstance()
66+
}
67+
} catch (e: Exception) {
68+
log.error("Error initializing the streaming sink:", e)
69+
}
70+
71+
// Register required services for the Procedures
72+
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
73+
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink!!.getEventConsumerFactory())
74+
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink!!.getEventSinkConfigMapper())
75+
StreamsSinkProcedures.registerStreamsEventSink(eventSink!!)
76+
}
77+
78+
private fun runInASingleInstance() {
79+
// check if is writeable instance
80+
Neo4jUtils.executeInWriteableInstance(db) {
81+
if (streamsSinkConfiguration.clusterOnly) {
82+
log.info("""
83+
|Cannot init the Streams Sink module as is forced to work only in a cluster env,
84+
|please check the value of `streams.${StreamsSinkConfigurationConstants.CLUSTER_ONLY}`
85+
""".trimMargin())
86+
} else {
87+
initSinkModule()
88+
}
89+
}
90+
}
91+
92+
private fun initSinkModule() {
93+
if (streamsSinkConfiguration.checkApocTimeout > -1) {
94+
waitForApoc()
95+
} else {
96+
initSink()
97+
}
98+
}
99+
100+
private fun waitForApoc() {
101+
GlobalScope.launch(Dispatchers.IO) {
102+
val success = StreamsUtils.blockUntilTrueOrTimeout(streamsSinkConfiguration.checkApocTimeout, streamsSinkConfiguration.checkApocInterval) {
103+
val hasApoc = Neo4jUtils.hasApoc(db)
104+
if (!hasApoc && log.isDebugEnabled) {
105+
log.debug("APOC not loaded yet, next check in ${streamsSinkConfiguration.checkApocInterval} ms")
106+
}
107+
hasApoc
108+
}
109+
if (success) {
110+
initSink()
111+
} else {
112+
log.info("Streams Sink plugin not loaded as APOC are not installed")
113+
}
114+
}
115+
}
116+
117+
private fun initSink() {
118+
eventSink?.start()
119+
eventSink?.printInvalidTopics()
120+
}
121+
122+
override fun unavailable() = runBlocking {
123+
mutex.withLock {
124+
setAvailable(db, false)
125+
eventSink?.stop()
126+
}
127+
Unit
128+
}
129+
130+
companion object {
131+
@JvmStatic private val available = ConcurrentHashMap<String, Boolean>()
132+
133+
fun isAvailable(db: GraphDatabaseAPI) = available.getOrDefault(db.databaseLayout().databaseDirectory().absolutePath, false)
134+
135+
fun setAvailable(db: GraphDatabaseAPI, isAvailable: Boolean): Unit = available.set(db.databaseLayout().databaseDirectory().absolutePath, isAvailable)
136+
137+
fun remove(db: GraphDatabaseAPI) = available.remove(db.databaseLayout().databaseDirectory().absolutePath)
138+
}
139+
}

0 commit comments

Comments
 (0)