@@ -7,17 +7,26 @@ import org.apache.kafka.clients.producer.ProducerConfig
77import org.apache.kafka.clients.producer.ProducerRecord
88import org.apache.kafka.common.serialization.ByteArraySerializer
99import org.apache.kafka.common.serialization.StringSerializer
10+ import org.hamcrest.Matchers
11+ import org.hamcrest.Matchers.equalTo
1012import org.junit.*
1113import org.junit.rules.TestName
14+ import org.neo4j.function.ThrowingSupplier
1215import org.neo4j.graphdb.Node
16+ import org.neo4j.graphdb.factory.GraphDatabaseBuilder
1317import org.neo4j.graphdb.schema.ConstraintType
1418import org.neo4j.kernel.internal.GraphDatabaseAPI
1519import org.neo4j.test.TestGraphDatabaseFactory
20+ import org.neo4j.test.assertion.Assert.assertEventually
1621import org.testcontainers.containers.KafkaContainer
1722import streams.events.*
1823import streams.serialization.JSONUtils
1924import streams.utils.StreamsUtils
25+ import java.io.File
26+ import java.io.IOException
27+ import java.io.UncheckedIOException
2028import java.util.*
29+ import java.util.concurrent.TimeUnit
2130import kotlin.test.assertEquals
2231import kotlin.test.assertFalse
2332import kotlin.test.assertTrue
@@ -61,20 +70,13 @@ class KafkaEventSinkIT {
6170 }
6271 }
6372
73+ private lateinit var graphDatabaseBuilder: GraphDatabaseBuilder
6474 private lateinit var db: GraphDatabaseAPI
6575
6676 private val cypherQueryTemplate = " MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties"
6777
6878 private val topics = listOf (" shouldWriteCypherQuery" )
6979
70- @Rule
71- @JvmField
72- var testName = TestName ()
73-
74- private val EXCLUDE_LOAD_TOPIC_METHOD_SUFFIX = " WithNoTopicLoaded"
75- private val WITH_CDC_TOPIC_METHOD_SUFFIX = " WithCDCTopic"
76- private val WITH_CDC_SCHEMA_TOPIC_METHOD_SUFFIX = " WithCDCSchemaTopic"
77-
7880 private val kafkaProperties = Properties ()
7981
8082 private lateinit var kafkaProducer: KafkaProducer <String , ByteArray >
@@ -85,25 +87,10 @@ class KafkaEventSinkIT {
8587
8688 @Before
8789 fun setUp () {
88- var graphDatabaseBuilder = TestGraphDatabaseFactory ()
90+ graphDatabaseBuilder = TestGraphDatabaseFactory ()
8991 .newImpermanentDatabaseBuilder()
9092 .setConfig(" kafka.bootstrap.servers" , kafka.bootstrapServers)
9193 .setConfig(" streams.sink.enabled" , " true" )
92- graphDatabaseBuilder = if (! testName.methodName.endsWith(EXCLUDE_LOAD_TOPIC_METHOD_SUFFIX )) {
93- if (testName.methodName.endsWith(WITH_CDC_TOPIC_METHOD_SUFFIX )) {
94- graphDatabaseBuilder.setConfig(" streams.sink.topic.cdc.sourceId" , " cdctopic" )
95- graphDatabaseBuilder.setConfig(" streams.sink.topic.cdc.sourceId.idName" , " customIdN@me" )
96- graphDatabaseBuilder.setConfig(" streams.sink.topic.cdc.sourceId.labelName" , " CustomLabelN@me" )
97- } else if (testName.methodName.endsWith(WITH_CDC_SCHEMA_TOPIC_METHOD_SUFFIX )) {
98- graphDatabaseBuilder.setConfig(" streams.sink.topic.cdc.schema" , " cdctopic" )
99- } else {
100- graphDatabaseBuilder.setConfig(" streams.sink.topic.cypher.shouldWriteCypherQuery" , cypherQueryTemplate)
101- }
102- graphDatabaseBuilder
103- } else {
104- graphDatabaseBuilder
105- }
106- db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
10794
10895 kafkaProperties[ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ] = kafka.bootstrapServers
10996 kafkaProperties[" zookeeper.connect" ] = kafka.envMap[" KAFKA_ZOOKEEPER_CONNECT" ]
@@ -122,9 +109,11 @@ class KafkaEventSinkIT {
122109
123110 @Test
124111 fun shouldWriteDataFromSink () = runBlocking {
112+ graphDatabaseBuilder.setConfig(" streams.sink.topic.cypher.shouldWriteCypherQuery" , cypherQueryTemplate)
113+ db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
114+
125115 val producerRecord = ProducerRecord (topics[0 ], UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(data))
126116 kafkaProducer.send(producerRecord).get()
127- delay(5000 )
128117 val props = data
129118 .flatMap {
130119 if (it.key == " properties" ) {
@@ -135,28 +124,42 @@ class KafkaEventSinkIT {
135124 }
136125 }
137126 .toMap()
138- db.execute(" MATCH (n:Label) WHERE properties(n) = {props} RETURN count(*) AS count" , mapOf (" props" to props))
139- .columnAs<Long >(" count" ).use {
140- assertTrue { it.hasNext() }
141- val count = it.next()
142- assertEquals(1 , count)
143- assertFalse { it.hasNext() }
144- }
127+
128+ assertEventually(ThrowingSupplier <Boolean , Exception > {
129+ val query = """
130+ |MATCH (n:Label) WHERE properties(n) = {props}
131+ |RETURN count(*) AS count""" .trimMargin()
132+ val result = db.execute(query, mapOf (" props" to props)).columnAs<Long >(" count" )
133+ result.hasNext() && result.next() == 1L && ! result.hasNext()
134+ }, equalTo(true ), 30 , TimeUnit .SECONDS )
135+
145136 }
146137
147138 @Test
148139 fun shouldNotWriteDataFromSinkWithNoTopicLoaded () = runBlocking {
140+ db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
141+
149142 val producerRecord = ProducerRecord (topics[0 ], UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(data))
150143 kafkaProducer.send(producerRecord).get()
151144 delay(5000 )
152- db.execute(" MATCH (n:Label) RETURN n" )
153- .columnAs<Node >(" n" ).use {
154- assertFalse { it.hasNext() }
155- }
145+
146+ assertEventually(ThrowingSupplier <Boolean , Exception > {
147+ val query = """
148+ |MATCH (n:Label)
149+ |RETURN n""" .trimMargin()
150+ val result = db.execute(query).columnAs<Node >(" n" )
151+ result.hasNext()
152+ }, equalTo(false ), 30 , TimeUnit .SECONDS )
156153 }
157154
158155 @Test
159156 fun shouldWriteDataFromSinkWithCDCTopic () = runBlocking {
157+ val topic = UUID .randomUUID().toString()
158+ graphDatabaseBuilder.setConfig(" streams.sink.topic.cdc.sourceId" , topic)
159+ graphDatabaseBuilder.setConfig(" streams.sink.topic.cdc.sourceId.idName" , " customIdN@me" )
160+ graphDatabaseBuilder.setConfig(" streams.sink.topic.cdc.sourceId.labelName" , " CustomLabelN@me" )
161+ db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
162+
160163 val cdcDataStart = StreamsTransactionEvent (meta = Meta (timestamp = System .currentTimeMillis(),
161164 username = " user" ,
162165 txId = 1 ,
@@ -200,27 +203,29 @@ class KafkaEventSinkIT {
200203 ),
201204 schema = Schema ()
202205 )
203- var producerRecord = ProducerRecord (" cdctopic " , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataStart))
206+ var producerRecord = ProducerRecord (topic , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataStart))
204207 kafkaProducer.send(producerRecord).get()
205- producerRecord = ProducerRecord (" cdctopic " , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataEnd))
208+ producerRecord = ProducerRecord (topic , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataEnd))
206209 kafkaProducer.send(producerRecord).get()
207- producerRecord = ProducerRecord (" cdctopic " , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataRelationship))
210+ producerRecord = ProducerRecord (topic , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataRelationship))
208211 kafkaProducer.send(producerRecord).get()
209- delay(5000 )
210- db.execute(" MATCH p = (s:User:`CustomLabelN@me`{name:'Andrea', `comp@ny`:'LARUS-BA', `customIdN@me`: '0'})" +
211- " -[r:`KNOWS WHO`{since:2014, `customIdN@me`: '3'}]->" +
212- " (e:`User Ext`:`CustomLabelN@me`{name:'Michael', `comp@ny`:'Neo4j', `customIdN@me`: '1'}) " +
213- " RETURN count(p) AS count" )
214- .columnAs<Long >(" count" ).use {
215- assertTrue { it.hasNext() }
216- val count = it.next()
217- assertEquals(1 , count)
218- assertFalse { it.hasNext() }
219- }
212+
213+ assertEventually(ThrowingSupplier <Boolean , Exception > {
214+ val query = """
215+ |MATCH p = (s:User:`CustomLabelN@me`{name:'Andrea', `comp@ny`:'LARUS-BA', `customIdN@me`: '0'})-[r:`KNOWS WHO`{since:2014, `customIdN@me`: '3'}]->(e:`User Ext`:`CustomLabelN@me`{name:'Michael', `comp@ny`:'Neo4j', `customIdN@me`: '1'})
216+ |RETURN count(p) AS count""" .trimMargin()
217+ val result = db.execute(query).columnAs<Long >(" count" )
218+ result.hasNext() && result.next() == 1L && ! result.hasNext()
219+ }, equalTo(true ), 30 , TimeUnit .SECONDS )
220+
220221 }
221222
222223 @Test
223224 fun shouldWriteDataFromSinkWithCDCSchemaTopic () = runBlocking {
225+ val topic = UUID .randomUUID().toString()
226+ graphDatabaseBuilder.setConfig(" streams.sink.topic.cdc.schema" , topic)
227+ db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
228+
224229 val nodeSchema = Schema (properties = mapOf (" name" to " String" , " surname" to " String" , " comp@ny" to " String" ),
225230 constraints = listOf (Constraint (label = " User" , type = StreamsConstraintType .UNIQUE , properties = setOf (" name" , " surname" ))))
226231 val cdcDataStart = StreamsTransactionEvent (meta = Meta (timestamp = System .currentTimeMillis(),
@@ -266,24 +271,29 @@ class KafkaEventSinkIT {
266271 ),
267272 schema = Schema ()
268273 )
269- var producerRecord = ProducerRecord (" cdctopic " , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataStart))
274+ var producerRecord = ProducerRecord (topic , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataStart))
270275 kafkaProducer.send(producerRecord).get()
271- producerRecord = ProducerRecord (" cdctopic " , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataEnd))
276+ producerRecord = ProducerRecord (topic , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataEnd))
272277 kafkaProducer.send(producerRecord).get()
273- producerRecord = ProducerRecord (" cdctopic " , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataRelationship))
278+ producerRecord = ProducerRecord (topic , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataRelationship))
274279 kafkaProducer.send(producerRecord).get()
275- delay(5000 )
276- db.execute(" MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'}) RETURN count(p) AS count" )
277- .columnAs<Long >(" count" ).use {
278- assertTrue { it.hasNext() }
279- val count = it.next()
280- assertEquals(1 , count)
281- assertFalse { it.hasNext() }
282- }
280+
281+ assertEventually(ThrowingSupplier <Boolean , Exception > {
282+ val query = """
283+ |MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
284+ |RETURN count(p) AS count
285+ |""" .trimMargin()
286+ val result = db.execute(query).columnAs<Long >(" count" )
287+ result.hasNext() && result.next() == 1L && ! result.hasNext()
288+ }, equalTo(true ), 30 , TimeUnit .SECONDS )
283289 }
284290
285291 @Test
286292 fun shouldDeleteDataFromSinkWithCDCSchemaTopic () = runBlocking {
293+ val topic = UUID .randomUUID().toString()
294+ graphDatabaseBuilder.setConfig(" streams.sink.topic.cdc.schema" , topic)
295+ db = graphDatabaseBuilder.newGraphDatabase() as GraphDatabaseAPI
296+
287297 db.execute(" CREATE (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})" ).close()
288298 val nodeSchema = Schema (properties = mapOf (" name" to " String" , " surname" to " String" , " comp@ny" to " String" ),
289299 constraints = listOf (Constraint (label = " User" , type = StreamsConstraintType .UNIQUE , properties = setOf (" name" , " surname" ))))
@@ -301,16 +311,17 @@ class KafkaEventSinkIT {
301311 schema = nodeSchema
302312 )
303313
304- var producerRecord = ProducerRecord (" cdctopic " , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataStart))
314+ val producerRecord = ProducerRecord (topic , UUID .randomUUID().toString(), JSONUtils .writeValueAsBytes(cdcDataStart))
305315 kafkaProducer.send(producerRecord).get()
306- delay(5000 )
307- db.execute(" MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'}) RETURN count(p) AS count" )
308- .columnAs<Long >(" count" ).use {
309- assertTrue { it.hasNext() }
310- val count = it.next()
311- assertEquals(0 , count)
312- assertFalse { it.hasNext() }
313- }
316+
317+ assertEventually(ThrowingSupplier <Boolean , Exception > {
318+ val query = """
319+ |MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
320+ |RETURN count(p) AS count
321+ |""" .trimMargin()
322+ val result = db.execute(query).columnAs<Long >(" count" )
323+ result.hasNext() && result.next() == 0L && ! result.hasNext()
324+ }, equalTo(true ), 30 , TimeUnit .SECONDS )
314325 }
315326
316327}
0 commit comments