Skip to content

Commit 8ccd59e

Browse files
authored
Fixes #429: [CDC] Schema select randomly the uk in case of multiple unique constraints (#440)
* fixes #429 * changed test * added intermediate condition with label name and more tests * simplified names * fix typo * removed forEachs - added comments * minor changes * fix failing test
1 parent 7c7d94f commit 8ccd59e

File tree

6 files changed

+842
-5
lines changed

6 files changed

+842
-5
lines changed

common/src/main/kotlin/streams/utils/SchemaUtils.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ object SchemaUtils {
1313
&& propertyKeys.containsAll(constraint.properties)
1414
&& labels.contains(constraint.label)
1515
}
16-
.minBy { it.properties.size }
16+
// we order first by properties.size, then by label name and finally by properties name alphabetically
17+
// with properties.sorted() we ensure that ("foo", "bar") and ("bar", "foo") are no different
18+
// with toString() we force it.properties to have the natural sort order, that is alphabetically
19+
.minWithOrNull((compareBy({ it.properties.size }, { it.label }, { it.properties.sorted().toString() })))
1720
?.properties
1821
.orEmpty()
19-
// .ifEmpty { propertyKeys }
2022

2123
fun toStreamsTransactionEvent(streamsSinkEntity: StreamsSinkEntity,
2224
evaluation: (StreamsTransactionEvent) -> Boolean)

common/src/test/kotlin/streams/service/sink/strategy/SchemaIngestionStrategyTest.kt

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import streams.events.*
55
import streams.service.StreamsSinkEntity
66
import streams.utils.StreamsUtils
77
import kotlin.test.assertEquals
8+
import kotlin.test.assertTrue
89

