Skip to content

Commit a947457

Browse files
authored
fixes #358: Dynamic Configuration (#394)
* fixes #358: Dynamic Configuration * added streams.configuration.remove procedure
1 parent 0fc73fc commit a947457

File tree

53 files changed

+1603
-473
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1603
-473
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package streams.configuration
2+
3+
import org.neo4j.kernel.internal.GraphDatabaseAPI
4+
import org.neo4j.logging.Log
5+
import org.neo4j.logging.internal.LogService
6+
import org.neo4j.plugin.configuration.ConfigurationLifecycle
7+
import org.neo4j.plugin.configuration.ConfigurationLifecycleUtils
8+
import org.neo4j.plugin.configuration.EventType
9+
import org.neo4j.plugin.configuration.listners.ConfigurationLifecycleListener
10+
import streams.utils.StreamsUtils
11+
import java.io.File
12+
import java.util.concurrent.ConcurrentHashMap
13+
14+
class StreamsConfig(triggerPeriod: Int = DEFAULT_TRIGGER_PERIOD, log: Log) {
15+
16+
companion object {
17+
@JvmStatic private val cache = ConcurrentHashMap<String, StreamsConfig>()
18+
19+
private const val DEFAULT_TRIGGER_PERIOD: Int = 10000
20+
21+
private const val SUN_JAVA_COMMAND = "sun.java.command"
22+
private const val CONF_DIR_ARG = "config-dir="
23+
private const val DEFAULT_PATH = "."
24+
25+
fun getNeo4jConfFolder(): String { // sun.java.command=com.neo4j.server.enterprise.CommercialEntryPoint --home-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02 --config-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02/conf
26+
val command = System.getProperty(SUN_JAVA_COMMAND, "")
27+
return command.split("--")
28+
.map(String::trim)
29+
.filter { it.startsWith(CONF_DIR_ARG) }
30+
.map { it.substring(CONF_DIR_ARG.length) }
31+
.firstOrNull() ?: DEFAULT_PATH
32+
}
33+
34+
fun getInstance(db: GraphDatabaseAPI): StreamsConfig = cache.computeIfAbsent(StreamsUtils.getName(db)) {
35+
StreamsConfig(log = db.dependencyResolver
36+
.resolveDependency(LogService::class.java)
37+
.getUserLog(StreamsConfig::class.java))
38+
}
39+
40+
fun removeInstance(db: GraphDatabaseAPI): StreamsConfig? = cache.remove(StreamsUtils.getName(db))
41+
}
42+
43+
private val configLifecycle: ConfigurationLifecycle
44+
45+
init {
46+
val neo4jConfFolder = System.getenv().getOrDefault("NEO4J_CONF", getNeo4jConfFolder())
47+
configLifecycle = ConfigurationLifecycle(triggerPeriod,
48+
"$neo4jConfFolder${File.separator}streams.conf",
49+
true, log, true, "streams.", "kafka.")
50+
}
51+
52+
fun setProperty(key: String, value: Any, save: Boolean = true) {
53+
configLifecycle.setProperty(key, value, save)
54+
}
55+
56+
fun setProperties(map: Map<String, Any>, save: Boolean = true) {
57+
configLifecycle.setProperties(map, save)
58+
}
59+
60+
fun removeProperty(key: String, save: Boolean = true) {
61+
configLifecycle.removeProperty(key, save)
62+
}
63+
64+
fun removeProperties(keys: Collection<String>, save: Boolean = true) {
65+
configLifecycle.removeProperties(keys, save)
66+
}
67+
68+
fun reload() {
69+
configLifecycle.reload()
70+
}
71+
72+
fun start() {
73+
configLifecycle.start()
74+
}
75+
76+
fun getConfiguration(): MutableMap<String, Any> = ConfigurationLifecycleUtils.toMap(configLifecycle.configuration)
77+
78+
fun addConfigurationLifecycleListener(evt: EventType,
79+
listener: ConfigurationLifecycleListener) {
80+
configLifecycle.addConfigurationLifecycleListener(evt, listener)
81+
}
82+
83+
fun stop(shutdown: Boolean = false) {
84+
configLifecycle.stop(shutdown)
85+
}
86+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package streams.configuration
2+
3+
import org.neo4j.graphdb.GraphDatabaseService
4+
import org.neo4j.kernel.internal.GraphDatabaseAPI
5+
import org.neo4j.logging.Log
6+
import org.neo4j.procedure.*
7+
import streams.events.KeyValueResult
8+
import java.util.stream.Stream
9+
10+
data class StreamsConfigProceduresConfiguration(val save: Boolean) {
11+
constructor(map: Map<String, Any>?): this(map.orEmpty()
12+
.getOrDefault("save", "true")
13+
.toString()
14+
.toBoolean())
15+
}
16+
17+
class StreamsConfigProcedures {
18+
19+
@JvmField @Context
20+
var log: Log? = null
21+
22+
@JvmField @Context
23+
var db: GraphDatabaseService? = null
24+
25+
@Admin
26+
@Procedure
27+
@Description("""
28+
streams.configuration.set(<properties_map>, <config_map>) YIELD name, value
29+
""")
30+
fun set(@Name(value = "properties") properties: Map<String, Any>?,
31+
@Name(value = "config", defaultValue = "{}") config: Map<String, Any>?): Stream<KeyValueResult> {
32+
if (properties.isNullOrEmpty()) {
33+
throw RuntimeException("Property must be not empty")
34+
}
35+
val map = properties.mapValues { it.value.toString() }
36+
val instance = StreamsConfig.getInstance(db as GraphDatabaseAPI)
37+
val cfg = StreamsConfigProceduresConfiguration(config)
38+
instance.setProperties(map, cfg.save)
39+
return get()
40+
}
41+
42+
@Admin
43+
@Procedure
44+
@Description("""
45+
streams.configuration.remove(<properties_list>, <config_map>) YIELD name, value
46+
""")
47+
fun remove(@Name(value = "keys") properties: List<String>,
48+
@Name(value = "config", defaultValue = "{}") config: Map<String, Any>?): Stream<KeyValueResult> {
49+
if (properties.isNullOrEmpty()) {
50+
throw RuntimeException("Property must be not empty")
51+
}
52+
val instance = StreamsConfig.getInstance(db as GraphDatabaseAPI)
53+
val cfg = StreamsConfigProceduresConfiguration(config)
54+
instance.removeProperties(properties, cfg.save)
55+
return get()
56+
}
57+
58+
@Admin
59+
@Procedure
60+
@Description("""
61+
streams.configuration.get() YIELD name, value
62+
""")
63+
fun get(): Stream<KeyValueResult> = StreamsConfig.getInstance(db as GraphDatabaseAPI)
64+
.getConfiguration()
65+
.entries
66+
.map { KeyValueResult(it.key, it.value) }
67+
.stream()
68+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package streams.events
2+
3+
class StreamResult(@JvmField val event: Map<String, *>)
4+
class KeyValueResult(@JvmField val name: String, @JvmField val value: Any?)

common/src/main/kotlin/streams/utils/KafkaValidationUtils.kt

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
package streams.utils
22

3+
import org.apache.kafka.clients.CommonClientConfigs
34
import org.apache.kafka.clients.admin.AdminClient
5+
import org.apache.kafka.clients.admin.AdminClientConfig
6+
import org.apache.kafka.clients.consumer.ConsumerConfig
7+
import org.apache.kafka.clients.producer.ProducerConfig
48
import org.apache.kafka.common.config.ConfigResource
9+
import org.apache.kafka.common.config.SaslConfigs
10+
import org.apache.kafka.common.config.SslConfigs
11+
import org.apache.kafka.common.config.TopicConfig
12+
import java.lang.reflect.Modifier
513
import java.util.Properties
614

715
object KafkaValidationUtils {
@@ -35,4 +43,19 @@ object KafkaValidationUtils {
3543
} catch (e: Exception) {
3644
false
3745
}
46+
47+
private fun getConfigProperties(clazz: Class<*>) = clazz.declaredFields
48+
.filter { Modifier.isStatic(it.modifiers) && it.name.endsWith("_CONFIG") }
49+
.map { it.get(null).toString() }
50+
.toSet()
51+
52+
private fun getBaseConfigs() = (getConfigProperties(CommonClientConfigs::class.java)
53+
+ AdminClientConfig.configNames()
54+
+ getConfigProperties(SaslConfigs::class.java)
55+
+ getConfigProperties(TopicConfig::class.java)
56+
+ getConfigProperties(SslConfigs::class.java))
57+
58+
fun getProducerProperties() = ProducerConfig.configNames() - getBaseConfigs()
59+
60+
fun getConsumerProperties() = ConsumerConfig.configNames() - getBaseConfigs()
3861
}

common/src/main/kotlin/streams/utils/StreamsUtils.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package streams.utils
22

33
import kotlinx.coroutines.runBlocking
44
import kotlinx.coroutines.delay
5+
import org.neo4j.kernel.internal.GraphDatabaseAPI
56

67
object StreamsUtils {
78

@@ -25,7 +26,7 @@ object StreamsUtils {
2526
}
2627
}
2728

28-
fun blockUntilTrueOrTimeout(timeout: Long, delay: Long = 1000, action: () -> Boolean): Boolean = runBlocking {
29+
fun blockUntilFalseOrTimeout(timeout: Long, delay: Long = 1000, action: () -> Boolean): Boolean = runBlocking {
2930
val start = System.currentTimeMillis()
3031
var success = action()
3132
while (System.currentTimeMillis() - start < timeout && !success) {
@@ -35,4 +36,6 @@ object StreamsUtils {
3536
success
3637
}
3738

39+
fun getName(db: GraphDatabaseAPI) = db.databaseLayout().databaseDirectory().absolutePath
40+
3841
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package streams.configuration
2+
3+
import org.junit.After
4+
import org.junit.Before
5+
import org.junit.Test
6+
import org.neo4j.kernel.impl.proc.Procedures
7+
import org.neo4j.kernel.internal.GraphDatabaseAPI
8+
import org.neo4j.test.TestGraphDatabaseFactory
9+
import kotlin.streams.toList
10+
import kotlin.test.assertEquals
11+
12+
@Suppress("DEPRECATION")
13+
class StreamsConfigProceduresIT {
14+
15+
private lateinit var db: GraphDatabaseAPI
16+
17+
@Before
18+
fun setUp() {
19+
db = TestGraphDatabaseFactory()
20+
.newImpermanentDatabaseBuilder()
21+
.newGraphDatabase() as GraphDatabaseAPI
22+
}
23+
24+
@After
25+
fun tearDown() {
26+
db.shutdown()
27+
}
28+
29+
@Test
30+
fun `should set properties`() {
31+
db.dependencyResolver.resolveDependency(Procedures::class.java)
32+
.registerProcedure(StreamsConfigProcedures::class.java, true)
33+
val props = mapOf("streams.procedures.enabled" to "true")
34+
val actual = db.execute("CALL streams.configuration.set(\$props, {save: false})", mapOf("props" to props))
35+
.stream()
36+
.toList()
37+
assertEquals(1, actual.size)
38+
val expected = mapOf("name" to "streams.procedures.enabled", "value" to "true")
39+
assertEquals(expected, actual[0])
40+
}
41+
42+
@Test
43+
fun `should remove properties`() {
44+
db.dependencyResolver.resolveDependency(Procedures::class.java)
45+
.registerProcedure(StreamsConfigProcedures::class.java, true)
46+
val props = mapOf("streams.procedures.enabled" to "true")
47+
val actual = db.execute("CALL streams.configuration.set(\$props, {save: false})", mapOf("props" to props))
48+
.stream()
49+
.toList()
50+
assertEquals(1, actual.size)
51+
val expected = mapOf("name" to "streams.procedures.enabled", "value" to "true")
52+
assertEquals(expected, actual[0])
53+
val actualRemoved = db.execute("CALL streams.configuration.remove(\$props, {save: false})", mapOf("props" to props.keys))
54+
.stream()
55+
.toList()
56+
assertEquals(0, actualRemoved.size)
57+
}
58+
}

consumer/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
<sourceDirs>
123123
<sourceDir>${project.basedir}/src/test/kotlin/integrations</sourceDir>
124124
<sourceDir>${project.basedir}/src/test/kotlin/streams</sourceDir>
125+
<sourceDir>${project.basedir}/src/test/kotlin/extension</sourceDir>
125126
</sourceDirs>
126127
</configuration>
127128
</execution>

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,25 @@
11
package streams
22

3-
import org.neo4j.kernel.configuration.Config
43
import org.neo4j.kernel.internal.GraphDatabaseAPI
54
import org.neo4j.logging.Log
65
import streams.events.StreamsPluginStatus
76

8-
abstract class StreamsEventSink(private val config: Config,
9-
private val queryExecution: StreamsEventSinkQueryExecution,
10-
private val streamsTopicService: StreamsTopicService,
11-
private val log: Log,
12-
private val db: GraphDatabaseAPI) {
7+
abstract class StreamsEventSink(private val config: Map<String, String>,
8+
queryExecution: StreamsEventSinkQueryExecution,
9+
streamsTopicService: StreamsTopicService,
10+
log: Log,
11+
db: GraphDatabaseAPI) {
1312

1413
abstract val mappingKeys: Map<String, String>
15-
abstract val streamsConfigMap: Map<String, String>
14+
abstract val streamsSinkConfiguration: StreamsSinkConfiguration
1615

1716
abstract fun stop()
1817

1918
abstract fun start()
2019

2120
abstract fun getEventConsumerFactory(): StreamsEventConsumerFactory
2221

23-
open fun getEventSinkConfigMapper(): StreamsEventSinkConfigMapper = StreamsEventSinkConfigMapper(streamsConfigMap, mappingKeys)
22+
open fun getEventSinkConfigMapper(): StreamsEventSinkConfigMapper = StreamsEventSinkConfigMapper(config, mappingKeys)
2423

2524
open fun printInvalidTopics() {}
2625

@@ -29,10 +28,13 @@ abstract class StreamsEventSink(private val config: Config,
2928
}
3029

3130
object StreamsEventSinkFactory {
32-
fun getStreamsEventSink(config: Config, streamsQueryExecution: StreamsEventSinkQueryExecution,
33-
streamsTopicService: StreamsTopicService, log: Log, db: GraphDatabaseAPI): StreamsEventSink {
34-
return Class.forName(config.raw.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
35-
.getConstructor(Config::class.java,
31+
fun getStreamsEventSink(config: Map<String, String>,
32+
streamsQueryExecution: StreamsEventSinkQueryExecution,
33+
streamsTopicService: StreamsTopicService,
34+
log: Log,
35+
db: GraphDatabaseAPI): StreamsEventSink {
36+
return Class.forName(config.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
37+
.getConstructor(Map::class.java,
3638
StreamsEventSinkQueryExecution::class.java,
3739
StreamsTopicService::class.java,
3840
Log::class.java,

0 commit comments

Comments
 (0)