Skip to content

Commit 59bac57

Browse files
authored
fixes #567: ( Impact on BC/DR - Data loss ) - High priority, Nodes with PointValue property cannot be successfully sink to Neo4j With Neo4j Streams Plugin (#584)
1 parent 7a2d23b commit 59bac57

File tree

2 files changed

+219
-11
lines changed

2 files changed

+219
-11
lines changed

common/src/main/kotlin/streams/utils/JSONUtils.kt

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@ package streams.utils
22

33
import com.fasterxml.jackson.core.JsonGenerator
44
import com.fasterxml.jackson.core.JsonParseException
5+
import com.fasterxml.jackson.core.JsonParser
56
import com.fasterxml.jackson.core.JsonProcessingException
7+
import com.fasterxml.jackson.databind.DeserializationContext
68
import com.fasterxml.jackson.databind.DeserializationFeature
9+
import com.fasterxml.jackson.databind.JsonDeserializer
10+
import com.fasterxml.jackson.databind.JsonNode
711
import com.fasterxml.jackson.databind.JsonSerializer
812
import com.fasterxml.jackson.databind.ObjectMapper
913
import com.fasterxml.jackson.databind.SerializationFeature
@@ -13,10 +17,18 @@ import com.fasterxml.jackson.module.kotlin.convertValue
1317
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
1418
import com.fasterxml.jackson.module.kotlin.readValue
1519
import org.neo4j.driver.Value
20+
import org.neo4j.driver.Values
1621
import org.neo4j.driver.internal.value.PointValue
1722
import org.neo4j.driver.types.Node
1823
import org.neo4j.driver.types.Point
1924
import org.neo4j.driver.types.Relationship
25+
import streams.events.EntityType
26+
import streams.events.Meta
27+
import streams.events.NodePayload
28+
import streams.events.Payload
29+
import streams.events.RecordChange
30+
import streams.events.RelationshipPayload
31+
import streams.events.Schema
2032
import streams.events.StreamsTransactionEvent
2133
import streams.events.StreamsTransactionNodeEvent
2234
import streams.events.StreamsTransactionRelationshipEvent
@@ -94,6 +106,125 @@ class DriverRelationshipSerializer : JsonSerializer<Relationship>() {
94106
}
95107
}
96108

