Skip to content

Commit e17d6d6

Browse files
authored
fixes #267: [Multidatabase] Source: Implement “from” syntax / config (#274)
1 parent b935ba7 commit e17d6d6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1789
-466
lines changed
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2007-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import java.net.*;
17+
import java.io.*;
18+
import java.nio.channels.*;
19+
import java.util.Properties;
20+
21+
public class MavenWrapperDownloader {
22+
23+
private static final String WRAPPER_VERSION = "0.5.6";
24+
/**
25+
* Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
26+
*/
27+
private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/"
28+
+ WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar";
29+
30+
/**
31+
* Path to the maven-wrapper.properties file, which might contain a downloadUrl property to
32+
* use instead of the default one.
33+
*/
34+
private static final String MAVEN_WRAPPER_PROPERTIES_PATH =
35+
".mvn/wrapper/maven-wrapper.properties";
36+
37+
/**
38+
* Path where the maven-wrapper.jar will be saved to.
39+
*/
40+
private static final String MAVEN_WRAPPER_JAR_PATH =
41+
".mvn/wrapper/maven-wrapper.jar";
42+
43+
/**
44+
* Name of the property which should be used to override the default download url for the wrapper.
45+
*/
46+
private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl";
47+
48+
public static void main(String args[]) {
49+
System.out.println("- Downloader started");
50+
File baseDirectory = new File(args[0]);
51+
System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath());
52+
53+
// If the maven-wrapper.properties exists, read it and check if it contains a custom
54+
// wrapperUrl parameter.
55+
File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH);
56+
String url = DEFAULT_DOWNLOAD_URL;
57+
if(mavenWrapperPropertyFile.exists()) {
58+
FileInputStream mavenWrapperPropertyFileInputStream = null;
59+
try {
60+
mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile);
61+
Properties mavenWrapperProperties = new Properties();
62+
mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream);
63+
url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url);
64+
} catch (IOException e) {
65+
System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'");
66+
} finally {
67+
try {
68+
if(mavenWrapperPropertyFileInputStream != null) {
69+
mavenWrapperPropertyFileInputStream.close();
70+
}
71+
} catch (IOException e) {
72+
// Ignore ...
73+
}
74+
}
75+
}
76+
System.out.println("- Downloading from: " + url);
77+
78+
File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH);
79+
if(!outputFile.getParentFile().exists()) {
80+
if(!outputFile.getParentFile().mkdirs()) {
81+
System.out.println(
82+
"- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'");
83+
}
84+
}
85+
System.out.println("- Downloading to: " + outputFile.getAbsolutePath());
86+
try {
87+
downloadFileFromURL(url, outputFile);
88+
System.out.println("Done");
89+
System.exit(0);
90+
} catch (Throwable e) {
91+
System.out.println("- Error downloading");
92+
e.printStackTrace();
93+
System.exit(1);
94+
}
95+
}
96+
97+
private static void downloadFileFromURL(String urlString, File destination) throws Exception {
98+
if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) {
99+
String username = System.getenv("MVNW_USERNAME");
100+
char[] password = System.getenv("MVNW_PASSWORD").toCharArray();
101+
Authenticator.setDefault(new Authenticator() {
102+
@Override
103+
protected PasswordAuthentication getPasswordAuthentication() {
104+
return new PasswordAuthentication(username, password);
105+
}
106+
});
107+
}
108+
URL website = new URL(urlString);
109+
ReadableByteChannel rbc;
110+
rbc = Channels.newChannel(website.openStream());
111+
FileOutputStream fos = new FileOutputStream(destination);
112+
fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
113+
fos.close();
114+
rbc.close();
115+
}
116+
117+
}

.mvn/wrapper/maven-wrapper.jar

49.5 KB
Binary file not shown.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip
2+
wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar

common/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
<dependency>
2222
<groupId>org.neo4j.driver</groupId>
2323
<artifactId>neo4j-java-driver</artifactId>
24-
<version>${neo4j.java.driver.version}</version>
2524
<scope>provided</scope>
2625
</dependency>
2726

