Skip to content

Commit 273c6fa

Browse files
conker84mneedham
authored andcommitted
fixes #162: Conversion error in case of nested field of array<Struct> records #163
1 parent e8dd829 commit 273c6fa

File tree

4 files changed

+251
-31
lines changed

4 files changed

+251
-31
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import streams.utils.StreamsUtils
1919
import streams.utils.retryForException
2020
import org.apache.kafka.connect.errors.ConnectException
2121
import org.apache.kafka.connect.sink.SinkRecord
22+
import org.neo4j.driver.v1.exceptions.Neo4jException
2223
import java.util.concurrent.TimeUnit
2324

2425

@@ -28,6 +29,10 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
2829

2930
private val driver: Driver
3031

32+
private val skipNeo4jErrors = listOf("Neo.ClientError.Statement.PropertyNotFound",
33+
"Neo.ClientError.Statement.SemanticError",
34+
"Neo.ClientError.Statement.ParameterMissing")
35+
3136
init {
3237
val configBuilder = Config.build()
3338
if (this.config.encryptionEnabled) {
@@ -77,17 +82,29 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
7782
runBlocking {
7883
retryForException<Unit>(exceptions = arrayOf(ClientException::class.java, TransientException::class.java),
7984
retries = config.retryMaxAttempts, delayTime = 0) { // we use the delayTime = 0, because we delegate the retryBackoff to the Neo4j Java Driver
80-
session.writeTransaction {
81-
val result = it.run(query, data)
82-
if (log.isDebugEnabled) {
83-
log.debug("Successfully executed query: `$query`. Summary: ${result.summary()}")
85+
try {
86+
session.writeTransaction {
87+
val result = it.run(query, data)
88+
if (log.isDebugEnabled) {
89+
log.debug("Successfully executed query, summary: ${result.summary()}")
90+
}
91+
}
92+
} catch (neoException: Neo4jException) {
93+
if (skipNeo4jErrors.contains(neoException.code())) {
94+
log.info("Skip query: `$query`. Error message `${neoException.message}`. Error code: `${neoException.code()}`")
95+
} else {
96+
throw neoException
8497
}
8598
}
8699
}
87100
}
88101
} catch (e: Exception) {
89102
if (log.isDebugEnabled) {
90-
log.debug("Exception `${e.message}` while executing query: `$query`, with data: `$data`")
103+
if (e is Neo4jException) {
104+
log.debug("Exception `${e.message}` with code `${e.code()}` while executing query `$query`, with data `$data`")
105+
} else {
106+
log.debug("Exception `${e.message}` while executing query `$query`, with data `$data`")
107+
}
91108
}
92109
throw e
93110
}
Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,107 +1,130 @@
11
package streams.kafka.connect.sink
22

33
import com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter
4+
import com.google.common.base.Preconditions
45
import org.apache.kafka.connect.data.Schema
56
import org.apache.kafka.connect.data.Struct
7+
import org.apache.kafka.connect.errors.DataException
68
import org.neo4j.driver.v1.Values
79
import java.math.BigDecimal
10+
import java.math.BigInteger
811
import java.time.LocalTime
912
import java.time.ZoneId
1013
import java.util.*
1114
import java.util.concurrent.TimeUnit
1215

1316

14-
class ValueConverter: AbstractConverter<MutableMap<String, Any>>() {
17+
class ValueConverter: AbstractConverter<MutableMap<String, Any?>>() {
1518

1619
private val UTC = ZoneId.of("UTC")
1720

18-
private fun setValue(result: MutableMap<String, Any>?, fieldName: String?, value: Any?) {
19-
if (result != null && fieldName != null && value != null) {
20-
result[fieldName] = Values.value(value)
21+
private fun setValue(result: MutableMap<String, Any?>?, fieldName: String?, value: Any?) {
22+
if (result != null && fieldName != null) {
23+
result[fieldName] = Values.value(value) ?: Values.NULL
2124
}
2225
}
2326

24-
override fun newValue(): MutableMap<String, Any> {
27+
override fun newValue(): MutableMap<String, Any?> {
2528
return mutableMapOf()
2629
}
2730

28-
override fun setBytesField(result: MutableMap<String, Any>?, fieldName: String?, value: ByteArray?) {
31+
override fun setBytesField(result: MutableMap<String, Any?>?, fieldName: String?, value: ByteArray?) {
2932
setValue(result, fieldName, value)
3033
}
3134

32-
override fun setStringField(result: MutableMap<String, Any>?, fieldName: String?, value: String?) {
35+
override fun setStringField(result: MutableMap<String, Any?>?, fieldName: String?, value: String?) {
3336
setValue(result, fieldName, value)
3437
}
3538

36-
override fun setFloat32Field(result: MutableMap<String, Any>?, fieldName: String?, value: Float?) {
39+
override fun setFloat32Field(result: MutableMap<String, Any?>?, fieldName: String?, value: Float?) {
3740
setValue(result, fieldName, value)
3841
}
3942

40-
override fun setInt32Field(result: MutableMap<String, Any>?, fieldName: String?, value: Int?) {
43+
override fun setInt32Field(result: MutableMap<String, Any?>?, fieldName: String?, value: Int?) {
4144
setValue(result, fieldName, value)
4245
}
4346

44-
override fun setArray(result: MutableMap<String, Any>?, fieldName: String?, schema: Schema?, array: MutableList<Any?>?) {
45-
setValue(result, fieldName, array)
47+
override fun setArray(result: MutableMap<String, Any?>?, fieldName: String?, schema: Schema?, array: MutableList<Any?>?) {
48+
val convertedArray = array?.map { convertInner(it) }
49+
setValue(result, fieldName, convertedArray)
4650
}
4751

48-
override fun setTimestampField(result: MutableMap<String, Any>?, fieldName: String?, value: Date?) {
52+
override fun setTimestampField(result: MutableMap<String, Any?>?, fieldName: String?, value: Date?) {
4953
if (value != null) {
5054
val localDate = value.toInstant().atZone(UTC).toLocalDateTime()
5155
setValue(result, fieldName, localDate)
56+
} else {
57+
setNullField(result, fieldName)
5258
}
5359

5460
}
5561

56-
override fun setTimeField(result: MutableMap<String, Any>?, fieldName: String?, value: Date?) {
62+
override fun setTimeField(result: MutableMap<String, Any?>?, fieldName: String?, value: Date?) {
5763
if (value != null) {
5864
val time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(value.time))
5965
setValue(result, fieldName, time)
66+
} else {
67+
setNullField(result, fieldName)
6068
}
6169
}
6270

63-
override fun setInt8Field(result: MutableMap<String, Any>?, fieldName: String?, value: Byte?) {
71+
override fun setInt8Field(result: MutableMap<String, Any?>?, fieldName: String?, value: Byte?) {
6472
setValue(result, fieldName, value)
6573
}
6674

67-
override fun setStructField(result: MutableMap<String, Any>?, fieldName: String?, value: Struct?) {
75+
override fun setStructField(result: MutableMap<String, Any?>?, fieldName: String?, value: Struct?) {
6876
if (value != null) {
69-
val converted = convert(value) as Map<String, Any>
70-
setValue(result, fieldName, converted)
77+
val converted = convert(value) as MutableMap<Any?, Any?>
78+
setMap(result, fieldName, null, converted)
79+
} else {
80+
setNullField(result, fieldName)
7181
}
7282
}
7383

74-
override fun setMap(result: MutableMap<String, Any>?, fieldName: String?, schema: Schema?, map: MutableMap<Any?, Any?>?) {
75-
setValue(result, fieldName, map)
84+
override fun setMap(result: MutableMap<String, Any?>?, fieldName: String?, schema: Schema?, map: MutableMap<Any?, Any?>?) {
85+
val newMap = map
86+
?.mapKeys { it.key.toString() }
87+
?.mapValues { convertInner(it.value) }
88+
setValue(result, fieldName, newMap)
7689
}
7790

78-
override fun setNullField(result: MutableMap<String, Any>?, fieldName: String?) {}
91+
override fun setNullField(result: MutableMap<String, Any?>?, fieldName: String?) {
92+
setValue(result, fieldName, null)
93+
}
7994

80-
override fun setFloat64Field(result: MutableMap<String, Any>?, fieldName: String?, value: Double?) {
95+
override fun setFloat64Field(result: MutableMap<String, Any?>?, fieldName: String?, value: Double?) {
8196
setValue(result, fieldName, value)
8297
}
8398

84-
override fun setInt16Field(result: MutableMap<String, Any>?, fieldName: String?, value: Short?) {
99+
override fun setInt16Field(result: MutableMap<String, Any?>?, fieldName: String?, value: Short?) {
85100
setValue(result, fieldName, value)
86101
}
87102

88-
override fun setInt64Field(result: MutableMap<String, Any>?, fieldName: String?, value: Long?) {
103+
override fun setInt64Field(result: MutableMap<String, Any?>?, fieldName: String?, value: Long?) {
89104
setValue(result, fieldName, value)
90105
}
91106

92-
override fun setBooleanField(result: MutableMap<String, Any>?, fieldName: String?, value: Boolean?) {
107+
override fun setBooleanField(result: MutableMap<String, Any?>?, fieldName: String?, value: Boolean?) {
93108
setValue(result, fieldName, value)
94109
}
95110

96-
override fun setDecimalField(result: MutableMap<String, Any>?, fieldName: String?, value: BigDecimal?) {
111+
override fun setDecimalField(result: MutableMap<String, Any?>?, fieldName: String?, value: BigDecimal?) {
97112
setValue(result, fieldName, value)
98113
}
99114

100-
override fun setDateField(result: MutableMap<String, Any>?, fieldName: String?, value: Date?) {
115+
override fun setDateField(result: MutableMap<String, Any?>?, fieldName: String?, value: Date?) {
101116
if (value != null) {
102117
val localDate = value.toInstant().atZone(UTC).toLocalDate()
103118
setValue(result, fieldName, localDate)
119+
} else {
120+
setNullField(result, fieldName)
104121
}
105122
}
106123

124+
private fun convertInner(value: Any?): Any? {
125+
return when (value) {
126+
is Struct, is Map<*, *> -> convert(value)
127+
else -> value
128+
}
129+
}
107130
}

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.junit.Before
1313
import org.junit.Test
1414
import org.neo4j.graphdb.Label
1515
import org.neo4j.graphdb.Node
16+
import org.neo4j.graphdb.RelationshipType
1617
import org.neo4j.harness.ServerControls
1718
import org.neo4j.harness.TestServerBuilders
1819
import java.util.*
@@ -56,6 +57,64 @@ class Neo4jSinkTaskTest: EasyMockSupport() {
5657
.field("longitude", Schema.FLOAT32_SCHEMA)
5758
.build()
5859

60+
private val PERSON_SCHEMA_EXT = SchemaBuilder.struct().name("com.example.Person")
61+
.field("firstName", Schema.STRING_SCHEMA)
62+
.field("lastName", Schema.STRING_SCHEMA)
63+
.field("age", Schema.OPTIONAL_INT32_SCHEMA)
64+
.field("bool", Schema.OPTIONAL_BOOLEAN_SCHEMA)
65+
.field("short", Schema.OPTIONAL_INT16_SCHEMA)
66+
.field("byte", Schema.OPTIONAL_INT8_SCHEMA)
67+
.field("long", Schema.OPTIONAL_INT64_SCHEMA)
68+
.field("float", Schema.OPTIONAL_FLOAT32_SCHEMA)
69+
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
70+
.field("modified", Timestamp.SCHEMA)
71+
.field("visited", SchemaBuilder.array(PLACE_SCHEMA))
72+
.build()
73+
74+
75+
76+
@Test
77+
fun `test array of struct`() {
78+
val firstTopic = "neotopic"
79+
val props = mutableMapOf<String, String>()
80+
props[Neo4jSinkConnectorConfig.SERVER_URI] = db.boltURI().toString()
81+
props["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}$firstTopic"] = """
82+
CREATE (b:BODY)
83+
WITH event.p AS paragraphList, event.ul AS ulList, b
84+
FOREACH (paragraph IN paragraphList | CREATE (b)-[:HAS_P]->(p:P{value: paragraph.value}))
85+
86+
WITH ulList, b
87+
UNWIND ulList AS ulElem
88+
CREATE (b)-[:HAS_UL]->(ul:UL)
89+
90+
WITH ulElem, ul
91+
UNWIND ulElem.value AS liElem
92+
CREATE (ul)-[:HAS_LI]->(li:LI{value: liElem.value, class: liElem.class})
93+
""".trimIndent()
94+
props[Neo4jSinkConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
95+
props[Neo4jSinkConnectorConfig.BATCH_SIZE] = 2.toString()
96+
props[SinkTask.TOPICS_CONFIG] = "$firstTopic"
97+
98+
val task = Neo4jSinkTask()
99+
task.initialize(mock<SinkTaskContext, SinkTaskContext>(SinkTaskContext::class.java))
100+
task.start(props)
101+
val input = listOf(SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, ValueConverterTest.getTreeStruct(), 42))
102+
task.put(input)
103+
db.graph().beginTx().use {
104+
assertEquals(1, db.graph().findNodes(Label.label("BODY")).stream().count())
105+
assertEquals(2, db.graph().findNodes(Label.label("P")).stream().count())
106+
assertEquals(2, db.graph().findNodes(Label.label("UL")).stream().count())
107+
assertEquals(4, db.graph().findNodes(Label.label("LI")).stream().count())
108+
109+
assertEquals(2, db.graph().execute("MATCH (b:BODY)-[r:HAS_P]->(p:P) RETURN COUNT(r) AS COUNT").columnAs<Long>("COUNT").next())
110+
assertEquals(2, db.graph().execute("MATCH (b:BODY)-[r:HAS_UL]->(ul:UL) RETURN COUNT(r) AS COUNT").columnAs<Long>("COUNT").next())
111+
assertEquals(4, db.graph().execute("MATCH (ul:UL)-[r:HAS_LI]->(li:LI) RETURN COUNT(r) AS COUNT").columnAs<Long>("COUNT").next())
112+
113+
assertEquals(1, db.graph().execute("MATCH (li:LI{class:['ClassA', 'ClassB']}) RETURN COUNT(li) AS COUNT").columnAs<Long>("COUNT").next())
114+
}
115+
}
116+
117+
59118
@Test
60119
fun `should insert data into Neo4j`() {
61120
val firstTopic = "neotopic"

0 commit comments

Comments
 (0)