109+
class StreamsTransactionRelationshipEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionRelationshipEvent, RelationshipPayload>() {
110+
override fun createEvent(meta: Meta, payload: RelationshipPayload, schema: Schema): StreamsTransactionRelationshipEvent {
111+
return StreamsTransactionRelationshipEvent(meta, payload, schema)
112+
}
113+
114+
override fun convertPayload(payloadMap: JsonNode): RelationshipPayload {
115+
return JSONUtils.convertValue<RelationshipPayload>(payloadMap)
116+
}
117+
118+
override fun fillPayload(payload: RelationshipPayload,
119+
beforeProps: Map<String, Any>?,
120+
afterProps: Map<String, Any>?): RelationshipPayload {
121+
return payload.copy(
122+
before = payload.before?.copy(properties = beforeProps),
123+
after = payload.after?.copy(properties = afterProps)
124+
)
125+
}
126+
127+
override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionRelationshipEvent {
128+
val deserialized = super.deserialize(parser, context)
129+
if (deserialized.payload.type == EntityType.node) {
130+
throw IllegalArgumentException("Relationship event expected, but node type found")
131+
}
132+
return deserialized
133+
}
134+
135+
}
136+
137+
class StreamsTransactionNodeEventDeserializer : StreamsTransactionEventDeserializer<StreamsTransactionNodeEvent, NodePayload>() {
138+
override fun createEvent(meta: Meta, payload: NodePayload, schema: Schema): StreamsTransactionNodeEvent {
139+
return StreamsTransactionNodeEvent(meta, payload, schema)
140+
}
141+
142+
override fun convertPayload(payloadMap: JsonNode): NodePayload {
143+
return JSONUtils.convertValue<NodePayload>(payloadMap)
144+
}
145+
146+
override fun fillPayload(payload: NodePayload,
147+
beforeProps: Map<String, Any>?,
148+
afterProps: Map<String, Any>?): NodePayload {
149+
return payload.copy(
150+
before = payload.before?.copy(properties = beforeProps),
151+
after = payload.after?.copy(properties = afterProps)
152+
)
153+
}
154+
155+
override fun deserialize(parser: JsonParser, context: DeserializationContext): StreamsTransactionNodeEvent {
156+
val deserialized = super.deserialize(parser, context)
157+
if (deserialized.payload.type == EntityType.relationship) {
158+
throw IllegalArgumentException("Node event expected, but relationship type found")
159+
}
160+
return deserialized
161+
}
162+
163+
}
164+
165+
abstract class StreamsTransactionEventDeserializer<EVENT, PAYLOAD: Payload> : JsonDeserializer<EVENT>() {
166+
167+
abstract fun createEvent(meta: Meta, payload: PAYLOAD, schema: Schema): EVENT
168+
abstract fun convertPayload(payloadMap: JsonNode): PAYLOAD
169+
abstract fun fillPayload(payload: PAYLOAD,
170+
beforeProps: Map<String, Any>?,
171+
afterProps: Map<String, Any>?): PAYLOAD
172+
173+
@Throws(IOException::class, JsonProcessingException::class)
174+
override fun deserialize(parser: JsonParser, context: DeserializationContext): EVENT {
175+
val root: JsonNode = parser.codec.readTree(parser)
176+
val meta = JSONUtils.convertValue<Meta>(root["meta"])
177+
val schema = JSONUtils.convertValue<Schema>(root["schema"])
178+
val points = schema.properties.filterValues { it == "PointValue" }.keys
179+
var payload = convertPayload(root["payload"])
180+
if (points.isNotEmpty()) {
181+
val beforeProps = convertPoints(payload.before, points)
182+
val afterProps = convertPoints(payload.after, points)
183+
payload = fillPayload(payload, beforeProps, afterProps)
184+
}
185+
return createEvent(meta, payload, schema)
186+
}
187+
188+
private fun convertPoints(
189+
recordChange: RecordChange?,
190+
points: Set<String>
191+
) = recordChange
192+
?.properties
193+
?.mapValues {
194+
if (points.contains(it.key)) {
195+
val pointMap = it.value as Map<String, Any>
196+
when (pointMap["crs"]) {
197+
"cartesian" -> Values.point(
198+
7203,
199+
pointMap["x"].toString().toDouble(),
200+
pointMap["y"].toString().toDouble()
201+
)
202+
"cartesian-3d" -> Values.point(
203+
9157,
204+
pointMap["x"].toString().toDouble(),
205+
pointMap["y"].toString().toDouble(),
206+
pointMap["z"].toString().toDouble()
207+
)
208+
"wgs-84" -> Values.point(
209+
4326,
210+
pointMap["longitude"].toString().toDouble(),
211+
pointMap["latitude"].toString().toDouble()
212+
)
213+
"wgs-84-3d" -> Values.point(
214+
4979,
215+
pointMap["longitude"].toString().toDouble(),
216+
pointMap["latitude"].toString().toDouble(),
217+
pointMap["height"].toString().toDouble()
218+
)
219+
else -> throw IllegalArgumentException("CRS value: ${pointMap["crs"]} not found")
220+
}
221+
} else {
222+
it.value
223+
}
224+
}
225+
226+
}
227+
97228
object JSONUtils {
98229

99230
private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
@@ -106,10 +237,13 @@ object JSONUtils {
106237
module.addSerializer(PointValue::class.java, PointValueSerializer())
107238
module.addSerializer(Node::class.java, DriverNodeSerializer())
108239
module.addSerializer(Relationship::class.java, DriverRelationshipSerializer())
240+
module.addDeserializer(StreamsTransactionRelationshipEvent::class.java, StreamsTransactionRelationshipEventDeserializer())
241+
module.addDeserializer(StreamsTransactionNodeEvent::class.java, StreamsTransactionNodeEventDeserializer())
109242
module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer())
110243
OBJECT_MAPPER.registerModule(module)
111244
OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
112245
OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
246+
STRICT_OBJECT_MAPPER.registerModule(module)
113247
}
114248

115249
fun getObjectMapper(): ObjectMapper = OBJECT_MAPPER

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

Lines changed: 85 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,23 @@ class Neo4jSinkTaskTest {
331331
SinkTask.TOPICS_CONFIG to firstTopic)
332332

