Skip to content

Commit 2715c92

Browse files
conker84jexp
authored andcommitted
Rebase v 3.5 to the last version of master branch (#106)
1 parent f87c0ed commit 2715c92

File tree

56 files changed

+1833
-272
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1833
-272
lines changed

common/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>org.neo4j</groupId>
8+
<artifactId>neo4j-streams-common</artifactId>
9+
<name>Neo4j Streams - Common</name>
10+
<description>Neo4j Streams - Commons Package</description>
11+
<version>3.5.0</version>
12+
<packaging>jar</packaging>
13+
14+
<parent>
15+
<groupId>org.neo4j</groupId>
16+
<artifactId>neo4j-streams-parent</artifactId>
17+
<version>3.5.0</version>
18+
</parent>
19+
20+
</project>
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package streams.extensions
2+
3+
import org.neo4j.graphdb.Node
4+
5+
fun Map<String,String>.getInt(name:String, defaultValue: Int) = this.get(name)?.toInt() ?: defaultValue
6+
7+
fun Node.labelNames() : List<String> {
8+
return this.labels.map { it.name() }
9+
}
10+
11+
fun String.toPointCase(): String {
12+
return this.split("(?<=[a-z])(?=[A-Z])".toRegex()).joinToString(separator = ".").toLowerCase()
13+
}

producer/src/main/kotlin/streams/serialization/JacksonUtil.kt renamed to common/src/main/kotlin/streams/utils/JSONUtils.kt

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class TemporalAccessorSerializer : JsonSerializer<TemporalAccessor>() {
5252
}
5353

5454

55-
object JacksonUtil {
55+
object JSONUtils {
5656

5757
private val OBJECT_MAPPER: ObjectMapper = ObjectMapper()
5858

@@ -64,8 +64,24 @@ object JacksonUtil {
6464
OBJECT_MAPPER.disable(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS)
6565
}
6666

67-
fun getMapper(): ObjectMapper {
67+
fun getObjectMapper(): ObjectMapper {
6868
return OBJECT_MAPPER
6969
}
7070

71+
fun asMap(any: Any): Map<String, Any?> {
72+
return OBJECT_MAPPER.convertValue(any, Map::class.java)
73+
.mapKeys { it.key.toString() }
74+
}
75+
76+
fun writeValueAsString(any: Any): String {
77+
return OBJECT_MAPPER.writeValueAsString(any)
78+
}
79+
80+
fun writeValueAsBytes(any: Any): ByteArray {
81+
return OBJECT_MAPPER.writeValueAsBytes(any)
82+
}
83+
84+
fun <T> readValue(value: ByteArray, clazz: Class<T>): T {
85+
return OBJECT_MAPPER.readValue(value, clazz)
86+
}
7187
}

consumer/src/main/kotlin/streams/utils/Neo4jUtils.kt renamed to common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package streams.utils
22

33
import org.neo4j.graphdb.QueryExecutionException
4-
import org.neo4j.kernel.impl.logging.LogService
54
import org.neo4j.kernel.internal.GraphDatabaseAPI
5+
import org.neo4j.logging.internal.LogService
66
import java.lang.reflect.InvocationTargetException
77

88
object Neo4jUtils {

consumer/src/main/kotlin/streams/utils/StreamsUtils.kt renamed to common/src/main/kotlin/streams/utils/StreamsUtils.kt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,20 @@ package streams.utils
22

33
object StreamsUtils {
44

5+
const val UNWIND: String = "UNWIND {events} AS event"
6+
7+
const val STREAMS_CONFIG_PREFIX = "streams."
8+
9+
const val STREAMS_SINK_TOPIC_PREFIX = "sink.topic.cypher."
10+
511
fun <T> ignoreExceptions(action: () -> T, vararg toIgnore: Class<out Throwable>): T? {
612
return try {
713
action()
814
} catch (e: Throwable) {
9-
when (e::class.java) {
15+
if (toIgnore.isEmpty()) {
16+
return null
17+
}
18+
return when (e::class.java) {
1019
in toIgnore -> null
1120
else -> throw e
1221
}

producer/src/test/kotlin/streams/serialization/JacksonUtilTest.kt renamed to common/src/test/kotlin/streams/utils/JSONUtilsTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import org.neo4j.values.storable.Values.pointValue
99
import java.time.ZoneOffset.UTC
1010
import kotlin.test.assertEquals
1111

12-
class JacksonUtilTest {
12+
class JSONUtilsTest {
1313

1414
@Test
1515
fun shouldSerializeGeometryAndTemporalDataTypes() {
@@ -27,7 +27,7 @@ class JacksonUtilTest {
2727
"dateTime" to datetime(date( 2017, 12, 17), time( 17, 14, 35, 123456789, UTC)))
2828

2929
// When
30-
val jsonString = JacksonUtil.getMapper().writeValueAsString(map)
30+
val jsonString = JSONUtils.writeValueAsString(map)
3131

3232
// Then
3333
assertEquals(expected, jsonString)

consumer/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt renamed to common/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package streams.utils
22

3-
import org.junit.*
4-
import org.neo4j.kernel.impl.logging.LogService
3+
import org.junit.AfterClass
4+
import org.junit.BeforeClass
5+
import org.junit.Test
56
import org.neo4j.kernel.internal.GraphDatabaseAPI
67
import org.neo4j.test.TestGraphDatabaseFactory
78
import kotlin.test.assertTrue

consumer/pom.xml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,23 @@
66

77
<groupId>org.neo4j</groupId>
88
<artifactId>neo4j-streams-consumer</artifactId>
9-
<version>3.4.0</version>
9+
<version>3.5.0</version>
1010
<name>Neo4j Streams - Consumer</name>
1111
<description>Neo4j Streams - Kafka Consumer</description>
1212
<packaging>jar</packaging>
1313

1414
<parent>
1515
<groupId>org.neo4j</groupId>
1616
<artifactId>neo4j-streams-parent</artifactId>
17-
<version>3.4.0</version>
17+
<version>3.5.0</version>
1818
</parent>
1919

20+
<dependencies>
21+
<dependency>
22+
<groupId>org.neo4j</groupId>
23+
<artifactId>neo4j-streams-common</artifactId>
24+
<version>3.5.0</version>
25+
</dependency>
26+
</dependencies>
2027

2128
</project>
Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,51 @@
11
package streams
22

3-
import org.neo4j.kernel.AvailabilityGuard
43
import org.neo4j.kernel.configuration.Config
5-
import org.neo4j.kernel.internal.GraphDatabaseAPI
4+
import org.neo4j.logging.Log
65

76
abstract class StreamsEventSink(private val config: Config,
8-
private val db: GraphDatabaseAPI): AvailabilityGuard.AvailabilityListener {
7+
private val queryExecution: StreamsEventSinkQueryExecution,
8+
private val streamsTopicService: StreamsTopicService,
9+
private val log: Log) {
910

10-
abstract var streamsTopicService: StreamsTopicService?
11+
abstract fun stop()
12+
13+
abstract fun start()
14+
15+
abstract fun getEventConsumerFactory(): StreamsEventConsumerFactory
16+
17+
abstract fun getEventSinkConfigMapper(): StreamsEventSinkConfigMapper
18+
19+
}
20+
21+
abstract class StreamsEventConsumer<T>(private val consumer: T, config: StreamsSinkConfiguration, private val log: Log) {
1122

1223
abstract fun stop()
1324

25+
abstract fun withTopics(topics: Set<String>): StreamsEventConsumer<T>
26+
1427
abstract fun start()
1528

16-
override fun unavailable() {
17-
stop()
18-
}
29+
abstract fun read(): Map<String, List<Map<String, Any?>>>?
1930

20-
override fun available() {
21-
start()
22-
}
31+
}
2332

33+
abstract class StreamsEventConsumerFactory {
34+
abstract fun createStreamsEventConsumer(config: Map<String, String>, log: Log): StreamsEventConsumer<*>
2435
}
2536

2637
object StreamsEventSinkFactory {
27-
fun getStreamsEventSink(config: Config, db: GraphDatabaseAPI): StreamsEventSink {
38+
fun getStreamsEventSink(config: Config, streamsQueryExecution: StreamsEventSinkQueryExecution,
39+
streamsTopicService: StreamsTopicService, log: Log): StreamsEventSink {
2840
return Class.forName(config.raw.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
2941
.getConstructor(Config::class.java,
30-
GraphDatabaseAPI::class.java)
31-
.newInstance(config, db) as StreamsEventSink
42+
StreamsEventSinkQueryExecution::class.java,
43+
StreamsTopicService::class.java,
44+
Log::class.java)
45+
.newInstance(config, streamsQueryExecution, streamsTopicService, log) as StreamsEventSink
3246
}
47+
}
48+
49+
abstract class StreamsEventSinkConfigMapper(private val baseConfiguration: Map<String, String>, private val mapping: Map<String, String>) {
50+
abstract fun convert(config: Map<String, String>): Map<String, String>
3351
}

0 commit comments

Comments
 (0)