Skip to content

Commit 1f65632

Browse files
jexpmoxious
authored andcommitted
Make sure nested arrays are handled correctly. (#234)
* Make sure nested arrays are handled correctly. Trying to reproduce the issue in #229 * Removed commented out code
1 parent a4c752e commit 1f65632

File tree

2 files changed

+177
-6
lines changed

2 files changed

+177
-6
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/converters/MapValueConverter.kt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,13 @@ open class MapValueConverter<T>: AbstractConverter<MutableMap<String, T?>>() {
6161
}
6262
}
6363

64-
override fun setMap(result: MutableMap<String, T?>?, fieldName: String?, schema: Schema?, map: MutableMap<Any?, Any?>?) {
65-
val newMap = map
66-
?.mapKeys { it.key.toString() }
67-
?.mapValues { convertInner(it.value) }
68-
setValue(result, fieldName, newMap)
64+
override fun setMap(result: MutableMap<String, T?>?, fieldName: String?, schema: Schema?, value: MutableMap<Any?, Any?>?) {
65+
if (value != null) {
66+
val converted = convert(value) as MutableMap<Any?, Any?>
67+
setValue(result, fieldName, converted)
68+
} else {
69+
setNullField(result, fieldName)
70+
}
6971
}
7072

7173
override fun setNullField(result: MutableMap<String, T?>?, fieldName: String?) {
@@ -96,9 +98,11 @@ open class MapValueConverter<T>: AbstractConverter<MutableMap<String, T?>>() {
9698
setValue(result, fieldName, value)
9799
}
98100

99-
private fun convertInner(value: Any?): Any? {
101+
open fun convertInner(value: Any?): Any? {
100102
return when (value) {
101103
is Struct, is Map<*, *> -> convert(value)
104+
is Collection<*> -> value.map(::convertInner)
105+
is Array<*> -> if (value.javaClass.componentType.isPrimitive) value else value.map(::convertInner)
102106
else -> value
103107
}
104108
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package streams.kafka.connect.sink
2+
3+
import org.apache.kafka.connect.data.SchemaBuilder
4+
import org.apache.kafka.connect.data.Struct
5+
import org.junit.Test
6+
import org.neo4j.driver.v1.Value
7+
import org.neo4j.driver.v1.Values
8+
import streams.kafka.connect.sink.converters.Neo4jValueConverter
9+
import streams.serialization.JSONUtils
10+
import java.time.Instant
11+
import java.time.ZonedDateTime
12+
import java.util.*
13+
import kotlin.test.assertEquals
14+
15+
class Neo4jValueConverterNestedStructTest {
16+
17+
@Test
18+
fun `should convert nested map into map of neo4j values`() {
19+
// given
20+
val body = JSONUtils.readValue<Map<String, Any?>>(data).mapValues(::convertDate)
21+
22+
// when
23+
val result = Neo4jValueConverter().convert(body) as Map<*, *>
24+
25+
// then
26+
val expected = getExpectedMap()
27+
assertEquals(expected, result)
28+
}
29+
30+
@Test
31+
fun `should convert nested struct into map of neo4j values`() {
32+
33+
val body = getTreeStruct()
34+
35+
// when
36+
val result = Neo4jValueConverter().convert(body) as Map<*, *>
37+
38+
// then
39+
val expected = getExpectedMap()
40+
assertEquals(expected, result)
41+
}
42+
43+
companion object {
44+
45+
private val PREF_SCHEMA = SchemaBuilder.struct().name("org.neo4j.example.email.Preference")
46+
.field("preferenceType", SchemaBuilder.string())
47+
.field("endEffectiveDate", org.apache.kafka.connect.data.Timestamp.SCHEMA)
48+
.build()
49+
50+
private val EMAIL_SCHEMA = SchemaBuilder.struct().name("org.neo4j.example.email.Email")
51+
.field("email", SchemaBuilder.string())
52+
.field("preferences", SchemaBuilder.array(PREF_SCHEMA))
53+
.build()
54+
55+
private val TN_SCHEMA = SchemaBuilder.struct().name("org.neo4j.example.email.Transaction")
56+
.field("tn", SchemaBuilder.string())
57+
.field("preferences", SchemaBuilder.array(PREF_SCHEMA))
58+
.build()
59+
60+
private val EVENT_SCHEMA = SchemaBuilder.struct().name("org.neo4j.example.email.Event")
61+
.field("eventId", SchemaBuilder.string())
62+
.field("eventTimestamp", org.apache.kafka.connect.data.Timestamp.SCHEMA)
63+
.field("emails", SchemaBuilder.array(EMAIL_SCHEMA).optional())
64+
.field("tns", SchemaBuilder.array(TN_SCHEMA).optional())
65+
.build()
66+
67+
fun getTreeStruct(): Struct? {
68+
val source = JSONUtils.readValue<Map<String, Any?>>(data).mapValues(::convertDate)
69+
70+
val emails = source["emails"] as List<Map<String,Any>>
71+
val email = Struct(EMAIL_SCHEMA)
72+
.put("email",emails[0]["email"])
73+
.put("preferences",
74+
(emails[0]["preferences"] as List<Map<String,Any>>).map { Struct(PREF_SCHEMA).put("preferenceType", it["preferenceType"]).put("endEffectiveDate",it["endEffectiveDate"]) })
75+
76+
val emailList = listOf(email)
77+
val tnsList =
78+
(source["tns"] as List<Map<String,Any>>).map {
79+
Struct(TN_SCHEMA).put("tn",it["tn"])
80+
.put("preferences", (it["preferences"] as List<Map<String,Any>>).map{ Struct(PREF_SCHEMA).put("preferenceType", it["preferenceType"]).put("endEffectiveDate",it["endEffectiveDate"]) }) }
81+
82+
return Struct(EVENT_SCHEMA)
83+
.put("eventId", source["eventId"])
84+
.put("eventTimestamp", source["eventTimestamp"])
85+
.put("emails", emailList)
86+
.put("tns", tnsList)
87+
}
88+
89+
fun getExpectedMap(): Map<String, Value> {
90+
return JSONUtils.readValue<Map<String, Any?>>(data).mapValues(::convertDateNew).mapValues { Values.value(it.value) }
91+
}
92+
93+
fun convertDate(it: Map.Entry<String,Any?>) : Any? =
94+
when {
95+
it.value is Map<*,*> -> (it.value as Map<String,Any?>).mapValues(::convertDate)
96+
it.value is Collection<*> -> (it.value as Collection<Any?>).map{ x-> convertDate(AbstractMap.SimpleEntry(it.key, x)) }
97+
it.key.endsWith("Date") -> Date.from(Instant.parse(it.value.toString()))
98+
it.key.endsWith("Timestamp") -> Date.from(Instant.parse(it.value.toString()))
99+
else -> it.value
100+
}
101+
fun convertDateNew(it: Map.Entry<String,Any?>) : Any? =
102+
when {
103+
it.value is Map<*,*> -> (it.value as Map<String,Any?>).mapValues(::convertDateNew)
104+
it.value is Collection<*> -> (it.value as Collection<Any?>).map{ x-> convertDateNew(AbstractMap.SimpleEntry(it.key, x)) }
105+
it.key.endsWith("Date") -> ZonedDateTime.parse(it.value.toString()).toLocalDateTime()
106+
it.key.endsWith("Timestamp") -> ZonedDateTime.parse(it.value.toString()).toLocalDateTime()
107+
else -> it.value
108+
}
109+
110+
val data : String = """
111+
{
112+
"eventId": "d70f306a-71d2-48d9-aea3-87b3808b764b",
113+
"eventTimestamp": "2019-08-21T22:29:22.151Z",
114+
"emails": [
115+
{
116+
"email": "[email protected]",
117+
"preferences": [
118+
{
119+
"preferenceType": "repair_subscription",
120+
"endEffectiveDate": "2019-05-08T14:51:26.116Z"
121+
},
122+
{
123+
"preferenceType": "ordering_subscription",
124+
"endEffectiveDate": "2019-05-08T14:51:26.116Z"
125+
},
126+
{
127+
"preferenceType": "marketing_subscription",
128+
"endEffectiveDate": "2019-05-08T14:51:26.116Z"
129+
}
130+
]
131+
}
132+
],
133+
"tns": [
134+
{
135+
"tn": "1122334455",
136+
"preferences": [
137+
{
138+
"preferenceType": "billing_subscription",
139+
"endEffectiveDate": "2019-10-22T14:51:26.116Z"
140+
},
141+
{
142+
"preferenceType": "repair_subscription",
143+
"endEffectiveDate": "2019-10-22T14:51:26.116Z"
144+
},
145+
{
146+
"preferenceType": "sms",
147+
"endEffectiveDate": "2019-10-22T14:51:26.116Z"
148+
}
149+
]
150+
},
151+
{
152+
"tn": "5544332211",
153+
"preferences": [
154+
{
155+
"preferenceType": "acct_lookup",
156+
"endEffectiveDate": "2019-10-22T14:51:26.116Z"
157+
}
158+
]
159+
}
160+
]
161+
}
162+
""".trimIndent()
163+
164+
}
165+
166+
}
167+

0 commit comments

Comments
 (0)