Skip to content

Commit 0e8209d

Browse files
F-Guardianconker84
authored andcommitted
Fixes an issue with wrong ids value in cdc when updating relationships (#261)
* fixed a bug with wrong ids value in cdc when updating relationships * add a test case for update rel with diff node constraint
1 parent 6e4f80f commit 0e8209d

File tree

2 files changed

+91
-1
lines changed

2 files changed

+91
-1
lines changed

producer/src/main/kotlin/streams/events/PreviousTransactionData.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ class PreviousTransactionDataBuilder {
139139
.withId(it.id.toString())
140140
.withName(it.type.name())
141141
.withStartNode(it.startNode.id.toString(), startLabels, it.startNode.getProperties(*startNodeKeys))
142-
.withEndNode(it.endNode.id.toString(), endLabels, it.startNode.getProperties(*endNodeKeys))
142+
.withEndNode(it.endNode.id.toString(), endLabels, it.endNode.getProperties(*endNodeKeys))
143143
.withBefore(beforeNode)
144144
.withAfter(afterNode)
145145
.build()

producer/src/test/kotlin/streams/StreamsTransactionEventHandlerRelTest.kt

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,4 +313,94 @@ class StreamsTransactionEventHandlerRelTest {
313313
assertEquals(0,previous.deletedPayload.size)
314314
assertEquals(0,previous.createdPayload.size)
315315
}
316+
317+
@Test
318+
fun beforeUpdateRelWithDiffNodeConstraint() = runBlocking {
319+
320+
val startLabel = "Person"
321+
val endLabel = "City"
322+
val relType = "LIVES_IN"
323+
val relationshipProperties = mapOf("since" to 2016)
324+
val mockedStartNode = Mockito.mock(Node::class.java)
325+
Mockito.`when`(mockedStartNode.id).thenReturn(1)
326+
Mockito.`when`(mockedStartNode.labels).thenReturn(listOf(Label.label(startLabel)))
327+
val startNodeProperties = mapOf("name" to "James", "surname" to "Chu", "age" to 25)
328+
Mockito.`when`(mockedStartNode.allProperties).thenReturn(startNodeProperties)
329+
Mockito.`when`(mockedStartNode.propertyKeys).thenReturn(startNodeProperties.keys)
330+
Mockito.`when`(mockedStartNode.getProperties("name", "surname")).thenReturn(startNodeProperties.filterKeys { it == "name" || it == "surname" })
331+
val nodeStartConstraintDefinition = Mockito.mock(ConstraintDefinition::class.java)
332+
Mockito.`when`(nodeStartConstraintDefinition.constraintType).thenReturn(ConstraintType.NODE_KEY)
333+
Mockito.`when`(nodeStartConstraintDefinition.propertyKeys).thenReturn(listOf("name", "surname"))
334+
Mockito.`when`(nodeStartConstraintDefinition.relationshipType).thenThrow(IllegalStateException("Constraint is associated with nodes"))
335+
Mockito.`when`(nodeStartConstraintDefinition.label).thenReturn(Label.label("Person"))
336+
337+
val mockedEndNode = Mockito.mock(Node::class.java)
338+
Mockito.`when`(mockedEndNode.id).thenReturn(2)
339+
Mockito.`when`(mockedEndNode.labels).thenReturn(listOf(Label.label(endLabel)))
340+
val endNodeProperties = mapOf("name" to "Beijing", "postal_code" to "100000")
341+
Mockito.`when`(mockedEndNode.allProperties).thenReturn(endNodeProperties)
342+
Mockito.`when`(mockedEndNode.propertyKeys).thenReturn(endNodeProperties.keys)
343+
Mockito.`when`(mockedEndNode.getProperties("name")).thenReturn(endNodeProperties.filterKeys { it == "name" })
344+
val nodeEndConstraintDefinition = Mockito.mock(ConstraintDefinition::class.java)
345+
Mockito.`when`(nodeEndConstraintDefinition.constraintType).thenReturn(ConstraintType.UNIQUENESS)
346+
Mockito.`when`(nodeEndConstraintDefinition.propertyKeys).thenReturn(listOf("name"))
347+
Mockito.`when`(nodeEndConstraintDefinition.relationshipType).thenThrow(IllegalStateException("Constraint is associated with nodes"))
348+
Mockito.`when`(nodeEndConstraintDefinition.label).thenReturn(Label.label("City"))
349+
350+
val mockedRel = Mockito.mock(Relationship::class.java)
351+
Mockito.`when`(mockedRel.id).thenReturn(1)
352+
Mockito.`when`(mockedRel.type).thenReturn(RelationshipType.withName(relType))
353+
Mockito.`when`(mockedRel.startNode).thenReturn(mockedStartNode)
354+
Mockito.`when`(mockedRel.endNode).thenReturn(mockedEndNode)
355+
Mockito.`when`(mockedRel.allProperties).thenReturn(relationshipProperties)
356+
357+
val constraintDefinition = Mockito.mock(ConstraintDefinition::class.java)
358+
Mockito.`when`(constraintDefinition.constraintType).thenReturn(ConstraintType.RELATIONSHIP_PROPERTY_EXISTENCE)
359+
Mockito.`when`(constraintDefinition.relationshipType).thenReturn(RelationshipType.withName(relType))
360+
Mockito.`when`(constraintDefinition.propertyKeys).thenReturn(listOf("since"))
361+
Mockito.`when`(constraintDefinition.label).thenThrow(IllegalStateException("Constraint is associated with relationships"))
362+
363+
Mockito.`when`(schemaMock.getConstraints(RelationshipType.withName(relType))).thenReturn(listOf(constraintDefinition))
364+
Mockito.`when`(schemaMock.getConstraints(Label.label(startLabel))).thenReturn(listOf(nodeStartConstraintDefinition))
365+
Mockito.`when`(schemaMock.getConstraints(Label.label(endLabel))).thenReturn(listOf(nodeEndConstraintDefinition))
366+
Mockito.`when`(schemaMock.constraints).thenReturn(listOf(nodeStartConstraintDefinition, nodeEndConstraintDefinition, constraintDefinition))
367+
368+
delay(500) // wait the StreamsConstraintsService to load the constraints
369+
370+
val assignedProp = Mockito.mock(PropertyEntry::class.java) as PropertyEntry<Relationship>
371+
Mockito.`when`(assignedProp.entity()).thenReturn(mockedRel)
372+
Mockito.`when`(assignedProp.key()).thenReturn("since")
373+
Mockito.`when`(assignedProp.value()).thenReturn(2016)
374+
Mockito.`when`(assignedProp.previouslyCommitedValue()).thenReturn(2008)
375+
376+
val txd = Mockito.mock(TransactionData::class.java)
377+
Mockito.`when`(txd.removedRelationshipProperties()).thenReturn(mutableListOf())
378+
Mockito.`when`(txd.assignedRelationshipProperties()).thenReturn(mutableListOf(assignedProp))
379+
Mockito.`when`(txd.username()).thenReturn("mock")
380+
381+
val previous = handler.beforeCommit(txd).relData
382+
383+
assertEquals(1,previous.updatedPayloads.size)
384+
385+
val rel = previous.updatedPayloads[0]
386+
assertEquals("1",rel.id)
387+
assertEquals("LIVES_IN",rel.label)
388+
assertNotNull(rel.before)
389+
assertNotNull(rel.after)
390+
391+
assertEquals("1", rel.start.id)
392+
assertEquals(mapOf("name" to "James", "surname" to "Chu"), rel.start.ids)
393+
assertEquals(listOf("Person"), rel.start.labels)
394+
assertEquals("2", rel.end.id)
395+
assertEquals(mapOf("name" to "Beijing"), rel.end.ids)
396+
assertEquals(listOf("City"), rel.end.labels)
397+
398+
val after : RelationshipChange = rel.after as RelationshipChange
399+
assertEquals(mapOf("since" to 2016),after.properties)
400+
val before : RelationshipChange = rel.before as RelationshipChange
401+
assertEquals(mapOf("since" to 2008),before.properties)
402+
403+
assertEquals(0,previous.deletedPayload.size)
404+
assertEquals(0,previous.createdPayload.size)
405+
}
316406
}

0 commit comments

Comments
 (0)