Skip to content

Commit e165b2c

Browse files
committed
migration to Neo4j v4.0
1 parent 9518f19 commit e165b2c

File tree

51 files changed

+621
-400
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+621
-400
lines changed

common/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
<artifactId>neo4j-streams-common</artifactId>
99
<name>Neo4j Streams - Common</name>
1010
<description>Neo4j Streams - Commons Package</description>
11-
<version>3.5.4</version>
11+
<version>4.0.0</version>
1212
<packaging>jar</packaging>
1313

1414
<parent>
1515
<groupId>org.neo4j</groupId>
1616
<artifactId>neo4j-streams-parent</artifactId>
17-
<version>3.5.4</version>
17+
<version>4.0.0</version>
1818
</parent>
1919

2020
<dependencies>
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package streams.extensions
2+
3+
import org.neo4j.configuration.Config
4+
import org.neo4j.graphdb.GraphDatabaseService
5+
6+
fun GraphDatabaseService.execute(cypher: String) = this.execute(cypher, emptyMap())
7+
8+
fun GraphDatabaseService.execute(cypher: String, params: Map<String, Any>) = this.beginTx().use {
9+
val resp = it.execute(cypher, params)
10+
it.commit()
11+
resp
12+
}
13+
14+
fun Config.raw() = this.values.mapNotNull {
15+
val value = it.value?.toString()
16+
if (value == null) {
17+
null
18+
} else {
19+
val key = it.key.name()
20+
key to value
21+
}
22+
}.toMap()

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import kotlinx.coroutines.delay
66
import kotlinx.coroutines.launch
77
import org.neo4j.graphdb.QueryExecutionException
88
import org.neo4j.kernel.internal.GraphDatabaseAPI
9-
import org.neo4j.logging.internal.LogService
109
import org.neo4j.logging.Log
11-
import org.neo4j.logging.NullLog
10+
import org.neo4j.logging.internal.LogService
11+
import streams.extensions.execute
1212
import java.lang.reflect.InvocationTargetException
1313
import kotlin.streams.toList
1414

