Skip to content

Commit a6a14fe

Browse files
Fixed #508: Issue with Neo4j kafka sink connector in combination with Avro schema nesting (#511)
1 parent 50d71a6 commit a6a14fe

File tree

2 files changed

+27
-0
lines changed

2 files changed

+27
-0
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/Neo4jValueConverter.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package streams.kafka.connect.sink.converters
22

3+
import org.apache.kafka.connect.data.Struct
34
import org.neo4j.driver.Value
45
import org.neo4j.driver.Values
56
import java.math.BigDecimal
@@ -52,4 +53,11 @@ class Neo4jValueConverter: MapValueConverter<Value>() {
5253
val localDate = value.toInstant().atZone(UTC).toLocalDate()
5354
setValue(result, fieldName, localDate)
5455
}
56+
57+
override fun setStructField(result: MutableMap<String, Value?>?, fieldName: String, value: Struct) {
58+
val converted = convert(value)
59+
.mapValues { it.value?.asObject() }
60+
.toMutableMap() as MutableMap<Any?, Any?>
61+
setMap(result, fieldName, null, converted)
62+
}
5563
}

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/sink/Neo4jValueConverterTest.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,25 @@ class Neo4jValueConverterTest {
181181
assertEquals(date.toInstant().atZone(ZoneId.of("UTC")).toLocalDateTime(), result["date"]?.asLocalDateTime())
182182
}
183183

184+
@Test
185+
fun `should be able to process a nested AVRO structure`() {
186+
val trainSchema = SchemaBuilder.struct()
187+
.field("internationalTrainNumber", Schema.STRING_SCHEMA)
188+
.field("trainDate", Schema.STRING_SCHEMA).build()
189+
val mySchema = SchemaBuilder.struct()
190+
.field("trainId", trainSchema)
191+
.field("coreId", Schema.STRING_SCHEMA).build()
192+
193+
val trainIdStruct = Struct(trainSchema)
194+
.put("internationalTrainNumber", "46261")
195+
.put("trainDate", "2021-05-20")
196+
val rootStruct = Struct(mySchema)
197+
.put("trainId", trainIdStruct)
198+
.put("coreId", "000000046261")
199+
200+
val result = Neo4jValueConverter().convert(rootStruct) as Map<*, *>
201+
}
202+
184203
companion object {
185204
private val LI_SCHEMA = SchemaBuilder.struct().name("org.neo4j.example.html.LI")
186205
.field("value", Schema.OPTIONAL_STRING_SCHEMA)

0 commit comments

Comments
 (0)