Skip to content

Commit 60b1851

Browse files
authored
fixes #537: Kafka sink connector does not work with CUD ingestion strategy and Avro format data (#582)
1 parent ed0ed7f commit 60b1851

File tree

4 files changed

+247
-77
lines changed

4 files changed

+247
-77
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,23 @@ import java.util.Date
1010
import java.util.concurrent.TimeUnit
1111

1212

13-
class Neo4jValueConverter: MapValueConverter<Value>() {
13+
class Neo4jValueConverter: MapValueConverter<Any>() {
1414

1515
companion object {
1616
@JvmStatic private val UTC = ZoneId.of("UTC")
1717
}
1818

19-
override fun setValue(result: MutableMap<String, Value?>?, fieldName: String, value: Any?) {
19+
override fun setValue(result: MutableMap<String, Any?>?, fieldName: String, value: Any?) {
2020
if (result != null) {
21-
result[fieldName] = Values.value(value) ?: Values.NULL
21+
result[fieldName] = value
2222
}
2323
}
2424

25-
override fun newValue(): MutableMap<String, Value?> {
25+
override fun newValue(): MutableMap<String, Any?> {
2626
return mutableMapOf()
2727
}
2828

29-
override fun setDecimalField(result: MutableMap<String, Value?>?, fieldName: String, value: BigDecimal) {
29+
override fun setDecimalField(result: MutableMap<String, Any?>?, fieldName: String, value: BigDecimal) {
3030
val doubleValue = value.toDouble()
3131
val fitsScale = doubleValue != Double.POSITIVE_INFINITY
3232
&& doubleValue != Double.NEGATIVE_INFINITY
@@ -38,26 +38,24 @@ class Neo4jValueConverter: MapValueConverter<Value>() {
3838
}
3939
}
4040

41-
override fun setTimestampField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
41+
override fun setTimestampField(result: MutableMap<String, Any?>?, fieldName: String, value: Date) {
4242
val localDate = value.toInstant().atZone(UTC).toLocalDateTime()
4343
setValue(result, fieldName, localDate)
44-
4544
}
4645

47-
override fun setTimeField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
46+
override fun setTimeField(result: MutableMap<String, Any?>?, fieldName: String, value: Date) {
4847
val time = LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(value.time))
4948
setValue(result, fieldName, time)
5049
}
5150

52-
override fun setDateField(result: MutableMap<String, Value?>?, fieldName: String, value: Date) {
51+
override fun setDateField(result: MutableMap<String, Any?>?, fieldName: String, value: Date) {
5352
val localDate = value.toInstant().atZone(UTC).toLocalDate()
5453
setValue(result, fieldName, localDate)
5554
}
5655

57-
override fun setStructField(result: MutableMap<String, Value?>?, fieldName: String, value: Struct) {
58-
val converted = convert(value)
59-
.mapValues { it.value?.asObject() }
60-
.toMutableMap() as MutableMap<Any?, Any?>
61-
setMap(result, fieldName, null, converted)
56+
override fun setStructField(result: MutableMap<String, Any?>?, fieldName: String, value: Struct) {
57+
val converted = convert(value).toMutableMap() as MutableMap<Any?, Any?>
58+
setValue(result, fieldName, converted)
6259
}
60+
6361
}

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jSinkTaskTest.kt

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,25 +82,27 @@ class Neo4jSinkTaskTest {
8282
task.initialize(mock(SinkTaskContext::class.java))
8383
}
8484

85-
private val PERSON_SCHEMA = SchemaBuilder.struct().name("com.example.Person")
86-
.field("firstName", Schema.STRING_SCHEMA)
87-
.field("lastName", Schema.STRING_SCHEMA)
88-
.field("age", Schema.OPTIONAL_INT32_SCHEMA)
89-
.field("bool", Schema.OPTIONAL_BOOLEAN_SCHEMA)
90-
.field("short", Schema.OPTIONAL_INT16_SCHEMA)
91-
.field("byte", Schema.OPTIONAL_INT8_SCHEMA)
92-
.field("long", Schema.OPTIONAL_INT64_SCHEMA)
93-
.field("float", Schema.OPTIONAL_FLOAT32_SCHEMA)
94-
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
95-
.field("modified", Timestamp.SCHEMA)
96-
.build()
97-
9885
private val PLACE_SCHEMA = SchemaBuilder.struct().name("com.example.Place")
9986
.field("name", Schema.STRING_SCHEMA)
10087
.field("latitude", Schema.FLOAT32_SCHEMA)
10188
.field("longitude", Schema.FLOAT32_SCHEMA)
89+
.field("modified", Timestamp.SCHEMA)
10290
.build()
10391

92+
private val PERSON_SCHEMA = SchemaBuilder.struct().name("com.example.Person")
93+
.field("firstName", Schema.STRING_SCHEMA)
94+
.field("lastName", Schema.STRING_SCHEMA)
95+
.field("age", Schema.OPTIONAL_INT32_SCHEMA)
96+
.field("bool", Schema.OPTIONAL_BOOLEAN_SCHEMA)
97+
.field("short", Schema.OPTIONAL_INT16_SCHEMA)
98+
.field("byte", Schema.OPTIONAL_INT8_SCHEMA)
99+
.field("long", Schema.OPTIONAL_INT64_SCHEMA)
100+
.field("float", Schema.OPTIONAL_FLOAT32_SCHEMA)
101+
.field("double", Schema.OPTIONAL_FLOAT64_SCHEMA)
102+
.field("modified", Timestamp.SCHEMA)
103+
.field("place", PLACE_SCHEMA)
104+
.build()
105+
104106
@Test
105107
fun `test array of struct`() {
106108
val firstTopic = "neotopic"
@@ -1351,6 +1353,54 @@ class Neo4jSinkTaskTest {
13511353
}
13521354
}
13531355

1356+
@Test
1357+
fun `should successfully parse nested timestamps`() {
1358+
val firstTopic = "neotopic"
1359+
val secondTopic = "foo"
1360+
val props = mutableMapOf<String, String>()
1361+
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
1362+
props["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}$firstTopic"] = "CREATE (n:PersonExt {modified: event.place.modified})"
1363+
props["${Neo4jSinkConnectorConfig.TOPIC_CYPHER_PREFIX}$secondTopic"] = "CREATE (n:Person {modified: event.place.modified})"
1364+
props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
1365+
props[Neo4jConnectorConfig.BATCH_SIZE] = 2.toString()
1366+
props[SinkTask.TOPICS_CONFIG] = "$firstTopic,$secondTopic"
1367+
1368+
val place = Struct(PLACE_SCHEMA)
1369+
.put("name", "San Mateo (CA)")
1370+
.put("latitude", 37.5629917.toFloat())
1371+
.put("longitude", -122.3255254.toFloat())
1372+
.put("modified", Date(1474661402123L))
1373+
val struct = Struct(PERSON_SCHEMA)
1374+
.put("firstName", "Alex")
1375+
.put("lastName", "Smith")
1376+
.put("bool", true)
1377+
.put("short", 1234.toShort())
1378+
.put("byte", (-32).toByte())
1379+
.put("long", 12425436L)
1380+
.put("float", 2356.3.toFloat())
1381+
.put("double", -2436546.56457)
1382+
.put("age", 21)
1383+
.put("modified", Date(1474661402123L))
1384+
.put("place", place)
1385+
1386+
task.start(props)
1387+
val input = listOf(SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
1388+
SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
1389+
SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
1390+
SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
1391+
SinkRecord(firstTopic, 1, null, null, PERSON_SCHEMA, struct, 42),
1392+
SinkRecord(secondTopic, 1, null, null, PERSON_SCHEMA, struct, 43))
1393+
task.put(input)
1394+
session.beginTransaction().use {
1395+
val personCount = it.run("MATCH (p:Person) RETURN COUNT(p) as COUNT").single()["COUNT"].asLong()
1396+
val expectedPersonCount = input.filter { it.topic() == secondTopic }.size
1397+
assertEquals(expectedPersonCount, personCount.toInt())
1398+
val personExtCount = it.run("MATCH (p:PersonExt) RETURN COUNT(p) as COUNT").single()["COUNT"].asLong()
1399+
val expectedPersonExtCount = input.filter { it.topic() == firstTopic }.size
1400+
assertEquals(expectedPersonExtCount, personExtCount.toInt())
1401+
}
1402+
}
1403+
13541404
private fun countFooPersonEntities(expected: Int) {
13551405
val personCount = session.run("MATCH (p:Person) RETURN count(p) as count")
13561406
.single()["count"]

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterNestedStructTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ class Neo4jValueConverterNestedStructTest {
8686
.put("tns", tnsList)
8787
}
8888

89-
fun getExpectedMap(): Map<String, Value> {
90-
return JSONUtils.readValue<Map<String, Any?>>(data).mapValues(::convertDateNew).mapValues { Values.value(it.value) }
89+
fun getExpectedMap(): Map<String, Any?> {
90+
return JSONUtils.readValue<Map<String, Any?>>(data).mapValues(::convertDateNew)
9191
}
9292

9393
fun convertDate(it: Map.Entry<String,Any?>) : Any? =

0 commit comments

Comments
 (0)