Lines changed: 76 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,31 @@
11
package streams.config
22

3+
import org.neo4j.dbms.api.DatabaseManagementService
34
import org.neo4j.kernel.lifecycle.LifecycleAdapter
45
import org.neo4j.logging.internal.LogService
6+
import streams.extensions.getDefaultDbName
57
import java.io.FileInputStream
68
import java.io.FileNotFoundException
79
import java.util.Properties
810
import java.util.concurrent.ConcurrentHashMap
9-
import java.util.regex.Matcher
10-
import java.util.regex.Pattern
1111

12-
class StreamsConfig(logService: LogService) : LifecycleAdapter() {
12+
class StreamsConfig(logService: LogService, private val dbms: DatabaseManagementService) : LifecycleAdapter() {
1313

1414
val config = ConcurrentHashMap<String, String>()
1515

16-
private val log = logService.getInternalLog(StreamsConfig::class.java)
16+
private val log = logService.getUserLog(StreamsConfig::class.java)
1717

18-
private val SUN_JAVA_COMMAND = "sun.java.command"
19-
private val CONF_DIR_PATTERN = Pattern.compile("--config-dir=(\\S+)")
18+
private lateinit var neo4jConfFolder: String
2019

2120
companion object {
21+
private val SUPPORTED_PREFIXES = listOf("streams", "kafka")
22+
private const val SUN_JAVA_COMMAND = "sun.java.command"
23+
private const val CONF_DIR_ARG = "config-dir="
24+
const val SOURCE_ENABLED = "streams.source.enabled"
25+
const val SOURCE_ENABLED_VALUE = true
26+
const val PROCEDURES_ENABLED = "streams.procedures.enabled"
27+
const val PROCEDURES_ENABLED_VALUE = true
28+
const val DEFAULT_PATH = "."
2229
private var afterInitListeners = mutableListOf<((MutableMap<String, String>) -> Unit)>()
2330

2431
fun registerListener(after: (MutableMap<String, String>) -> Unit) {
@@ -30,6 +37,7 @@ class StreamsConfig(logService: LogService) : LifecycleAdapter() {
3037
if (log.isDebugEnabled) {
3138
log.debug("Init StreamsConfig...")
3239
}
40+
neo4jConfFolder = getNeo4jConfFolder()
3341
loadConfiguration()
3442
afterInitListeners.forEach { it(config) }
3543
}
@@ -39,7 +47,44 @@ class StreamsConfig(logService: LogService) : LifecycleAdapter() {
3947
}
4048

4149
private fun loadConfiguration() {
42-
val neo4jConfFolder = System.getenv().getOrDefault("NEO4J_CONF", determineNeo4jConfFolder())
50+
val properties = neo4jConfAsProperties()
51+
52+
val filteredValues = filterProperties(properties,
53+
{ key -> !SUPPORTED_PREFIXES.find { key.toString().startsWith(it) }.isNullOrBlank() })
54+
55+
if (log.isDebugEnabled) {
56+
log.debug("Neo4j Streams Global configuration from neo4j.conf file: $filteredValues")
57+
}
58+
59+
config.putAll(filteredValues)
60+
}
61+
62+
private fun filterProperties(properties: Properties, filter: (Any) -> Boolean) = properties
63+
.filterKeys(filter)
64+
.mapNotNull {
65+
if (it.value == null) {
66+
null
67+
} else {
68+
it.key.toString() to it.value.toString()
69+
}
70+
}
71+
.toMap()
72+
73+
fun loadStreamsConfiguration() {
74+
val properties = neo4jConfAsProperties()
75+
76+
val filteredValues = filterProperties(properties,
77+
{ key -> key.toString().startsWith("streams.") })
78+
79+
if (log.isDebugEnabled) {
80+
log.debug("Neo4j Streams configuration reloaded from neo4j.conf file: $filteredValues")
81+
}
82+
83+
config.putAll(filteredValues)
84+
}
85+
86+
private fun neo4jConfAsProperties(): Properties {
87+
val neo4jConfFolder = System.getenv().getOrDefault("NEO4J_CONF", neo4jConfFolder)
4388

4489
val properties = Properties()
4590
try {
@@ -48,34 +93,35 @@ class StreamsConfig(logService: LogService) : LifecycleAdapter() {
4893
} catch (e: FileNotFoundException) {
4994
log.error("the neo4j.conf file is not under the directory defined into the directory $neo4jConfFolder, please set the NEO4J_CONF env correctly")
5095
}
51-
52-
val filteredValues = properties
53-
.filterKeys { it.toString().startsWith("streams") || it.toString().startsWith("kafka") }
54-
.mapNotNull {
55-
if (it.value == null) {
56-
null
57-
} else {
58-
it.key.toString() to it.value.toString()
59-
}
60-
}
61-
.toMap()
62-
log.debug("Neo4j Streams configuration from neo4j.conf file: $filteredValues")
63-
64-
config.putAll(filteredValues)
96+
return properties
6597
}
6698

6799
// Taken from ApocConfig.java
68-
private fun determineNeo4jConfFolder(): String? { // sun.java.command=com.neo4j.server.enterprise.CommercialEntryPoint --home-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02 --config-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02/conf
69-
val command = System.getProperty(SUN_JAVA_COMMAND)
70-
val matcher: Matcher = CONF_DIR_PATTERN.matcher(command)
71-
return if (matcher.find()) {
72-
val neo4jConfFolder = matcher.group(1)
73-
log.info("from system properties: NEO4J_CONF=%s", neo4jConfFolder)
74-
neo4jConfFolder
100+
private fun getNeo4jConfFolder(): String { // sun.java.command=com.neo4j.server.enterprise.CommercialEntryPoint --home-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02 --config-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02/conf
101+
val command = System.getProperty(SUN_JAVA_COMMAND, "")
102+
val neo4jConfFolder = command.split("--")
103+
.map(String::trim)
104+
.filter { it.startsWith(CONF_DIR_ARG) }
105+
.map { it.substring(CONF_DIR_ARG.length) }
106+
.firstOrNull() ?: DEFAULT_PATH
107+
108+
if (neo4jConfFolder == DEFAULT_PATH) {
109+
log.info("Cannot determine conf folder from sys property $command, assuming $neo4jConfFolder")
75110
} else {
76-
log.info("cannot determine conf folder from sys property %s, assuming '.' ", command)
77-
"."
111+
log.info("From system properties: NEO4J_CONF=%s", neo4jConfFolder)
78112
}
113+
return neo4jConfFolder
79114
}
80115

116+
fun defaultDbName() = this.dbms.getDefaultDbName()
117+
118+
fun isDefaultDb(dbName: String) = this.defaultDbName() == dbName
119+
120+
fun isSourceGloballyEnabled() = this.config.getOrDefault(SOURCE_ENABLED, SOURCE_ENABLED_VALUE).toString().toBoolean()
121+
122+
fun isSourceEnabled(dbName: String) = this.config.getOrDefault("${SOURCE_ENABLED}.from.$dbName", isSourceGloballyEnabled()).toString().toBoolean()
123+
124+
fun hasProceduresGloballyEnabled() = this.config.getOrDefault(PROCEDURES_ENABLED, PROCEDURES_ENABLED_VALUE).toString().toBoolean()
125+
126+
fun hasProceduresEnabled(dbName: String) = this.config.getOrDefault("${PROCEDURES_ENABLED}.$dbName", hasProceduresGloballyEnabled()).toString().toBoolean()
81127
}

common/src/main/kotlin/streams/config/StreamsConfigExtensionFactory.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package streams.config
22

3-
import org.neo4j.configuration.Config
43
import org.neo4j.dbms.api.DatabaseManagementService
4+
import org.neo4j.graphdb.GraphDatabaseService
55
import org.neo4j.kernel.extension.ExtensionFactory
66
import org.neo4j.kernel.extension.ExtensionType
77
import org.neo4j.kernel.extension.context.ExtensionContext
8+
import org.neo4j.kernel.internal.GraphDatabaseAPI
89
import org.neo4j.kernel.lifecycle.Lifecycle
910
import org.neo4j.logging.internal.LogService
1011

@@ -15,6 +16,6 @@ class StreamsConfigExtensionFactory: ExtensionFactory<StreamsConfigExtensionFact
1516
}
1617

1718
override fun newInstance(context: ExtensionContext, dependencies: Dependencies): Lifecycle {
18-
return StreamsConfig(dependencies.log())
19+
return StreamsConfig(dependencies.log(), dependencies.dbms())
1920
}
2021
}
Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,21 @@
11
package streams.extensions
22

33
import org.neo4j.dbms.api.DatabaseManagementService
4+
import org.neo4j.dbms.api.DatabaseNotFoundException
45
import org.neo4j.kernel.internal.GraphDatabaseAPI
56
import streams.utils.Neo4jUtils
67

7-
fun DatabaseManagementService.getSystemDb() = this.database(Neo4jUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI
8+
fun DatabaseManagementService.getSystemDb() = this.database(Neo4jUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI
9+
10+
fun DatabaseManagementService.getDefaultDbName() = getSystemDb().let {
11+
it.beginTx().use {
12+
val col = it.execute("SHOW DEFAULT DATABASE").columnAs<String>("name")
13+
if (col.hasNext()) {
14+
col.next()
15+
} else {
16+
null
17+
}
18+
}
19+
}
20+
21+
fun DatabaseManagementService.getDefaultDb() = getDefaultDbName()?.let { this.database(it) as GraphDatabaseAPI }

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ object Neo4jUtils {
2727
return false
2828
}
2929

30-
val role = db.execute("CALL dbms.cluster.role()") { it.columnAs<String>("role").next() }
30+
val role = getMemberRole(db)
3131
return role.equals(LEADER, ignoreCase = true)
3232
} catch (e: QueryExecutionException) {
3333
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
@@ -44,7 +44,7 @@ object Neo4jUtils {
4444

4545
fun isCluster(db: GraphDatabaseAPI): Boolean {
4646
try {
47-
db.execute("CALL dbms.cluster.role()") { it.columnAs<String>("role").next() }
47+
getMemberRole(db)
4848
return true
4949
} catch (e: QueryExecutionException) {
5050
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
@@ -54,12 +54,15 @@ object Neo4jUtils {
5454
}
5555
}
5656

57+
private fun getMemberRole(db: GraphDatabaseAPI) = db.execute("CALL dbms.cluster.role(\$database)",
58+
mapOf("database" to db.databaseName())) { it.columnAs<String>("role").next() }
59+
5760
fun clusterHasLeader(db: GraphDatabaseAPI): Boolean {
5861
try {
5962
return db.execute("""
60-
CALL dbms.cluster.overview() YIELD role
61-
RETURN role
62-
""".trimIndent()) {
63+
|CALL dbms.cluster.overview() YIELD databases
64+
|RETURN databases[${'$'}database] AS role
65+
""".trimMargin(), mapOf("database" to db.databaseName())) {
6366
it.columnAs<String>("role")
6467
.stream()
6568
.toList()

consumer/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
<dependency>
3636
<groupId>org.neo4j</groupId>
3737
<artifactId>neo4j-streams-common</artifactId>
38-
<version>4.0.0</version>
38+
<version>${project.version}</version>
3939
<scope>provided</scope>
4040
</dependency>
4141
<dependency>
@@ -46,6 +46,12 @@
4646
<groupId>io.confluent</groupId>
4747
<artifactId>kafka-avro-serializer</artifactId>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.neo4j</groupId>
51+
<artifactId>neo4j-streams-test-support</artifactId>
52+
<version>${project.version}</version>
53+
<scope>test</scope>
54+
</dependency>
4955
</dependencies>
5056

5157
</project>

0 commit comments

Comments
 (0)