Skip to content

Commit 7e36185

Browse files
conker84mneedham
authored andcommitted
fixes #14: Add schema reference into Data Events (#151)
1 parent 273c6fa commit 7e36185

29 files changed

+1214
-1035
lines changed

common/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,13 @@
1717
<version>3.5.0</version>
1818
</parent>
1919

20+
<dependencies>
21+
<dependency>
22+
<groupId>org.neo4j.driver</groupId>
23+
<artifactId>neo4j-java-driver</artifactId>
24+
<version>${neo4j.java.driver.version}</version>
25+
<scope>provided</scope>
26+
</dependency>
27+
</dependencies>
28+
2029
</project>

producer/src/main/kotlin/streams/events/StreamsEvent.kt renamed to common/src/main/kotlin/streams/events/StreamsEvent.kt

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ data class Meta(val timestamp: Long,
1616
enum class EntityType { node, relationship }
1717

1818
data class RelationshipNodeChange(val id: String,
19-
val labels: List<String>?)
19+
val labels: List<String>?,
20+
val ids: Map<String, Any>)
2021

2122
abstract class RecordChange{ abstract val properties: Map<String, Any>? }
2223
data class NodeChange(override val properties: Map<String, Any>?,
@@ -31,24 +32,26 @@ abstract class Payload {
3132
abstract val after: RecordChange?
3233
}
3334
data class NodePayload(override val id: String,
34-
override val before: RecordChange?,
35-
override val after: RecordChange?,
35+
override val before: NodeChange?,
36+
override val after: NodeChange?,
3637
override val type: EntityType = EntityType.node): Payload()
3738

3839
data class RelationshipPayload(override val id: String,
3940
val start: RelationshipNodeChange,
4041
val end: RelationshipNodeChange,
41-
override val before: RecordChange?,
42-
override val after: RecordChange?,
42+
override val before: RelationshipChange?,
43+
override val after: RelationshipChange?,
4344
val label: String,
4445
override val type: EntityType = EntityType.relationship): Payload()
4546

47+
enum class StreamsConstraintType { UNIQUE, NODE_PROPERTY_EXISTS, RELATIONSHIP_PROPERTY_EXISTS }
48+
4649
data class Constraint(val label: String?,
47-
val properties: List<String>,
48-
val type: ConstraintType)
50+
val properties: Set<String>,
51+
val type: StreamsConstraintType)
4952

50-
data class Schema(val properties: List<String> = emptyList(),
51-
val constraints: List<Constraint>? = null)
53+
data class Schema(val properties: Map<String, String> = emptyMap(),
54+
val constraints: List<Constraint> = emptyList())
5255

5356
open class StreamsEvent(open val payload: Any)
5457
data class StreamsTransactionEvent(val meta: Meta, override val payload: Payload, val schema: Schema): StreamsEvent(payload)

common/src/main/kotlin/streams/utils/JSONUtils.kt

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
package streams.serialization
22

3-
import org.codehaus.jackson.JsonGenerator
4-
import org.codehaus.jackson.JsonProcessingException
5-
import org.codehaus.jackson.Version
6-
import org.codehaus.jackson.map.JsonSerializer
7-
import org.codehaus.jackson.map.ObjectMapper
8-
import org.codehaus.jackson.map.SerializationConfig
9-
import org.codehaus.jackson.map.SerializerProvider
10-
import org.codehaus.jackson.map.module.SimpleModule
3+
import com.fasterxml.jackson.core.JsonGenerator
4+
import com.fasterxml.jackson.core.JsonProcessingException
5+
import com.fasterxml.jackson.databind.JsonSerializer
6+
import com.fasterxml.jackson.databind.ObjectMapper
7+
import com.fasterxml.jackson.databind.SerializationFeature
8+
import com.fasterxml.jackson.databind.SerializerProvider
9+
import com.fasterxml.jackson.databind.module.SimpleModule
10+
import com.fasterxml.jackson.module.kotlin.convertValue
11+
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
12+
import com.fasterxml.jackson.module.kotlin.readValue
13+
import org.neo4j.driver.internal.value.PointValue
1114
import org.neo4j.graphdb.spatial.Point
1215
import org.neo4j.values.storable.CoordinateReferenceSystem
16+
import streams.events.*
17+
import streams.utils.StreamsUtils
1318
import java.io.IOException
1419
import java.time.temporal.TemporalAccessor
1520

@@ -29,6 +34,18 @@ fun Point.toStreamsPoint(): StreamsPoint {
2934
}
3035
}
3136