333333
val constraints = listOf(Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname")))
334-
val relSchema = Schema(properties = mapOf("since" to "Long"), constraints = constraints)
335-
val nodeSchema = Schema(properties = mapOf("name" to "String", "surname" to "String", "comp@ny" to "String"),
336-
constraints = constraints)
334+
val relSchema = Schema(properties = mapOf(
335+
"since" to "Long",
336+
"where" to "PointValue"
337+
), constraints = constraints)
338+
val nodeSchema = Schema(
339+
properties = mapOf(
340+
"name" to "String",
341+
"surname" to "String",
342+
"comp@ny" to "String",
343+
"bornIn2d" to "PointValue",
344+
"bornIn3d" to "PointValue",
345+
"livesIn2d" to "PointValue",
346+
"livesIn3d" to "PointValue",
347+
"worksIn2d" to "PointValue",
348+
"worksIn3d" to "PointValue"
349+
),
350+
constraints = constraints)
337351
val cdcDataStart = StreamsTransactionEvent(
338352
meta = Meta(timestamp = System.currentTimeMillis(),
339353
username = "user",
@@ -344,7 +358,47 @@ class Neo4jSinkTaskTest {
344358
),
345359
payload = NodePayload(id = "0",
346360
before = null,
347-
after = NodeChange(properties = mapOf("name" to "Andrea", "surname" to "Santurbano", "comp@ny" to "LARUS-BA"), labels = listOf("User"))
361+
after = NodeChange(
362+
properties = mapOf(
363+
"name" to "Andrea",
364+
"surname" to "Santurbano",
365+
"comp@ny" to "LARUS-BA",
366+
"bornIn3d" to mapOf(
367+
"crs" to "wgs-84-3d",
368+
"latitude" to 12.78,
369+
"longitude" to 56.7,
370+
"height" to 100.0,
371+
),
372+
"bornIn2d" to mapOf(
373+
"crs" to "wgs-84",
374+
"latitude" to 12.78,
375+
"longitude" to 56.7
376+
),
377+
"livesIn3d" to mapOf(
378+
"crs" to "wgs-84-3d",
379+
"latitude" to 12.79,
380+
"longitude" to 56.71,
381+
"height" to 100.0,
382+
),
383+
"livesIn2d" to mapOf(
384+
"crs" to "wgs-84",
385+
"latitude" to 12.79,
386+
"longitude" to 56.71
387+
),
388+
"worksIn2d" to mapOf(
389+
"crs" to "cartesian",
390+
"x" to 1.2,
391+
"y" to 10.1
392+
),
393+
"worksIn3d" to mapOf(
394+
"crs" to "cartesian-3d",
395+
"x" to 1.2,
396+
"y" to 10.1,
397+
"z" to 7.1
398+
)
399+
),
400+
labels = listOf("User")
401+
)
348402
),
349403
schema = nodeSchema
350404
)
@@ -371,12 +425,20 @@ class Neo4jSinkTaskTest {
371425
operation = OperationType.created
372426
),
373427
payload = RelationshipPayload(
374-
id = "2",
375-
start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")),
376-
end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")),
377-
after = RelationshipChange(properties = mapOf("since" to 2014)),
378-
before = null,
379-
label = "KNOWS WHO"
428+
id = "2",
429+
start = RelationshipNodeChange(id = "0", labels = listOf("User"), ids = mapOf("name" to "Andrea", "surname" to "Santurbano")),
430+
end = RelationshipNodeChange(id = "1", labels = listOf("User"), ids = mapOf("name" to "Michael", "surname" to "Hunger")),
431+
after = RelationshipChange(properties = mapOf(
432+
"since" to 2014,
433+
"where" to mapOf(
434+
"crs" to "wgs-84-3d",
435+
"latitude" to 12.78,
436+
"longitude" to 56.7,
437+
"height" to 80.0,
438+
)
439+
)),
440+
before = null,
441+
label = "MEET"
380442
),
381443
schema = relSchema
382444
)
@@ -388,7 +450,19 @@ class Neo4jSinkTaskTest {
388450
task.put(input)
389451
session.beginTransaction().use {
390452
val query = """
391-
|MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
453+
|MATCH (s:User{
454+
| name:'Andrea',
455+
| surname:'Santurbano',
456+
| `comp@ny`:'LARUS-BA',
457+
| bornIn3d: point({x: 56.7, y: 12.78, z: 100.0, crs: 'wgs-84-3d'}),
458+
| bornIn2d: point({x: 56.7, y: 12.78, crs: 'wgs-84'}),
459+
| livesIn3d: point({longitude: 56.71, latitude: 12.79, height: 100}),
460+
| livesIn2d: point({longitude: 56.71, latitude: 12.79}),
461+
| worksIn2d: point({x: 1.2, y: 10.1, crs: 'cartesian'}),
462+
| worksIn3d: point({x: 1.2, y: 10.1, z: 7.1, crs: 'cartesian-3d'})
463+
|})
464+
|MATCH (t:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
465+
|MATCH p = (s)-[r:MEET{since: 2014, where: point({x: 56.7, y: 12.78, z: 80.0, crs: 'wgs-84-3d'})}]->(t)
392466
|RETURN count(p) AS count
393467
|""".trimMargin()
394468
val result = it.run(query)

0 commit comments

Comments
 (0)