Skip to content

Commit 0fbf715

Browse files
authored
fixes #264: Neo4j Sink Connector: CDC ingestion with schema strategy not working properly (#265)
1 parent 0e8209d commit 0fbf715

File tree

2 files changed

+98
-11
lines changed

2 files changed

+98
-11
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,19 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):
7676
driver.close()
7777
}
7878

79-
/**
 * Resolves the [TopicType] that [topic] is configured for.
 *
 * Every [TopicType] is looked up on its own in `config.topics.asMap()`, and the first type
 * (in `TopicType.values()` order) whose configuration mentions [topic] wins. Matching per
 * type rather than per topic-type group means two types of the same group (e.g. the CDC
 * ones) are no longer matched by each other's topic lists.
 *
 * @param topic the Kafka topic name to classify
 * @return the first matching [TopicType], or `null` when no configuration mentions the topic
 */
override fun getTopicType(topic: String): TopicType? {
    val topicConfigMap = config.topics.asMap()
    // firstOrNull with a predicate stops at the first hit and avoids the intermediate
    // list that `filter { ... }.firstOrNull()` would allocate.
    return TopicType.values().firstOrNull { topicType ->
        // Types without an entry fall back to an empty list, i.e. "matches nothing".
        val topicConfig = topicConfigMap.getOrDefault(topicType, emptyList<Any>())
        when (topicConfig) {
            // Collection-valued configuration: match by membership.
            is Collection<*> -> topicConfig.contains(topic)
            // Map-valued configuration: match by key.
            is Map<*, *> -> topicConfig.containsKey(topic)
            else -> false
        }
    }
}
9192

9293
/**
 * Builds the Cypher statement used to ingest events from [topic].
 *
 * The template configured for the topic in `config.topics.cypherTopics` is prefixed with
 * [StreamsUtils.UNWIND].
 *
 * @param topic the Kafka topic name
 * @return the prefixed template, or `null` when no template is configured for [topic]
 *         (instead of a statement ending in the literal text "null")
 */
override fun getCypherTemplate(topic: String): String? =
        config.topics.cypherTopics[topic]?.let { template -> "${StreamsUtils.UNWIND} $template" }
9394

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import streams.service.sink.strategy.CUDNodeRel
2424
import streams.service.sink.strategy.CUDOperations
2525
import streams.service.sink.strategy.CUDRelationship
2626
import java.util.*
27+
import java.util.stream.Collectors
28+
import java.util.stream.StreamSupport
2729
import kotlin.test.assertEquals
2830
import kotlin.test.assertFalse
2931
import kotlin.test.assertTrue
@@ -313,6 +315,90 @@ class Neo4jSinkTaskTest {
313315
}
314316
}
315317

318+
/**
 * Feeds three CDC events (two `User` node creations and one `KNOWS WHO` relationship
 * creation) through a sink task whose topic is registered under
 * [Neo4jSinkConnectorConfig.TOPIC_CDC_SCHEMA], then verifies that exactly one matching
 * path exists in the database and that `User` is the only label present.
 */
@Test
fun `should insert data into Neo4j from CDC events with schema strategy`() {
    // Connector settings: the single topic is bound to the CDC schema strategy.
    val cdcSchemaTopic = "neotopic"
    val taskSettings = mapOf(
            Neo4jSinkConnectorConfig.SERVER_URI to db.boltURI().toString(),
            Neo4jSinkConnectorConfig.TOPIC_CDC_SCHEMA to cdcSchemaTopic,
            Neo4jSinkConnectorConfig.AUTHENTICATION_TYPE to AuthenticationType.NONE.toString(),
            SinkTask.TOPICS_CONFIG to cdcSchemaTopic
    )

    // Both schemas share the same unique constraint on User(name, surname).
    val userConstraints = listOf(
            Constraint(label = "User", type = StreamsConstraintType.UNIQUE, properties = setOf("name", "surname"))
    )
    val knowsSchema = Schema(properties = mapOf("since" to "Long"), constraints = userConstraints)
    val userSchema = Schema(
            properties = mapOf("name" to "String", "surname" to "String", "comp@ny" to "String"),
            constraints = userConstraints
    )

    // Event 0 of 3: creation of the relationship's start node.
    val startNodeCreated = StreamsTransactionEvent(
            meta = Meta(
                    timestamp = System.currentTimeMillis(),
                    username = "user",
                    txId = 1,
                    txEventId = 0,
                    txEventsCount = 3,
                    operation = OperationType.created
            ),
            payload = NodePayload(
                    id = "0",
                    before = null,
                    after = NodeChange(
                            properties = mapOf("name" to "Andrea", "surname" to "Santurbano", "comp@ny" to "LARUS-BA"),
                            labels = listOf("User")
                    )
            ),
            schema = userSchema
    )
    // Event 1 of 3: creation of the relationship's end node.
    val endNodeCreated = StreamsTransactionEvent(
            meta = Meta(
                    timestamp = System.currentTimeMillis(),
                    username = "user",
                    txId = 1,
                    txEventId = 1,
                    txEventsCount = 3,
                    operation = OperationType.created
            ),
            payload = NodePayload(
                    id = "1",
                    before = null,
                    after = NodeChange(
                            properties = mapOf("name" to "Michael", "surname" to "Hunger", "comp@ny" to "Neo4j"),
                            labels = listOf("User")
                    )
            ),
            schema = userSchema
    )
    // Event 2 of 3: creation of the relationship; its endpoints are addressed through `ids`.
    val relationshipCreated = StreamsTransactionEvent(
            meta = Meta(
                    timestamp = System.currentTimeMillis(),
                    username = "user",
                    txId = 1,
                    txEventId = 2,
                    txEventsCount = 3,
                    operation = OperationType.created
            ),
            payload = RelationshipPayload(
                    id = "2",
                    start = RelationshipNodeChange(
                            id = "0",
                            labels = listOf("User"),
                            ids = mapOf("name" to "Andrea", "surname" to "Santurbano")
                    ),
                    end = RelationshipNodeChange(
                            id = "1",
                            labels = listOf("User"),
                            ids = mapOf("name" to "Michael", "surname" to "Hunger")
                    ),
                    after = RelationshipChange(properties = mapOf("since" to 2014)),
                    before = null,
                    label = "KNOWS WHO"
            ),
            schema = knowsSchema
    )

    val sinkTask = Neo4jSinkTask()
    sinkTask.initialize(mock(SinkTaskContext::class.java))
    sinkTask.start(taskSettings)

    // One record per event on partition 1, with consecutive offsets 42, 43, 44.
    val records = listOf(startNodeCreated, endNodeCreated, relationshipCreated)
            .mapIndexed { position, event ->
                SinkRecord(cdcSchemaTopic, 1, null, null, null, event, 42L + position)
            }
    sinkTask.put(records)

    db.graph().beginTx().use {
        val countCreatedPaths = """
            |MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
            |RETURN count(p) AS count
            |""".trimMargin()
        db.graph().execute(countCreatedPaths)
                .columnAs<Long>("count").use { counts ->
                    // Exactly one row, holding a count of one.
                    assertTrue { counts.hasNext() }
                    val count = counts.next()
                    assertEquals(1, count)
                    assertFalse { counts.hasNext() }
                }

        // No label other than User may have been created by the ingestion.
        val labels = db.graph().allLabels.stream().map { label -> label.name() }.collect(Collectors.toSet())
        assertEquals(setOf("User"), labels)
    }
}
401+
316402
@Test
317403
fun `should delete data into Neo4j from CDC events`() {
318404
db.graph().beginTx().use {

0 commit comments

Comments
 (0)