Skip to content

Commit 776d912

Browse files
committed
fixed tests
1 parent 73fe355 commit 776d912

File tree

21 files changed

+123
-161
lines changed

21 files changed

+123
-161
lines changed

common/src/main/kotlin/streams/config/StreamsConfig.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@ class StreamsConfig(private val neo4jConfig: Config, logService: LogService) : L
2929

3030
override fun init() {
3131
log.debug("Init StreamsConfig")
32-
3332
loadConfiguration()
3433
afterInitListeners.forEach { it(config) }
34+
println("Neo4j Streams configuration initialised: $config")
3535
}
3636

37-
38-
3937
private fun loadConfiguration() {
4038
val neo4jConfFolder = System.getenv().getOrDefault("NEO4J_CONF", determineNeo4jConfFolder())
4139

common/src/test/kotlin/streams/utils/JSONUtilsTest.kt

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
11
package streams.serialization
22

33
import org.junit.Test
4-
import org.neo4j.driver.v1.Values
5-
import org.neo4j.values.storable.CoordinateReferenceSystem.*
4+
import org.neo4j.driver.Values
5+
import org.neo4j.values.storable.CoordinateReferenceSystem.Cartesian
6+
import org.neo4j.values.storable.CoordinateReferenceSystem.Cartesian_3D
7+
import org.neo4j.values.storable.CoordinateReferenceSystem.WGS84
8+
import org.neo4j.values.storable.CoordinateReferenceSystem.WGS84_3D
69
import org.neo4j.values.storable.DateTimeValue.datetime
710
import org.neo4j.values.storable.DateValue.date
811
import org.neo4j.values.storable.TimeValue.time
912
import org.neo4j.values.storable.Values.pointValue
10-
import streams.events.*
13+
import streams.events.EntityType
14+
import streams.events.Meta
15+
import streams.events.NodeChange
16+
import streams.events.NodePayload
17+
import streams.events.OperationType
18+
import streams.events.Schema
19+
import streams.events.StreamsTransactionEvent
1120
import java.time.ZoneOffset.UTC
1221
import kotlin.test.assertEquals
1322
import kotlin.test.assertFails

consumer/src/main/kotlin/streams/StreamsTopicService.kt

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,6 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
3636
else -> throw RuntimeException("Unsupported data $data for topic type $topicType")
3737
}
3838
node.setProperty("data", newData)
39-
// if (properties.hasProperty(topicType.key)) {
40-
// val topicData = JSONUtils.readValue<Any>(properties.getProperty(topicType.key))
41-
// val newData = when (topicData) {
42-
// is Map<*, *> -> topicData + (data as Map<String, Any?>)
43-
// is Collection<*> -> topicData + (data as Collection<String>)
44-
// else -> throw RuntimeException("Unsupported data $data for topic type $topicType")
45-
// }
46-
// properties.setProperty(topicType.key, JSONUtils.writeValueAsString(newData))
47-
// } else {
48-
// properties.setProperty(topicType.key, JSONUtils.writeValueAsString(data))
49-
// }
50-
// it.success()
5139
}
5240
}
5341

@@ -76,25 +64,6 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
7664
} else {
7765
node.setProperty(topicType.key, filteredData)
7866
}
79-
// if (properties.hasProperty(topicType.key)) {
80-
// val topicData = JSONUtils.readValue<Any>(properties.getProperty(topicType.key))
81-
// val newData = when (topicData) {
82-
// is Map<*, *> -> topicData.filterKeys { it.toString() != topic }
83-
// is Collection<*> -> topicData.filter { it.toString() != topic }
84-
// else -> throw RuntimeException("Unsupported data $topicData for topic type $topicType")
85-
// }
86-
// val isEmpty = when (newData) {
87-
// is Map<*, *> -> newData.isEmpty()
88-
// is Collection<*> -> newData.isEmpty()
89-
// else -> throw RuntimeException("Unsupported data $topicData for topic type $topicType")
90-
// }
91-
// if (isEmpty) {
92-
// properties.removeProperty(topicType.key)
93-
// } else {
94-
// properties.setProperty(topicType.key, JSONUtils.writeValueAsString(newData))
95-
// }
96-
// }
97-
// it.success()
9867
}
9968
}
10069