910
class SchemaIngestionStrategyTest {
1011

@@ -225,6 +226,160 @@ class SchemaIngestionStrategyTest {
225226
assertEquals(expectedRelEvents, eventsRelList)
226227
}
227228

229+
@Test
230+
fun `should create the Schema Query Strategy for relationships with multiple unique constraints`() {
231+
// the Schema Query Strategy leverage the first constraint with lowest properties
232+
// with the same size, we take the first sorted properties list alphabetically
233+
234+
// given
235+
// we shuffle the constraints to ensure that the result doesn't depend from the ordering
236+
val constraintsList = listOf(
237+
Constraint(label = "User Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("address")),
238+
Constraint(label = "User Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("country")),
239+
Constraint(label = "User Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("name", "surname")),
240+
Constraint(label = "User Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("profession", "another_one")),
241+
Constraint(label = "Product Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("code")),
242+
Constraint(label = "Product Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("name"))
243+
).shuffled()
244+
245+
val relSchema = Schema(properties = mapOf("since" to "Long"), constraints = constraintsList)
246+
val idsStart = mapOf("name" to "Sherlock",
247+
"surname" to "Holmes",
248+
"country" to "UK",
249+
"profession" to "detective",
250+
"another_one" to "foo",
251+
"address" to "Baker Street")
252+
val idsEnd = mapOf("name" to "My Awesome Product", "code" to 17294)
253+
254+
val cdcDataRelationship = StreamsTransactionEvent(
255+
meta = Meta(timestamp = System.currentTimeMillis(),
256+
username = "user",
257+
txId = 1,
258+
txEventId = 2,
259+
txEventsCount = 3,
260+
operation = OperationType.updated
261+
),
262+
payload = RelationshipPayload(
263+
id = "2",
264+
start = RelationshipNodeChange(id = "1", labels = listOf("User Ext", "NewLabel"), ids = idsStart),
265+
end = RelationshipNodeChange(id = "2", labels = listOf("Product Ext", "NewLabelA"), ids = idsEnd),
266+
after = RelationshipChange(properties = mapOf("since" to 2014)),
267+
before = null,
268+
label = "HAS BOUGHT"
269+
),
270+
schema = relSchema
271+
)
272+
val cdcQueryStrategy = SchemaIngestionStrategy()
273+
val txEvents = listOf(StreamsSinkEntity(cdcDataRelationship, cdcDataRelationship))
274+
275+
// when
276+
val relationshipEvents = cdcQueryStrategy.mergeRelationshipEvents(txEvents)
277+
val relationshipDeleteEvents = cdcQueryStrategy.deleteRelationshipEvents(txEvents)
278+
279+
// then
280+
assertEquals(0, relationshipDeleteEvents.size)
281+
assertEquals(1, relationshipEvents.size)
282+
val relQuery = relationshipEvents[0].query
283+
val expectedRelQuery = """
284+
|${StreamsUtils.UNWIND}
285+
|MERGE (start:`User Ext`{address: event.start.address})
286+
|MERGE (end:`Product Ext`{code: event.end.code})
287+
|MERGE (start)-[r:`HAS BOUGHT`]->(end)
288+
|SET r = event.properties
289+
""".trimMargin()
290+
assertEquals(expectedRelQuery, relQuery.trimIndent())
291+
val eventsRelList = relationshipEvents[0].events
292+
assertEquals(1, eventsRelList.size)
293+
val expectedRelEvents = listOf(
294+
mapOf("start" to mapOf("address" to "Baker Street"),
295+
"end" to mapOf("code" to 17294),
296+
"properties" to mapOf("since" to 2014))
297+
)
298+
assertEquals(expectedRelEvents, eventsRelList)
299+
}
300+
301+
@Test
302+
fun `should create the Schema Query Strategy for relationships with multiple unique constraints and labels`() {
303+
// the Schema Query Strategy leverage the first constraint with lowest properties
304+
// with the same size, we take the first label in alphabetical order
305+
// finally, with same label name, we take the first sorted properties list alphabetically
306+
307+
// given
308+
// we shuffle the constraints to ensure that the result doesn't depend from the ordering
309+
val constraintsList = listOf(
310+
Constraint(label = "User Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("address")),
311+
Constraint(label = "User Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("country")),
312+
Constraint(label = "User AAA", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("another_two")),
313+
Constraint(label = "User Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("name", "surname")),
314+
Constraint(label = "User Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("profession", "another_one")),
315+
Constraint(label = "Product Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("code")),
316+
Constraint(label = "Product Ext", type = StreamsConstraintType.UNIQUE, properties = linkedSetOf("name"))
317+
).shuffled()
318+
319+
val relSchema = Schema(properties = mapOf("since" to "Long"), constraints = constraintsList)
320+
val idsStart = mapOf("name" to "Sherlock",
321+
"surname" to "Holmes",
322+
"country" to "UK",
323+
"profession" to "detective",
324+
"another_one" to "foo",
325+
"address" to "Baker Street",
326+
"another_two" to "Dunno")
327+
val idsEnd = mapOf("name" to "My Awesome Product", "code" to 17294)
328+
329+
val cdcDataRelationship = StreamsTransactionEvent(
330+
meta = Meta(timestamp = System.currentTimeMillis(),
331+
username = "user",
332+
txId = 1,
333+
txEventId = 2,
334+
txEventsCount = 3,
335+
operation = OperationType.updated
336+
),
337+
payload = RelationshipPayload(
338+
id = "2",
339+
start = RelationshipNodeChange(id = "1", labels = listOf("User Ext", "User AAA", "NewLabel"), ids = idsStart),
340+
end = RelationshipNodeChange(id = "2", labels = listOf("Product Ext", "NewLabelA"), ids = idsEnd),
341+
after = RelationshipChange(properties = mapOf("since" to 2014)),
342+
before = null,
343+
label = "HAS BOUGHT"
344+
),
345+
schema = relSchema
346+
)
347+
val cdcQueryStrategy = SchemaIngestionStrategy()
348+
val txEvents = listOf(StreamsSinkEntity(cdcDataRelationship, cdcDataRelationship))
349+
350+
// when
351+
val relationshipEvents = cdcQueryStrategy.mergeRelationshipEvents(txEvents)
352+
val relationshipDeleteEvents = cdcQueryStrategy.deleteRelationshipEvents(txEvents)
353+
354+
// then
355+
assertEquals(0, relationshipDeleteEvents.size)
356+
assertEquals(1, relationshipEvents.size)
357+
val relQuery = relationshipEvents[0].query
358+
val expectedRelQueryOne = """
359+
|${StreamsUtils.UNWIND}
360+
|MERGE (start:`User AAA`:`User Ext`{another_two: event.start.another_two})
361+
|MERGE (end:`Product Ext`{code: event.end.code})
362+
|MERGE (start)-[r:`HAS BOUGHT`]->(end)
363+
|SET r = event.properties
364+
""".trimMargin()
365+
val expectedRelQueryTwo = """
366+
|${StreamsUtils.UNWIND}
367+
|MERGE (start:`User Ext`:`User AAA`{another_two: event.start.another_two})
368+
|MERGE (end:`Product Ext`{code: event.end.code})
369+
|MERGE (start)-[r:`HAS BOUGHT`]->(end)
370+
|SET r = event.properties
371+
""".trimMargin()
372+
assertTrue { listOf(expectedRelQueryOne, expectedRelQueryTwo).contains(relQuery.trimIndent()) }
373+
val eventsRelList = relationshipEvents[0].events
374+
assertEquals(1, eventsRelList.size)
375+
val expectedRelEvents = listOf(
376+
mapOf("start" to mapOf("another_two" to "Dunno"),
377+
"end" to mapOf("code" to 17294),
378+
"properties" to mapOf("since" to 2014))
379+
)
380+
assertEquals(expectedRelEvents, eventsRelList)
381+
}
382+
228383
@Test
229384
fun `should create the Schema Query Strategy for node deletes`() {
230385
// given

common/src/test/kotlin/streams/utils/SchemaUtilsTest.kt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,57 @@ class SchemaUtilsTest {
2020
assertEquals(setOf("foo"), keys)
2121
}
2222

23+
@Test
24+
fun `getNodeKeys should return the key sorted properly`() {
25+
// the method getNodeKeys should select (with multiple labels) the constraint with lowest properties
26+
// with the same size, we take the first label in alphabetical order
27+
// finally, with same label name, we take the first sorted properties list alphabetically
28+
29+
val pair1 = "LabelX" to setOf("foo", "aaa")
30+
val pair2 = "LabelB" to setOf("bar", "foo")
31+
val pair3 = "LabelC" to setOf("baz", "bar")
32+
val pair4 = "LabelB" to setOf("bar", "bez")
33+
val pair5 = "LabelA" to setOf("bar", "baa", "xcv")
34+
val pair6 = "LabelC" to setOf("aaa", "baa", "xcz")
35+
val pair7 = "LabelA" to setOf("foo", "aac")
36+
val pair8 = "LabelA" to setOf("foo", "aab")
37+
val props = listOf(pair1, pair2, pair3, pair4, pair5, pair6, pair7, pair8)
38+
39+
// we shuffle the constraints to ensure that the result doesn't depend from the ordering
40+
val constraints = props.map {
41+
Constraint(label = it.first, properties = it.second, type = StreamsConstraintType.UNIQUE)
42+
}.shuffled()
43+
44+
val propertyKeys = setOf("prop", "prop2", "foo", "bar", "baz", "bez", "aaa", "aab", "baa", "aac", "xcz", "xcv")
45+
val actualKeys = getNodeKeys(props.map { it.first }, propertyKeys, constraints)
46+
val expectedKeys = setOf("aab", "foo")
47+
48+
assertEquals(expectedKeys, actualKeys)
49+
}
50+
51+
@Test
52+
fun `getNodeKeys should return the key sorted properly (with one label)`() {
53+
// the method getNodeKeys should select the constraint with lowest properties
54+
// with the same size, we take the first sorted properties list alphabetically
55+
56+
val pair1 = "LabelA" to setOf("foo", "bar")
57+
val pair2 = "LabelA" to setOf("bar", "foo")
58+
val pair3 = "LabelA" to setOf("baz", "bar")
59+
val pair4 = "LabelA" to setOf("bar", "bez")
60+
val props = listOf(pair1, pair2, pair3, pair4)
61+
62+
// we shuffle the constraints to ensure that the result doesn't depend from the ordering
63+
val constraints = props.map {
64+
Constraint(label = it.first, properties = it.second, type = StreamsConstraintType.UNIQUE)
65+
}.shuffled()
66+
67+
val propertyKeys = setOf("prop", "foo", "bar", "baz", "bez")
68+
val actualKeys = getNodeKeys(listOf("LabelA"), propertyKeys, constraints)
69+
val expectedKeys = setOf("bar", "baz")
70+
71+
assertEquals(expectedKeys, actualKeys)
72+
}
73+
2374
@Test
2475
fun `getNodeKeys should return empty in case it didn't match anything`() {
2576
val props = mapOf("LabelA" to setOf("foo", "bar"),

0 commit comments

Comments
 (0)