Skip to content

Commit 2cbfb4a

Browse files
authored
fixes #553: Struct can not be created from an array with optional elements (#554)
1 parent 8fae5f0 commit 2cbfb4a

File tree

3 files changed

+121
-54
lines changed

3 files changed

+121
-54
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/utils/ConnectExtensionFunctions.kt

Lines changed: 74 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -41,81 +41,102 @@ fun Record.asStruct(): Struct {
4141
schema.fields().forEach {
4242
struct.put(it, neo4jToKafka(it.schema(), asMap[it.name()]))
4343
}
44-
struct
4544
return struct
4645
}
4746

48-
private fun neo4jToKafka(schema: Schema, value: Any?): Any? = when (schema.type()) {
49-
Schema.Type.ARRAY -> when (value) {
50-
is Collection<*> -> value.map { neo4jToKafka(neo4jValueSchema(it), it) }
51-
is Array<*> -> value.map { neo4jToKafka(neo4jValueSchema(it), it) }.toTypedArray()
52-
else -> throw IllegalArgumentException("For Schema.Type.ARRAY we support only Collection and Array")
53-
}
54-
Schema.Type.MAP -> when (value) {
55-
is Map<*, *> -> value.mapValues { neo4jToKafka(neo4jValueSchema(it.value), it.value) }
56-
else -> throw IllegalArgumentException("For Schema.Type.MAP we support only Map")
57-
}
58-
Schema.Type.STRUCT -> when (value) {
59-
is Map<*, *> -> {
60-
val struct = Struct(schema)
61-
schema.fields().forEach {
62-
struct.put(it, neo4jToKafka(it.schema(), value[it.name()]))
63-
}
64-
struct
65-
}
66-
is Point -> {
67-
val map = JSONUtils.readValue<Map<String, Any>>(value)
68-
neo4jToKafka(schema, map)
47+
private fun neo4jToKafka(schema: Schema, value: Any?): Any? = if (value == null) {
48+
null
49+
} else {
50+
when (schema.type()) {
51+
Schema.Type.ARRAY -> when (value) {
52+
is Collection<*> -> value.map { neo4jToKafka(schema.valueSchema(), it) }
53+
is Array<*> -> value.map { neo4jToKafka(schema.valueSchema(), it) }.toTypedArray()
54+
else -> throw IllegalArgumentException("For Schema.Type.ARRAY we support only Collection and Array")
6955
}
70-
is Node -> {
71-
val map = value.asStreamsMap()
72-
neo4jToKafka(schema, map)
56+
Schema.Type.MAP -> when (value) {
57+
is Map<*, *> -> value.mapValues { neo4jToKafka(schema.valueSchema(), it.value) }
58+
else -> throw IllegalArgumentException("For Schema.Type.MAP we support only Map")
7359
}
74-
is Relationship -> {
75-
val map = value.asStreamsMap()
76-
neo4jToKafka(schema, map)
77-
}
78-
else -> throw IllegalArgumentException("For Schema.Type.STRUCT we support only Map and Point")
79-
}
80-
else -> when (value) {
81-
null -> null
82-
is TemporalAccessor -> {
83-
val temporalValue = JSONUtils.readValue<String>(value)
84-
neo4jToKafka(schema, temporalValue)
60+
Schema.Type.STRUCT -> when (value) {
61+
is Map<*, *> -> {
62+
val struct = Struct(schema)
63+
schema.fields().forEach {
64+
val field = it
65+
neo4jToKafka(field.schema(), value[field.name()])?.let {
66+
struct.put(field, it)
67+
}
68+
}
69+
struct
70+
}
71+
is Point -> {
72+
val map = JSONUtils.readValue<Map<String, Any>>(value)
73+
neo4jToKafka(schema, map)
74+
}
75+
is Node -> {
76+
val map = value.asStreamsMap()
77+
neo4jToKafka(schema, map)
78+
}
79+
is Relationship -> {
80+
val map = value.asStreamsMap()
81+
neo4jToKafka(schema, map)
82+
}
83+
else -> throw IllegalArgumentException("For Schema.Type.STRUCT we support only Map and Point")
8584
}
86-
else -> when {
87-
Schema.Type.STRING == schema.type() && value !is String -> value.toString()
88-
else -> value
85+
else -> when (value) {
86+
is TemporalAccessor -> {
87+
val temporalValue = JSONUtils.readValue<String>(value)
88+
neo4jToKafka(schema, temporalValue)
89+
}
90+
else -> when {
91+
Schema.Type.STRING == schema.type() && value !is String -> value.toString()
92+
else -> value
93+
}
8994
}
9095
}
9196
}
9297

93-
private fun neo4jValueSchema(value: Any?): Schema = when (value) {
98+
private fun neo4jValueSchema(value: Any?): Schema? = when (value) {
99+
null -> null
94100
is Long -> Schema.OPTIONAL_INT64_SCHEMA
95101
is Double -> Schema.OPTIONAL_FLOAT64_SCHEMA
96102
is Boolean -> Schema.OPTIONAL_BOOLEAN_SCHEMA
97103
is Collection<*> -> {
98-
(value.firstOrNull()?.let {
99-
SchemaBuilder.array(neo4jValueSchema(it))
100-
} ?: SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA)).build()
104+
val schema = value.firstNotNullOfOrNull { neo4jValueSchema(it) }
105+
if (schema == null) null else SchemaBuilder.array(schema).optional()
101106
}
102107
is Array<*> -> {
103-
(value.firstOrNull()?.let {
104-
SchemaBuilder.array(neo4jValueSchema(it))
105-
} ?: SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA)).build()
108+
val schema = value.firstNotNullOfOrNull { neo4jValueSchema(it) }
109+
if (schema == null) null else SchemaBuilder.array(schema).optional()
106110
}
107111
is Map<*, *> -> {
108112
if (value.isEmpty()) {
109-
SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA).build()
113+
SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)
114+
.optional()
115+
.build()
110116
} else {
111-
val valueTypes = value.values.mapNotNull { elem -> elem?.let{ it::class.java.simpleName } }
112-
.toSet()
117+
val valueTypes = value.values
118+
.mapNotNull { elem -> elem?.let{ it::class.java.simpleName } }
119+
.filter { !it.lowercase().startsWith("empty") }
120+
.toSet()
113121
if (valueTypes.size == 1) {
114-
SchemaBuilder.map(Schema.STRING_SCHEMA, neo4jValueSchema(value.values.first())).build()
122+
neo4jValueSchema(value.values.first())
123+
?.let {
124+
SchemaBuilder.map(Schema.STRING_SCHEMA, it)
125+
.optional()
126+
.build()
127+
}
115128
} else {
116-
val structMap = SchemaBuilder.struct()
117-
value.forEach { structMap.field(it.key.toString(), neo4jValueSchema(it.value)) }
118-
structMap.build()
129+
val structMap = SchemaBuilder
130+
.struct()
131+
.optional()
132+
value.forEach {
133+
val entry = it
134+
neo4jValueSchema(entry.value)?.let {
135+
structMap.field(entry.key.toString(), it)
136+
}
137+
}
138+
if (structMap.fields().isEmpty()) null
139+
else structMap.build()
119140
}
120141
}
121142
}

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,4 +292,50 @@ class Neo4jSourceTaskTest {
292292

293293
task.poll()
294294
}
295+
296+
@Test
297+
fun `should source data from mock with custom QUERY without streaming property with Schema`() {
298+
val props = mutableMapOf<String, String>()
299+
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
300+
props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString()
301+
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "10"
302+
props[Neo4jSourceConnectorConfig.ENFORCE_SCHEMA] = "true"
303+
props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = """
304+
|WITH
305+
|{
306+
| id: 'ROOT_ID',
307+
| root: [
308+
| { children: [] },
309+
| { children: [{ name: "child" }] }
310+
| ],
311+
| arr: [null, {foo: "bar"}]
312+
|} AS data
313+
|RETURN data, data.id AS id
314+
""".trimMargin()
315+
props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
316+
317+
task.start(props)
318+
val totalRecords = 10
319+
insertRecords(totalRecords)
320+
321+
val list = mutableListOf<SourceRecord>()
322+
323+
val expected = mapOf(
324+
"id" to "ROOT_ID",
325+
"data" to mapOf(
326+
"id" to "ROOT_ID",
327+
"arr" to listOf(null, mapOf("foo" to "bar")),
328+
"root" to listOf(
329+
mapOf("children" to emptyList<Map<String, Any>>()),
330+
mapOf("children" to listOf(mapOf("name" to "child")))
331+
)
332+
)
333+
)
334+
335+
Assert.assertEventually(ThrowingSupplier {
336+
task.poll()?.let { list.addAll(it) }
337+
val actualList = list.map { (it.value() as Struct).toMap() }
338+
actualList.first() == expected
339+
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
340+
}
295341
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
<java.version>11</java.version>
5252
<kotlin.version>1.7.10</kotlin.version>
5353
<kotlin.coroutines.version>1.6.4</kotlin.coroutines.version>
54-
<neo4j.version>4.4.10</neo4j.version>
54+
<neo4j.version>4.4.17</neo4j.version>
5555
<kafka.version>2.4.1</kafka.version>
5656
<jackson.version>2.13.4</jackson.version>
5757
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>

0 commit comments

Comments
 (0)