1515
object Neo4jUtils {
1616
@JvmStatic val LEADER = "LEADER"
17+
@JvmStatic val SYSTEM_DATABASE_NAME = "system"
1718
fun isWriteableInstance(db: GraphDatabaseAPI): Boolean {
1819
try {
1920
val isSlave = StreamsUtils.ignoreExceptions(

common/src/main/kotlin/streams/utils/StreamsUtils.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package streams.utils
22

33
object StreamsUtils {
44

5-
const val UNWIND: String = "UNWIND {events} AS event"
5+
const val UNWIND: String = "UNWIND \$events AS event"
66

77
const val STREAMS_CONFIG_PREFIX = "streams."
88

common/src/test/kotlin/streams/service/sink/strategy/SchemaIngestionStrategyTest.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.junit.Test
44
import streams.events.*
55
import streams.serialization.JSONUtils
66
import streams.service.StreamsSinkEntity
7+
import streams.utils.StreamsUtils
78
import kotlin.test.assertEquals
89

910
class SchemaIngestionStrategyTest {
@@ -76,7 +77,7 @@ class SchemaIngestionStrategyTest {
7677
assertEquals(1, nodeEvents.size)
7778
val nodeQuery = nodeEvents[0].query
7879
val expectedNodeQuery = """
79-
|UNWIND {events} AS event
80+
|${StreamsUtils.UNWIND}
8081
|MERGE (n:User{name: event.properties.name, surname: event.properties.surname})
8182
|SET n = event.properties
8283
""".trimMargin()
@@ -93,7 +94,7 @@ class SchemaIngestionStrategyTest {
9394
assertEquals(1, relationshipEvents.size)
9495
val relQuery = relationshipEvents[0].query
9596
val expectedRelQuery = """
96-
|UNWIND {events} AS event
97+
|${StreamsUtils.UNWIND}
9798
|MERGE (start:User{name: event.start.name, surname: event.start.surname})
9899
|MERGE (end:User{name: event.end.name, surname: event.end.surname})
99100
|MERGE (start)-[r:`KNOWS WHO`]->(end)
@@ -156,7 +157,7 @@ class SchemaIngestionStrategyTest {
156157
assertEquals(1, nodeEvents.size)
157158
val nodeQuery = nodeEvents[0].query
158159
val expectedNodeQuery = """
159-
|UNWIND {events} AS event
160+
|${StreamsUtils.UNWIND}
160161
|MERGE (n:User{name: event.properties.name, surname: event.properties.surname})
161162
|SET n = event.properties
162163
|SET n:NewLabel
@@ -208,7 +209,7 @@ class SchemaIngestionStrategyTest {
208209
assertEquals(1, relationshipEvents.size)
209210
val relQuery = relationshipEvents[0].query
210211
val expectedRelQuery = """
211-
|UNWIND {events} AS event
212+
|${StreamsUtils.UNWIND}
212213
|MERGE (start:`User Ext`{name: event.start.name, surname: event.start.surname})
213214
|MERGE (end:`Product Ext`{name: event.end.name})
214215
|MERGE (start)-[r:`HAS BOUGHT`]->(end)
@@ -272,7 +273,7 @@ class SchemaIngestionStrategyTest {
272273
assertEquals(0, nodeEvents.size)
273274
val nodeQuery = nodeDeleteEvents[0].query
274275
val expectedNodeQuery = """
275-
|UNWIND {events} AS event
276+
|${StreamsUtils.UNWIND}
276277
|MATCH (n:User{name: event.properties.name, surname: event.properties.surname})
277278
|DETACH DELETE n
278279
""".trimMargin()
@@ -321,7 +322,7 @@ class SchemaIngestionStrategyTest {
321322
assertEquals(0, relationshipEvents.size)
322323
val relQuery = relationshipDeleteEvents[0].query
323324
val expectedRelQuery = """
324-
|UNWIND {events} AS event
325+
|${StreamsUtils.UNWIND}
325326
|MATCH (start:User{name: event.start.name, surname: event.start.surname})
326327
|MATCH (end:User{name: event.end.name, surname: event.end.surname})
327328
|MATCH (start)-[r:`KNOWS WHO`]->(end)

common/src/test/kotlin/streams/service/sink/strategy/SourceIdIngestionStrategyTest.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import org.junit.Test
44
import streams.events.*
55
import streams.serialization.JSONUtils
66
import streams.service.StreamsSinkEntity
7+
import streams.utils.StreamsUtils
78
import kotlin.test.assertEquals
89

910
class SourceIdIngestionStrategyTest {
@@ -76,7 +77,7 @@ class SourceIdIngestionStrategyTest {
7677
assertEquals(1, nodeEvents.size)
7778
val nodeQuery = nodeEvents[0].query
7879
val expectedNodeQuery = """
79-
|UNWIND {events} AS event
80+
|${StreamsUtils.UNWIND}
8081
|MERGE (n:`Custom SourceEvent`{`custom Id`: event.id})
8182
|SET n = event.properties
8283
|SET n.`custom Id` = event.id
@@ -95,7 +96,7 @@ class SourceIdIngestionStrategyTest {
9596
assertEquals(1, relationshipEvents.size)
9697
val relQuery = relationshipEvents[0].query
9798
val expectedRelQuery = """
98-
|UNWIND {events} AS event
99+
|${StreamsUtils.UNWIND}
99100
|MERGE (start:`Custom SourceEvent`{`custom Id`: event.start})
100101
|MERGE (end:`Custom SourceEvent`{`custom Id`: event.end})
101102
|MERGE (start)-[r:`KNOWS WHO`{`custom Id`: event.id}]->(end)
@@ -158,7 +159,7 @@ class SourceIdIngestionStrategyTest {
158159
assertEquals(1, nodeEvents.size)
159160
val nodeQuery = nodeEvents[0].query
160161
val expectedNodeQuery = """
161-
|UNWIND {events} AS event
162+
|${StreamsUtils.UNWIND}
162163
|MERGE (n:SourceEvent{sourceId: event.id})
163164
|SET n = event.properties
164165
|SET n.sourceId = event.id
@@ -208,7 +209,7 @@ class SourceIdIngestionStrategyTest {
208209
assertEquals(1, relationshipEvents.size)
209210
val relQuery = relationshipEvents[0].query
210211
val expectedRelQuery = """
211-
|UNWIND {events} AS event
212+
|${StreamsUtils.UNWIND}
212213
|MERGE (start:SourceEvent{sourceId: event.start})
213214
|MERGE (end:SourceEvent{sourceId: event.end})
214215
|MERGE (start)-[r:`KNOWS WHO`{sourceId: event.id}]->(end)
@@ -271,7 +272,7 @@ class SourceIdIngestionStrategyTest {
271272
assertEquals(0, nodeEvents.size)
272273
val nodeQuery = nodeDeleteEvents[0].query
273274
val expectedNodeQuery = """
274-
|UNWIND {events} AS event MATCH (n:SourceEvent{sourceId: event.id}) DETACH DELETE n
275+
|${StreamsUtils.UNWIND} MATCH (n:SourceEvent{sourceId: event.id}) DETACH DELETE n
275276
""".trimMargin()
276277
assertEquals(expectedNodeQuery, nodeQuery.trimIndent())
277278
val eventsNodeList = nodeDeleteEvents[0].events
@@ -316,7 +317,7 @@ class SourceIdIngestionStrategyTest {
316317
assertEquals(0, relationshipEvents.size)
317318
val relQuery = relationshipDeleteEvents[0].query
318319
val expectedRelQuery = """
319-
|UNWIND {events} AS event MATCH ()-[r:`KNOWS WHO`{sourceId: event.id}]-() DELETE r
320+
|${StreamsUtils.UNWIND} MATCH ()-[r:`KNOWS WHO`{sourceId: event.id}]-() DELETE r
320321
""".trimMargin()
321322
assertEquals(expectedRelQuery, relQuery.trimIndent())
322323
val eventsRelList = relationshipDeleteEvents[0].events

common/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,16 @@
11
package streams.utils
22

3-
import org.junit.*
4-
import org.neo4j.kernel.internal.GraphDatabaseAPI
5-
import org.neo4j.test.TestGraphDatabaseFactory
3+
import org.junit.ClassRule
4+
import org.junit.Test
5+
import org.neo4j.test.rule.ImpermanentDbmsRule
66
import kotlin.test.assertFalse
77
import kotlin.test.assertTrue
88

99
class Neo4jUtilsTest {
1010

1111
companion object {
12-
private lateinit var db: GraphDatabaseAPI
13-
@BeforeClass
14-
@JvmStatic
15-
fun setUp() {
16-
db = TestGraphDatabaseFactory()
17-
.newImpermanentDatabaseBuilder()
18-
.newGraphDatabase() as GraphDatabaseAPI
19-
}
20-
21-
@AfterClass
22-
@JvmStatic
23-
fun tearDown() {
24-
db.shutdown()
25-
}
12+
@ClassRule @JvmField
13+
val db = ImpermanentDbmsRule()
2614
}
2715

2816
@Test

consumer/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66

77
<groupId>org.neo4j</groupId>
88
<artifactId>neo4j-streams-consumer</artifactId>
9-
<version>3.5.4</version>
9+
<version>4.0.0</version>
1010
<name>Neo4j Streams - Consumer</name>
1111
<description>Neo4j Streams - Kafka Consumer</description>
1212
<packaging>jar</packaging>
1313

1414
<parent>
1515
<groupId>org.neo4j</groupId>
1616
<artifactId>neo4j-streams-parent</artifactId>
17-
<version>3.5.4</version>
17+
<version>4.0.0</version>
1818
</parent>
1919

2020
<repositories>
@@ -35,7 +35,7 @@
3535
<dependency>
3636
<groupId>org.neo4j</groupId>
3737
<artifactId>neo4j-streams-common</artifactId>
38-
<version>3.5.4</version>
38+
<version>4.0.0</version>
3939
</dependency>
4040
<dependency>
4141
<groupId>org.apache.avro</groupId>

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package streams
22

3-
import org.neo4j.kernel.configuration.Config
3+
import org.neo4j.configuration.Config
44
import org.neo4j.kernel.internal.GraphDatabaseAPI
55
import org.neo4j.logging.Log
6+
import streams.extensions.raw
67

78
abstract class StreamsEventSink(private val config: Config,
89
private val queryExecution: StreamsEventSinkQueryExecution,
@@ -28,7 +29,7 @@ abstract class StreamsEventSink(private val config: Config,
2829
object StreamsEventSinkFactory {
2930
fun getStreamsEventSink(config: Config, streamsQueryExecution: StreamsEventSinkQueryExecution,
3031
streamsTopicService: StreamsTopicService, log: Log, db: GraphDatabaseAPI): StreamsEventSink {
31-
return Class.forName(config.raw.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
32+
return Class.forName(config.raw().getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
3233
.getConstructor(Config::class.java,
3334
StreamsEventSinkQueryExecution::class.java,
3435
StreamsTopicService::class.java,

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package streams
22

33
import org.neo4j.kernel.availability.AvailabilityGuard
44
import org.neo4j.kernel.availability.AvailabilityListener
5-
import org.neo4j.kernel.configuration.Config
5+
import org.neo4j.configuration.Config
66
import org.neo4j.kernel.extension.ExtensionType
7-
import org.neo4j.kernel.extension.KernelExtensionFactory
8-
import org.neo4j.kernel.impl.spi.KernelContext
7+
import org.neo4j.kernel.extension.ExtensionFactory
8+
import org.neo4j.kernel.extension.context.ExtensionContext
99
import org.neo4j.kernel.internal.GraphDatabaseAPI
1010
import org.neo4j.kernel.lifecycle.Lifecycle
1111
import org.neo4j.kernel.lifecycle.LifecycleAdapter
@@ -15,9 +15,9 @@ import streams.service.TopicUtils
1515
import streams.utils.Neo4jUtils
1616
import streams.utils.StreamsUtils
1717

18-
class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSinkExtensionFactory.Dependencies>(ExtensionType.DATABASE,"Streams.Consumer") {
18+
class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtensionFactory.Dependencies>(ExtensionType.DATABASE,"Streams.Consumer") {
1919

20-
override fun newInstance(context: KernelContext, dependencies: Dependencies): Lifecycle {
20+
override fun newInstance(context: ExtensionContext, dependencies: Dependencies): Lifecycle {
2121
return StreamsEventLifecycle(dependencies)
2222
}
2323

0 commit comments

Comments
 (0)