@@ -116,18 +85,6 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
11685
}
11786
}
11887
}
119-
// .find {
120-
// if (!properties.hasProperty(it.key)) {
121-
// false
122-
// } else {
123-
// val data = JSONUtils.readValue<Any>(properties.getProperty(it.key))
124-
// when (data) {
125-
// is Map<*, *> -> data.containsKey(topic)
126-
// is Collection<*> -> data.contains(topic)
127-
// else -> false
128-
// }
129-
// }
130-
// }
13188
}
13289
}
13390

@@ -148,16 +105,6 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
148105
}
149106

150107
}.toSet() as Set<String>
151-
// TopicType.values()
152-
// .filter { properties.hasProperty(it.key) }
153-
// .flatMap {
154-
// val data = JSONUtils.readValue<Any>(properties.getProperty(it.key))
155-
// when (data) {
156-
// is Map<*, *> -> data.keys
157-
// is Collection<*> -> data.toSet()
158-
// else -> emptySet()
159-
// }
160-
// }.toSet() as Set<String>
161108
}
162109

163110
fun setAll(topics: Topics) {
@@ -167,12 +114,6 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
167114
}
168115

169116
fun getCypherTemplate(topic: String) = db.beginTx().use {
170-
// if (properties.hasProperty(TopicType.CYPHER.key)) {
171-
// val data = JSONUtils.readValue<Map<String, String>>(properties.getProperty(TopicType.CYPHER.key))
172-
// data[topic]
173-
// } else {
174-
// null
175-
// }
176117
db.beginTx().use {
177118
val topicTypeLabel = Label.label(TopicType.CYPHER.key)
178119
val findNodes = it.findNodes(topicTypeLabel)
@@ -197,10 +138,6 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
197138
}
198139
}
199140
.toMap()
200-
// TopicType.values()
201-
// .filter { properties.hasProperty(it.key) }
202-
// .map { it to JSONUtils.readValue<Any>(properties.getProperty(it.key)) }
203-
// .toMap()
204141
}
205142

206143
}

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkNoTopicAutocreationIT.kt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@ import org.junit.BeforeClass
1212
import org.junit.Test
1313
import org.neo4j.function.ThrowingSupplier
1414
import org.neo4j.kernel.internal.GraphDatabaseAPI
15-
import org.neo4j.test.TestGraphDatabaseFactory
1615
import org.neo4j.test.assertion.Assert
16+
import org.neo4j.test.rule.ImpermanentDbmsRule
1717
import org.testcontainers.containers.KafkaContainer
18+
import streams.extensions.execute
1819
import streams.serialization.JSONUtils
20+
import streams.setConfig
21+
import streams.start
1922
import streams.utils.StreamsUtils
20-
import java.util.*
23+
import java.util.UUID
2124
import java.util.concurrent.TimeUnit
22-
import kotlin.test.assertEquals
2325
import kotlin.test.assertTrue
2426

