11package streams.config
22
3+ import kotlinx.coroutines.runBlocking
4+ import kotlinx.coroutines.sync.Mutex
5+ import kotlinx.coroutines.sync.withLock
36import org.neo4j.dbms.api.DatabaseManagementService
4- import org.neo4j.kernel.lifecycle.LifecycleAdapter
7+ import org.neo4j.kernel.internal.GraphDatabaseAPI
58import org.neo4j.logging.Log
9+ import org.neo4j.logging.internal.LogService
10+ import org.neo4j.plugin.configuration.ConfigurationLifecycle
11+ import org.neo4j.plugin.configuration.ConfigurationLifecycleUtils
12+ import org.neo4j.plugin.configuration.EventType
13+ import org.neo4j.plugin.configuration.listners.ConfigurationLifecycleListener
14+ import streams.extensions.databaseManagementService
615import streams.extensions.getDefaultDbName
7- import java.io.FileInputStream
8- import java.io.FileNotFoundException
9- import java.util.Properties
16+ import streams.extensions.isAvailable
17+ import streams.utils.Neo4jUtils
18+ import streams.utils.StreamsUtils
19+ import java.io.File
1020import java.util.concurrent.ConcurrentHashMap
11- import java.util.concurrent.TimeUnit
21+ import java.util.concurrent.atomic.AtomicReference
1222
13- data class StreamsConfig (private val log : Log , private val dbms : DatabaseManagementService ) : LifecycleAdapter() {
14-
15- val config = ConcurrentHashMap <String , String >()
16-
17- private lateinit var neo4jConfFolder: String
23+ class StreamsConfig (private val log : Log , private val dbms : DatabaseManagementService ) {
1824
1925 companion object {
20- private lateinit var INSTANCE : StreamsConfig
21- private val SUPPORTED_PREFIXES = listOf (" streams" , " kafka" )
2226 private const val SUN_JAVA_COMMAND = " sun.java.command"
2327 private const val CONF_DIR_ARG = " config-dir="
2428 const val SOURCE_ENABLED = " streams.source.enabled"
@@ -27,123 +31,193 @@ data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagem
2731 const val PROCEDURES_ENABLED_VALUE = true
2832 const val SINK_ENABLED = " streams.sink.enabled"
2933 const val SINK_ENABLED_VALUE = false
30- const val DEFAULT_PATH = " ."
3134 const val CHECK_APOC_TIMEOUT = " streams.check.apoc.timeout"
3235 const val CHECK_APOC_INTERVAL = " streams.check.apoc.interval"
3336 const val CLUSTER_ONLY = " streams.cluster.only"
3437 const val CHECK_WRITEABLE_INSTANCE_INTERVAL = " streams.check.writeable.instance.interval"
3538 const val SYSTEM_DB_WAIT_TIMEOUT = " streams.systemdb.wait.timeout"
3639 const val SYSTEM_DB_WAIT_TIMEOUT_VALUE = 10000L
3740 const val POLL_INTERVAL = " streams.sink.poll.interval"
38- private var afterInitListeners = mutableListOf< ((MutableMap <String , String >) -> Unit )> ()
41+ const val INSTANCE_WAIT_TIMEOUT = " streams.wait.timeout"
42+ const val INSTANCE_WAIT_TIMEOUT_VALUE = 120000L
43+
44+ private const val DEFAULT_TRIGGER_PERIOD : Int = 10000
45+
46+ private const val DEFAULT_PATH = " ."
3947
40- fun registerListener (after : (MutableMap <String , String >) -> Unit ) {
41- afterInitListeners.add(after)
48+ @JvmStatic private val cache = ConcurrentHashMap <String , StreamsConfig >()
49+
50+ private fun getNeo4jConfFolder (): String { // sun.java.command=com.neo4j.server.enterprise.CommercialEntryPoint --home-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02 --config-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02/conf
51+ val command = System .getProperty(SUN_JAVA_COMMAND , " " )
52+ return command.split(" --" )
53+ .map(String ::trim)
54+ .filter { it.startsWith(CONF_DIR_ARG ) }
55+ .map { it.substring(CONF_DIR_ARG .length) }
56+ .firstOrNull() ? : DEFAULT_PATH
4257 }
4358
44- fun getInstance () = INSTANCE
45- }
59+ fun getInstance (db : GraphDatabaseAPI ): StreamsConfig = cache.computeIfAbsent(StreamsUtils .getName(db)) {
60+ StreamsConfig (log = db.dependencyResolver
61+ .resolveDependency(LogService ::class .java)
62+ .getUserLog(StreamsConfig ::class .java), db.databaseManagementService())
63+ }
4664
47- override fun init ( ) {
48- if (log.isDebugEnabled) {
49- log.debug( " Init StreamsConfig... " )
65+ fun removeInstance ( db : GraphDatabaseAPI ) {
66+ val instance = cache.remove( StreamsUtils .getName(db))
67+ instance?.stop( true )
5068 }
51- neo4jConfFolder = getNeo4jConfFolder()
52- loadConfiguration()
53- afterInitListeners.forEach { it(config) }
54- INSTANCE = this
55- }
5669
57- override fun stop () {
58- afterInitListeners.clear()
70+ fun isSourceGloballyEnabled (config : Map <String , Any ?>) = config.getOrDefault(SOURCE_ENABLED , SOURCE_ENABLED_VALUE ).toString().toBoolean()
71+
72+ fun isSourceEnabled (config : Map <String , Any ?>, dbName : String ) = config.getOrDefault(" ${SOURCE_ENABLED } .from.$dbName " , isSourceGloballyEnabled(config)).toString().toBoolean()
73+
74+ fun hasProceduresGloballyEnabled (config : Map <String , Any ?>) = config.getOrDefault(PROCEDURES_ENABLED , PROCEDURES_ENABLED_VALUE ).toString().toBoolean()
75+
76+ fun hasProceduresEnabled (config : Map <String , Any ?>, dbName : String ) = config.getOrDefault(" ${PROCEDURES_ENABLED } .$dbName " , hasProceduresGloballyEnabled(config)).toString().toBoolean()
77+
78+ fun isSinkGloballyEnabled (config : Map <String , Any ?>) = config.getOrDefault(SINK_ENABLED , SINK_ENABLED_VALUE ).toString().toBoolean()
79+
80+ fun isSinkEnabled (config : Map <String , Any ?>, dbName : String ) = config.getOrDefault(" ${SINK_ENABLED } .to.$dbName " , isSinkGloballyEnabled(config)).toString().toBoolean()
81+
82+ fun getSystemDbWaitTimeout (config : Map <String , Any ?>) = config.getOrDefault(SYSTEM_DB_WAIT_TIMEOUT , SYSTEM_DB_WAIT_TIMEOUT_VALUE ).toString().toLong()
83+
84+ fun getInstanceWaitTimeout (config : Map <String , Any ?>) = config.getOrDefault(INSTANCE_WAIT_TIMEOUT , INSTANCE_WAIT_TIMEOUT_VALUE ).toString().toLong()
5985 }
6086
61- private fun loadConfiguration () {
62- val properties = neo4jConfAsProperties()
87+ private val configLifecycle: ConfigurationLifecycle
6388
64- val filteredValues = filterProperties(properties)
65- { key -> ! SUPPORTED_PREFIXES .find { key.toString().startsWith(it) }.isNullOrBlank() }
89+ private enum class Status {RUNNING , STOPPED , CLOSED , UNKNOWN }
6690
67- if (log.isDebugEnabled) {
68- log.debug( " Neo4j Streams Global configuration from neo4j.conf file: $filteredValues " )
69- }
91+ private val status = AtomicReference ( Status . UNKNOWN )
92+
93+ private val mutex = Mutex ()
7094
71- config.putAll(filteredValues)
95+ init {
96+ val neo4jConfFolder = System .getenv().getOrDefault(" NEO4J_CONF" , getNeo4jConfFolder())
97+ configLifecycle = ConfigurationLifecycle (DEFAULT_TRIGGER_PERIOD ,
98+ " $neo4jConfFolder${File .separator} streams.conf" ,
99+ true , log, true , " streams." , " kafka." )
72100 }
73101
74- private fun filterProperties (properties : Properties , filter : (Any ) -> Boolean ) = properties
75- .filterKeys(filter)
76- .mapNotNull {
77- if (it.value == null ) {
78- null
102+ fun start () = runBlocking {
103+ if (log.isDebugEnabled) {
104+ log.debug(" Starting StreamsConfig" )
105+ }
106+ mutex.withLock {
107+ if (status.get() == Status .RUNNING ) return @runBlocking
108+ try {
109+ // wait for all database to be ready
110+ val isInstanceReady = StreamsUtils .blockUntilFalseOrTimeout(getInstanceWaitTimeout()) {
111+ if (log.isDebugEnabled) {
112+ log.debug(" Waiting for the Neo4j instance to be ready..." )
113+ }
114+ dbms.isAvailable(100 )
115+ }
116+ if (! isInstanceReady) {
117+ log.warn(" ${getInstanceWaitTimeout()} ms have passed and the instance is not online, the Streams plugin will not started" )
118+ return @runBlocking
119+ }
120+ if (Neo4jUtils .isCluster(dbms, log)) {
121+ log.info(" We're in cluster instance waiting for the ${StreamsUtils .LEADER } s to be elected in each database" )
122+ // in case is a cluster we wait for the correct cluster formation => LEADER elected
123+ Neo4jUtils .waitForTheLeaders(dbms, log) { configStart() }
79124 } else {
80- it.key.toString() to it.value.toString ()
125+ configStart ()
81126 }
127+ } catch (e: Exception ) {
128+ log.warn(" Cannot start StreamsConfig because of the following exception:" , e)
82129 }
83- .toMap()
84-
85- fun loadStreamsConfiguration () {
86- val properties = neo4jConfAsProperties()
130+ }
131+ }
87132
88- val filteredValues = filterProperties(properties)
89- { key -> key.toString().startsWith(" streams." ) }
133+ private fun configStart () = try {
134+ configLifecycle.start()
135+ status.set(Status .RUNNING )
136+ log.info(" StreamsConfig started" )
137+ } catch (e: Exception ) {
138+ log.error(" Cannot start the StreamsConfig because of the following exception" , e)
139+ }
90140
141+ fun stop (shutdown : Boolean = false) = runBlocking {
91142 if (log.isDebugEnabled) {
92- log.debug(" Neo4j Streams configuration reloaded from neo4j.conf file: $filteredValues " )
143+ log.debug(" Stopping StreamsConfig " )
93144 }
145+ mutex.withLock {
146+ val status = getStopStatus(shutdown)
147+ if (this @StreamsConfig.status.get() == status) return @runBlocking
148+ configStop(shutdown, status)
149+ }
150+ }
94151
95- config.putAll(filteredValues)
152+ private fun configStop (shutdown : Boolean , status : Status ) = try {
153+ configLifecycle.stop(shutdown)
154+ this .status.set(status)
155+ log.info(" StreamsConfig stopped" )
156+ } catch (e: Exception ) {
157+ log.error(" Cannot stop the StreamsConfig because of the following exception" , e)
96158 }
97159
98- private fun neo4jConfAsProperties (): Properties {
99- val neo4jConfFolder = System .getenv().getOrDefault(" NEO4J_CONF" , neo4jConfFolder)
160+ private fun getStopStatus (shutdown : Boolean ) = when (shutdown) {
161+ true -> Status .CLOSED
162+ else -> Status .STOPPED
163+ }
100164
101- val properties = Properties ()
102- try {
103- if (log.isDebugEnabled) {
104- log.debug(" The retrieved NEO4J_CONF dir is $neo4jConfFolder " )
105- }
106- properties.load(FileInputStream (" $neo4jConfFolder /neo4j.conf" ))
107- } catch (e: FileNotFoundException ) {
108- log.error(" The neo4j.conf file is not under the directory defined into the directory $neo4jConfFolder , please set the NEO4J_CONF env correctly" )
109- }
110- return properties
165+ fun setProperty (key : String , value : Any , save : Boolean = true) {
166+ configLifecycle.setProperty(key, value, save)
111167 }
112168
113- // Taken from ApocConfig.java
114- private fun getNeo4jConfFolder (): String { // sun.java.command=com.neo4j.server.enterprise.CommercialEntryPoint --home-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02 --config-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02/conf
115- val command = System .getProperty(SUN_JAVA_COMMAND , " " )
116- val neo4jConfFolder = command.split(" --" )
117- .map(String ::trim)
118- .filter { it.startsWith(CONF_DIR_ARG ) }
119- .map { it.substring(CONF_DIR_ARG .length) }
120- .firstOrNull() ? : DEFAULT_PATH
169+ fun setProperties (map : Map <String , Any >, save : Boolean = true) {
170+ configLifecycle.setProperties(map, save)
171+ }
121172
122- if (neo4jConfFolder == DEFAULT_PATH ) {
123- log.info(" Cannot determine conf folder from sys property $command , assuming $neo4jConfFolder " )
124- } else {
125- log.info(" From system properties: NEO4J_CONF=%s" , neo4jConfFolder)
173+ fun removeProperty (key : String , save : Boolean = true) {
174+ configLifecycle.removeProperty(key, save)
175+ }
176+
177+ fun removeProperties (keys : Collection <String >, save : Boolean = true) {
178+ configLifecycle.removeProperties(keys, save)
179+ }
180+
181+ fun reload () {
182+ configLifecycle.reload()
183+ }
184+
185+ fun addConfigurationLifecycleListener (evt : EventType ,
186+ listener : ConfigurationLifecycleListener ) {
187+ if (log.isDebugEnabled) {
188+ log.debug(" Adding listener for event: $evt " )
126189 }
127- return neo4jConfFolder
190+ configLifecycle.addConfigurationLifecycleListener(evt, listener)
128191 }
129192
193+ fun removeConfigurationLifecycleListener (evt : EventType ,
194+ listener : ConfigurationLifecycleListener ) {
195+ if (log.isDebugEnabled) {
196+ log.debug(" Removing listener for event: $evt " )
197+ }
198+ configLifecycle.removeConfigurationLifecycleListener(evt, listener)
199+ }
200+
201+ fun getConfiguration (): Map <String , Any > = ConfigurationLifecycleUtils .toMap(configLifecycle.configuration)
202+
130203 fun defaultDbName () = this .dbms.getDefaultDbName()
131204
132205 fun isDefaultDb (dbName : String ) = this .defaultDbName() == dbName
133206
134- fun isSourceGloballyEnabled () = this .config.getOrDefault(SOURCE_ENABLED , SOURCE_ENABLED_VALUE ).toString().toBoolean()
207+ fun isSourceGloballyEnabled () = Companion .isSourceGloballyEnabled(getConfiguration())
208+
209+ fun isSourceEnabled (dbName : String ) = Companion .isSourceEnabled(getConfiguration(), dbName)
135210
136- fun isSourceEnabled ( dbName : String ) = this .config.getOrDefault( " ${ SOURCE_ENABLED } .from. $dbName " , isSourceGloballyEnabled()).toString().toBoolean( )
211+ fun hasProceduresGloballyEnabled ( ) = Companion .hasProceduresGloballyEnabled(getConfiguration() )
137212
138- fun hasProceduresGloballyEnabled ( ) = this .config.getOrDefault( PROCEDURES_ENABLED , PROCEDURES_ENABLED_VALUE ).toString().toBoolean( )
213+ fun hasProceduresEnabled ( dbName : String ) = Companion .hasProceduresEnabled(getConfiguration(), dbName )
139214
140- fun hasProceduresEnabled ( dbName : String ) = this .config.getOrDefault( " ${ PROCEDURES_ENABLED } . $dbName " , hasProceduresGloballyEnabled()).toString().toBoolean( )
215+ fun isSinkGloballyEnabled ( ) = Companion .isSinkGloballyEnabled(getConfiguration() )
141216
142- fun isSinkGloballyEnabled ( ) = this .config.getOrDefault( SINK_ENABLED , SINK_ENABLED_VALUE ).toString().toBoolean( )
217+ fun isSinkEnabled ( dbName : String ) = Companion .isSinkEnabled(getConfiguration(), dbName )
143218
144- fun isSinkEnabled ( dbName : String ) = this .config.getOrDefault( " ${ SINK_ENABLED } .to. $dbName " , isSinkGloballyEnabled()).toString().toBoolean( )
219+ fun getSystemDbWaitTimeout ( ) = Companion .getSystemDbWaitTimeout(getConfiguration() )
145220
146- fun getSystemDbWaitTimeout () = this .config.getOrDefault(SYSTEM_DB_WAIT_TIMEOUT , SYSTEM_DB_WAIT_TIMEOUT_VALUE )
147- .toString().toLong()
221+ fun getInstanceWaitTimeout () = Companion .getInstanceWaitTimeout(getConfiguration())
148222
149223}
0 commit comments