Skip to content

Commit 9f34850

Browse files
authored
fixes bug delete with pattern filtering (#366)
1 parent 40b5c2c commit 9f34850

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

producer/src/main/kotlin/streams/StreamsTransactionEventHandler.kt

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,11 @@ class StreamsTransactionEventHandler(private val router: StreamsEventRouter,
126126
.toMap()
127127

128128
// returns a Map<Boolean, List<Node>> where the K is true if the node has been deleted
129-
val removedNodeProps = allOrFiltered(txd.removedNodeProperties(), nodeAll)
130-
{ it.entity().labelNames().any { nodeRoutingLabels.contains(it) } }
129+
val removedNodeProps = txd.removedNodeProperties()
131130
.map { txd.deletedNodes().contains(it.entity()) to it }
132131
.groupBy({ it.first }, { it.second })
133132
.toMap()
134-
val removedLbls = allOrFiltered(txd.removedLabels(), nodeAll)
135-
{ nodeRoutingLabels.contains(it.label().name()) }
133+
val removedLbls = txd.removedLabels()
136134
.map { txd.deletedNodes().contains(it.node()) to it }
137135
.groupBy({ it.first }, { it.second })
138136
.toMap()
@@ -147,12 +145,10 @@ class StreamsTransactionEventHandler(private val router: StreamsEventRouter,
147145
.map { labelEntry -> labelEntry.node().id to labelEntry.label().name() } // [ (nodeId, [label]) ]
148146
.groupBy({it.first},{it.second}) // { nodeId -> [label] }
149147

150-
151148
val removedNodeProperties = removedNodeProps.getOrDefault(false, emptyList())
152149
val removedLabels = removedLbls.getOrDefault(false, emptyList())
153150

154-
val deletedPayload = allOrFiltered(txd.deletedNodes(), nodeAll)
155-
{ it.labels.any { nodeRoutingLabels.contains(it.name()) } }
151+
val deletedPayload = txd.deletedNodes()
156152
.map {
157153
val beforeNode = NodeChangeBuilder()
158154
.withLabels(deletedLabels.getOrDefault(it.id, emptyList()))

producer/src/test/kotlin/streams/integrations/KafkaEventRouterSimpleTSE.kt

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package streams.integrations
22

33
import kotlinx.coroutines.async
44
import kotlinx.coroutines.runBlocking
5+
import org.hamcrest.Matchers
56
import org.junit.Test
7+
import org.neo4j.function.ThrowingSupplier
8+
import streams.Assert
69
import streams.events.EntityType
710
import streams.events.NodeChange
811
import streams.events.NodePayload
@@ -14,6 +17,7 @@ import streams.KafkaTestUtils
1417
import streams.serialization.JSONUtils
1518
import streams.setConfig
1619
import streams.start
20+
import java.util.concurrent.TimeUnit
1721
import kotlin.test.assertEquals
1822

1923
class KafkaEventRouterSimpleTSE: KafkaEventRouterBaseTSE() {
@@ -131,4 +135,36 @@ class KafkaEventRouterSimpleTSE: KafkaEventRouterBaseTSE() {
131135
}
132136
})
133137
}
138+
139+
@Test
140+
fun testDeleteNode() = runBlocking {
141+
val topic = "testdeletetopic"
142+
db.setConfig("streams.source.topic.nodes.$topic.from.neo4j", "Person:Neo4j{*}")
143+
.setConfig("streams.source.topic.relationships.$topic.from.neo4j", "KNOWS{*}")
144+
.start()
145+
kafkaConsumer.subscribe(listOf(topic))
146+
db.execute("CREATE (:Person:ToRemove {name:'John Doe', age:42})-[:KNOWS]->(:Person {name:'Jane Doe', age:36})")
147+
Assert.assertEventually(ThrowingSupplier {
148+
kafkaConsumer.poll(5000).count() > 0
149+
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
150+
db.execute("MATCH (p:Person {name:'John Doe', age:42}) REMOVE p:ToRemove")
151+
Assert.assertEventually(ThrowingSupplier {
152+
kafkaConsumer.poll(5000)
153+
.map { JSONUtils.asStreamsTransactionEvent(it.value()) }
154+
.filter { it.meta.operation == OperationType.updated }
155+
.count() > 0
156+
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
157+
db.execute("MATCH (p:Person) DETACH DELETE p")
158+
val count = db.execute("MATCH (p:Person {name:'John Doe', age:42}) RETURN count(p) AS count") {
159+
it.columnAs<Long>("count").next()
160+
}
161+
assertEquals(0, count)
162+
Assert.assertEventually(ThrowingSupplier {
163+
val count = kafkaConsumer.poll(5000)
164+
.map { JSONUtils.asStreamsTransactionEvent(it.value()) }
165+
.filter { it.meta.operation == OperationType.deleted }
166+
.count()
167+
count > 0
168+
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
169+
}
134170
}

0 commit comments

Comments
 (0)