2527
class KafkaEventSinkNoTopicAutoCreationIT {
@@ -75,13 +77,12 @@ class KafkaEventSinkNoTopicAutoCreationIT {
7577
val topicList = client.listTopics().names().get()
7678
val notRegisteredTopic = "notRegistered"
7779
assertTrue { topicList.containsAll(expectedTopics.toSet()) && !topicList.contains(notRegisteredTopic) }
78-
val db = TestGraphDatabaseFactory()
79-
.newImpermanentDatabaseBuilder()
80+
val db = ImpermanentDbmsRule()
8081
.setConfig("kafka.bootstrap.servers", kafka.bootstrapServers)
8182
.setConfig("streams.sink.enabled", "true")
8283
.setConfig("streams.sink.topic.cypher.$notRegisteredTopic", "MERGE (p:NotRegisteredTopic{name: event.name})")
8384
.setConfig("streams.sink.topic.cypher.$topic", "MERGE (p:Person{name: event.name})")
84-
.newGraphDatabase() as GraphDatabaseAPI
85+
.start()
8586
val kafkaProducer: KafkaProducer<String, ByteArray> = createProducer(kafka = kafka)
8687

8788
// when
@@ -91,9 +92,10 @@ class KafkaEventSinkNoTopicAutoCreationIT {
9192

9293
// then
9394
Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
94-
val count = db.execute("MATCH (n:Person) RETURN COUNT(n) AS count")
95-
.columnAs<Long>("count")
96-
.next()
95+
val count = db.execute("MATCH (n:Person) RETURN COUNT(n) AS count") {
96+
it.columnAs<Long>("count")
97+
.next()
98+
}
9799
val topics = client.listTopics().names().get()
98100
count == 1L && !topics.contains(notRegisteredTopic)
99101
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
package streams
22

3+
import org.junit.Assume
34
import org.neo4j.test.rule.DbmsRule
45
import streams.config.StreamsConfig
56

67
fun DbmsRule.setConfig(key: String, value: String): DbmsRule {
78
StreamsConfig.registerListener { it.put(key, value) }
89
return this
9-
}
10+
}
11+
12+
fun DbmsRule.start(timeout: Long = 5000): DbmsRule {
13+
val before = DbmsRule::class.java.getDeclaredMethod("before")
14+
before.isAccessible = true
15+
before.invoke(this)
16+
Assume.assumeTrue(this.isAvailable(timeout))
17+
return this
18+
}

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
11
package streams.kafka.connect.sink
22

3-
import kotlinx.coroutines.*
3+
import kotlinx.coroutines.Deferred
4+
import kotlinx.coroutines.Dispatchers
5+
import kotlinx.coroutines.ExperimentalCoroutinesApi
6+
import kotlinx.coroutines.ObsoleteCoroutinesApi
7+
import kotlinx.coroutines.async
48
import kotlinx.coroutines.channels.ticker
9+
import kotlinx.coroutines.coroutineScope
10+
import kotlinx.coroutines.runBlocking
511
import kotlinx.coroutines.selects.whileSelect
612
import org.apache.kafka.common.config.ConfigException
713
import org.apache.kafka.connect.errors.ConnectException
8-
import org.neo4j.driver.v1.AuthTokens
9-
import org.neo4j.driver.v1.Config
10-
import org.neo4j.driver.v1.Driver
11-
import org.neo4j.driver.v1.GraphDatabase
12-
import org.neo4j.driver.v1.exceptions.ClientException
13-
import org.neo4j.driver.v1.exceptions.TransientException
14-
import org.neo4j.driver.v1.net.ServerAddress
14+
import org.neo4j.driver.AuthTokens
15+
import org.neo4j.driver.Config
16+
import org.neo4j.driver.Driver
17+
import org.neo4j.driver.GraphDatabase
18+
import org.neo4j.driver.exceptions.ClientException
19+
import org.neo4j.driver.exceptions.TransientException
20+
import org.neo4j.driver.net.ServerAddress
1521
import org.slf4j.Logger
1622
import org.slf4j.LoggerFactory
1723
import streams.kafka.connect.sink.converters.Neo4jValueConverter
1824
import streams.service.StreamsSinkEntity
1925
import streams.service.StreamsSinkService
2026
import streams.service.TopicType
21-
import streams.service.TopicTypeGroup
2227
import streams.utils.StreamsUtils
2328
import streams.utils.retryForException
2429
import java.util.concurrent.CopyOnWriteArraySet
@@ -35,7 +40,7 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):
3540
private val driver: Driver
3641

3742
init {
38-
val configBuilder = Config.build()
43+
val configBuilder = Config.builder()
3944
if (this.config.encryptionEnabled) {
4045
configBuilder.withEncryption()
4146
val trustStrategy: Config.TrustStrategy = when (this.config.encryptionTrustStrategy) {
@@ -64,10 +69,9 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig):
6469
configBuilder.withMaxConnectionPoolSize(this.config.connectionPoolMaxSize)
6570
configBuilder.withMaxConnectionLifetime(this.config.connectionMaxConnectionLifetime, TimeUnit.MILLISECONDS)
6671
configBuilder.withConnectionAcquisitionTimeout(this.config.connectionAcquisitionTimeout, TimeUnit.MILLISECONDS)
67-
configBuilder.withLoadBalancingStrategy(this.config.loadBalancingStrategy)
6872
configBuilder.withMaxTransactionRetryTime(config.retryBackoff, TimeUnit.MILLISECONDS)
6973
configBuilder.withResolver { address -> this.config.serverUri.map { ServerAddress.of(it.host, it.port) }.toSet() }
70-
val neo4jConfig = configBuilder.toConfig()
74+
val neo4jConfig = configBuilder.build()
7175

7276
this.driver = GraphDatabase.driver(this.config.serverUri.firstOrNull(), authToken, neo4jConfig)
7377
}

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfig.kt

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import org.apache.kafka.common.config.ConfigDef
1111
import org.apache.kafka.common.config.ConfigException
1212
import org.apache.kafka.connect.sink.SinkTask
1313
import org.neo4j.driver.internal.async.pool.PoolSettings
14-
import org.neo4j.driver.v1.Config
14+
import org.neo4j.driver.Config
1515
import streams.kafka.connect.utils.PropertiesUtil
1616
import streams.service.TopicType
1717
import streams.service.TopicUtils
@@ -52,7 +52,6 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
5252
val connectionLifenessCheckTimeout: Long
5353
val connectionPoolMaxSize: Int
5454
val connectionAcquisitionTimeout: Long
55-
val loadBalancingStrategy: Config.LoadBalancingStrategy
5655

5756
val retryBackoff: Long
5857
val retryMaxAttempts: Int
@@ -90,8 +89,6 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
9089
connectionMaxConnectionLifetime = getLong(CONNECTION_MAX_CONNECTION_LIFETIME_MSECS)
9190
connectionPoolMaxSize = getInt(CONNECTION_POOL_MAX_SIZE)
9291
connectionAcquisitionTimeout = getLong(CONNECTION_MAX_CONNECTION_ACQUISITION_TIMEOUT_MSECS)
93-
loadBalancingStrategy = ConfigUtils
94-
.getEnum(Config.LoadBalancingStrategy::class.java, this, CONNECTION_LOAD_BALANCE_STRATEGY)
9592

9693
retryBackoff = getLong(RETRY_BACKOFF_MSECS)
9794
retryMaxAttempts = getInt(RETRY_MAX_ATTEMPTS)
@@ -254,14 +251,6 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
254251
.group(ConfigGroup.CONNECTION)
255252
.validator(ConfigDef.Range.atLeast(1))
256253
.build())
257-
.define(ConfigKeyBuilder
258-
.of(CONNECTION_LOAD_BALANCE_STRATEGY, ConfigDef.Type.STRING)
259-
.documentation(PropertiesUtil.getProperty(CONNECTION_LOAD_BALANCE_STRATEGY))
260-
.importance(ConfigDef.Importance.LOW)
261-
.defaultValue(Config.LoadBalancingStrategy.LEAST_CONNECTED.toString())
262-
.group(ConfigGroup.CONNECTION)
263-
.validator(ValidEnum.of(Config.LoadBalancingStrategy::class.java))
264-
.build())
265254
.define(ConfigKeyBuilder
266255
.of(ENCRYPTION_ENABLED, ConfigDef.Type.BOOLEAN)
267256
.documentation(PropertiesUtil.getProperty(ENCRYPTION_ENABLED))

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package streams.kafka.connect.sink.converters
22

3-
import org.neo4j.driver.v1.Value
4-
import org.neo4j.driver.v1.Values
3+
import org.neo4j.driver.Value
4+
import org.neo4j.driver.Values
55
import java.time.LocalTime
66
import java.time.ZoneId
7-
import java.util.*
7+
import java.util.Date
88
import java.util.concurrent.TimeUnit
99

1010

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkConnectorConfigTest.kt

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import org.apache.kafka.common.config.ConfigException
66
import org.apache.kafka.connect.sink.SinkConnector
77
import org.junit.Test
88
import org.neo4j.driver.internal.async.pool.PoolSettings
9-
import org.neo4j.driver.v1.Config
9+
import org.neo4j.driver.Config
1010
import kotlin.test.assertEquals
1111
import kotlin.test.assertFalse
1212
import kotlin.test.assertNull
@@ -73,7 +73,6 @@ class Neo4jSinkConnectorConfigTest {
7373
assertEquals(PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, config.connectionLifenessCheckTimeout)
7474
assertEquals(Neo4jSinkConnectorConfig.CONNECTION_POOL_MAX_SIZE_DEFAULT, config.connectionPoolMaxSize)
7575
assertEquals(PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, config.connectionAcquisitionTimeout)
76-
assertEquals(Config.LoadBalancingStrategy.LEAST_CONNECTED, config.loadBalancingStrategy)
7776
assertEquals(Neo4jSinkConnectorConfig.BATCH_TIMEOUT_DEFAULT, config.batchTimeout)
7877
}
7978

@@ -120,7 +119,6 @@ class Neo4jSinkConnectorConfigTest {
120119
assertEquals(PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, config.connectionLifenessCheckTimeout)
121120
assertEquals(Neo4jSinkConnectorConfig.CONNECTION_POOL_MAX_SIZE_DEFAULT, config.connectionPoolMaxSize)
122121
assertEquals(PoolSettings.DEFAULT_CONNECTION_ACQUISITION_TIMEOUT, config.connectionAcquisitionTimeout)
123-
assertEquals(Config.LoadBalancingStrategy.LEAST_CONNECTED, config.loadBalancingStrategy)
124122
assertEquals(Neo4jSinkConnectorConfig.BATCH_TIMEOUT_DEFAULT, config.batchTimeout)
125123
}
126124

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,20 +372,21 @@ class Neo4jSinkTaskTest {
372372
SinkRecord(firstTopic, 1, null, null, null, cdcDataEnd, 43),
373373
SinkRecord(firstTopic, 1, null, null, null, cdcDataRelationship, 44))
374374
task.put(input)
375-
db.graph().beginTx().use {
375+
db.defaultDatabaseService().beginTx().use {
376376
val query = """
377377
|MATCH p = (s:User{name:'Andrea', surname:'Santurbano', `comp@ny`:'LARUS-BA'})-[r:`KNOWS WHO`{since:2014}]->(e:User{name:'Michael', surname:'Hunger', `comp@ny`:'Neo4j'})
378378
|RETURN count(p) AS count
379379
|""".trimMargin()
380-
db.graph().execute(query)
380+
it.execute(query)
381381
.columnAs<Long>("count").use {
382382
assertTrue { it.hasNext() }
383383
val count = it.next()
384384
assertEquals(1, count)
385385
assertFalse { it.hasNext() }
386386
}
387387

388-
val labels = db.graph().allLabels.stream().map { it.name() }.collect(Collectors.toSet())
388+
val labels = db.defaultDatabaseService().beginTx()
389+
.use { StreamSupport.stream(it.allLabels.spliterator(), false).map { it.name() }.collect(Collectors.toSet()) }
389390
assertEquals(setOf("User"), labels)
390391
}
391392
}

0 commit comments

Comments
 (0)