37+
fun PointValue.toStreamsPoint(): StreamsPoint {
38+
val point = this.asPoint()
39+
val crsType = point.srid()
40+
return when (crsType) {
41+
CoordinateReferenceSystem.Cartesian.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian.name, point.x(), point.y())
42+
CoordinateReferenceSystem.Cartesian_3D.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian_3D.name, point.x(), point.y(), point.z())
43+
CoordinateReferenceSystem.WGS84.code -> StreamsPointWgs(CoordinateReferenceSystem.WGS84.name, point.x(), point.y())
44+
CoordinateReferenceSystem.WGS84_3D.code -> StreamsPointWgs(CoordinateReferenceSystem.WGS84_3D.name, point.x(), point.y(), point.z())
45+
else -> throw IllegalArgumentException("Point type $crsType not supported")
46+
}
47+
}
48+
3249
class PointSerializer : JsonSerializer<Point>() {
3350
@Throws(IOException::class, JsonProcessingException::class)
3451
override fun serialize(value: Point?, jgen: JsonGenerator,
@@ -40,6 +57,17 @@ class PointSerializer : JsonSerializer<Point>() {
4057
}
4158
}
4259

60+
class PointValueSerializer : JsonSerializer<PointValue>() {
61+
@Throws(IOException::class, JsonProcessingException::class)
62+
override fun serialize(value: PointValue?, jgen: JsonGenerator,
63+
provider: SerializerProvider) {
64+
if (value == null) {
65+
return
66+
}
67+
jgen.writeObject(value.toStreamsPoint())
68+
}
69+
}
70+
4371
class TemporalAccessorSerializer : JsonSerializer<TemporalAccessor>() {
4472
@Throws(IOException::class, JsonProcessingException::class)
4573
override fun serialize(value: TemporalAccessor?, jgen: JsonGenerator,
@@ -54,14 +82,15 @@ class TemporalAccessorSerializer : JsonSerializer<TemporalAccessor>() {
5482

5583
object JSONUtils {
5684

57-
private val OBJECT_MAPPER: ObjectMapper = ObjectMapper()
85+
private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
5886

5987
init {
60-
val module = SimpleModule("Neo4jKafkaSerializer", Version(1, 0, 0, ""))
61-
module.addSerializer(Point::class.java, PointSerializer())
88+
val module = SimpleModule("Neo4jKafkaSerializer")
89+
StreamsUtils.ignoreExceptions({ module.addSerializer(Point::class.java, PointSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
90+
StreamsUtils.ignoreExceptions({ module.addSerializer(PointValue::class.java, PointValueSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
6291
module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer())
6392
OBJECT_MAPPER.registerModule(module)
64-
OBJECT_MAPPER.disable(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS)
93+
OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
6594
}
6695

6796
fun getObjectMapper(): ObjectMapper {
@@ -81,7 +110,50 @@ object JSONUtils {
81110
return OBJECT_MAPPER.writeValueAsBytes(any)
82111
}
83112

84-
fun <T> readValue(value: ByteArray, clazz: Class<T>): T {
85-
return OBJECT_MAPPER.readValue(value, clazz)
113+
inline fun <reified T> readValue(value: ByteArray): T {
114+
return getObjectMapper().readValue(value, T::class.java)
115+
}
116+
117+
inline fun <reified T> readValue(value: Any): T {
118+
val strValue = when (value) {
119+
is String -> value
120+
else -> getObjectMapper().writeValueAsString(value)
121+
}
122+
return getObjectMapper().readValue(strValue)
123+
}
124+
125+
inline fun <reified T> convertValue(value: Any): T {
126+
return getObjectMapper().convertValue(value)
127+
}
128+
129+
@Suppress("UNCHECKED_CAST")
130+
fun asStreamsTransactionEvent(obj: Any): StreamsTransactionEvent {
131+
val value = when (obj) {
132+
is Map<*, *> -> obj as Map<String, Map<String, Any>>
133+
is String -> readValue(obj)
134+
is ByteArray -> readValue(obj)
135+
else -> convertValue(obj)
136+
}
137+
val meta = convertValue<Meta>(value.getValue("meta"))
138+
139+
val schema = convertValue<Schema>(value.getValue("schema"))
140+
141+
val payloadMap = value.getValue("payload")
142+
val type = payloadMap.getValue("type").toString()
143+
val id = payloadMap.getValue("id").toString()
144+
145+
val payload = if (type == "node") {
146+
val before = if (payloadMap["before"] != null) convertValue<NodeChange>(payloadMap["before"]!!) else null
147+
val after = if (payloadMap["after"] != null) convertValue<NodeChange>(payloadMap["after"]!!) else null
148+
NodePayload(id, before, after)
149+
} else {
150+
val label= payloadMap.getValue("label").toString()
151+
val start = convertValue<RelationshipNodeChange>(payloadMap.getValue("start"))
152+
val end = convertValue<RelationshipNodeChange>(payloadMap.getValue("end"))
153+
val before = if (payloadMap["before"] != null) convertValue<RelationshipChange>(payloadMap["before"]!!) else null
154+
val after = if (payloadMap["after"] != null) convertValue<RelationshipChange>(payloadMap["after"]!!) else null
155+
RelationshipPayload(id, start, end, before, after, label)
156+
}
157+
return StreamsTransactionEvent(meta, payload, schema)
86158
}
87159
}

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class KafkaEventConsumer(private val consumer: KafkaConsumer<String, ByteArray>,
120120
if (records != null && !records.isEmpty) {
121121
return records
122122
.map {
123-
it.topic()!! to JSONUtils.readValue(it.value(), Any::class.java)
123+
it.topic()!! to JSONUtils.readValue<Any>(it.value())
124124
}
125125
.groupBy({ it.first }, { it.second })
126126
}

doc/asciidoc/producer/configuration.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ kafka.transactional.id=
1919
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
2020
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
2121
streams.source.enable=<true/false, default=true>
22+
streams.source.schema.polling.interval=<MILLIS, the polling interval for getting the schema information>
2223
----
2324

2425
Note: To use the Kafka transactions please set `kafka.transactional.id` and `kafka.acks` properly

doc/asciidoc/producer/data/node.created.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,22 @@
1616
"after": {
1717
"labels": ["Person"],
1818
"properties": {
19-
"last_name": "Kretchmar",
2019
"email": "[email protected]",
20+
"last_name": "Kretchmar",
2121
"first_name": "Anne Marie"
2222
}
2323
}
24+
},
25+
"schema": {
26+
"properties": {
27+
"last_name": "String",
28+
"email": "String",
29+
"first_name": "String"
30+
},
31+
"constraints": [{
32+
"label": "Person",
33+
"properties": ["first_name", "last_name"],
34+
"type": "UNIQUE"
35+
}]
2436
}
2537
}

doc/asciidoc/producer/data/node.deleted.json

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,21 @@
1919
"last_name": "Kretchmar",
2020
"email": "[email protected]",
2121
"first_name": "Anne Marie",
22-
"geo":[0.123, 46.2222, 32.11111]
22+
"geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
2323
}
2424
}
25+
},
26+
"schema": {
27+
"properties": {
28+
"last_name": "String",
29+
"email": "String",
30+
"first_name": "String",
31+
"geo": "point"
32+
},
33+
"constraints": [{
34+
"label": "Person",
35+
"properties": ["first_name", "last_name"],
36+
"type": "UNIQUE"
37+
}]
2538
}
2639
}

doc/asciidoc/producer/data/node.updated.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,20 @@
2727
"last_name": "Kretchmar",
2828
"email": "[email protected]",
2929
"first_name": "Anne Marie",
30-
"geo":[0.123, 46.2222, 32.11111]
30+
"geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
3131
}
3232
}
33+
},
34+
"schema": {
35+
"properties": {
36+
"last_name": "String",
37+
"email": "String",
38+
"first_name": "String"
39+
},
40+
"constraints": [{
41+
"label": "Person",
42+
"properties": ["first_name", "last_name"],
43+
"type": "UNIQUE"
44+
}]
3345
}
3446
}

doc/asciidoc/producer/data/relationship.created.json

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,34 @@
1616
"label": "KNOWS",
1717
"start": {
1818
"labels": ["Person"],
19-
"id": "123"
19+
"id": "123",
20+
"ids": {
21+
"last_name": "Andrea",
22+
"first_name": "Santurbano"
23+
}
2024
},
2125
"end": {
2226
"labels": ["Person"],
23-
"id": "456"
27+
"id": "456",
28+
"ids": {
29+
"last_name": "Michael",
30+
"first_name": "Hunger"
31+
}
2432
},
2533
"after": {
2634
"properties": {
2735
"since": "2018-04-05T12:34:00[Europe/Berlin]"
2836
}
2937
}
38+
},
39+
"schema": {
40+
"properties": {
41+
"since": "ZonedDateTime"
42+
},
43+
"constraints": [{
44+
"label": "KNOWS",
45+
"properties": ["since"],
46+
"type": "RELATIONSHIP_PROPERTY_EXISTS"
47+
}]
3048
}
3149
}

doc/asciidoc/producer/data/relationship.deleted.json

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,36 @@
1616
"label": "KNOWS",
1717
"start": {
1818
"labels": ["Person"],
19-
"id": "123"
19+
"id": "123",
20+
"ids": {
21+
"last_name": "Andrea",
22+
"first_name": "Santurbano"
23+
}
2024
},
2125
"end": {
2226
"labels": ["Person"],
23-
"id": "456"
27+
"id": "456",
28+
"ids": {
29+
"last_name": "Michael",
30+
"first_name": "Hunger"
31+
}
2432
},
2533
"before": {
2634
"properties": {
2735
"since": "2018-04-05T12:34:00[Europe/Berlin]",
2836
"to": "2019-04-05T23:00:00[Europe/Berlin]"
2937
}
3038
}
39+
},
40+
"schema": {
41+
"properties": {
42+
"since": "ZonedDateTime",
43+
"to": "ZonedDateTime"
44+
},
45+
"constraints": [{
46+
"label": "KNOWS",
47+
"properties": ["since"],
48+
"type": "RELATIONSHIP_PROPERTY_EXISTS"
49+
}]
3150
}
3251
}

0 commit comments

Comments
 (0)