Skip to content

Commit bd4d2cf

Browse files
authored
Handle null values returned from query correctly (#628)
1 parent fd9c3e9 commit bd4d2cf

File tree

2 files changed

+107
-39
lines changed

2 files changed

+107
-39
lines changed

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/utils/ConnectExtensionFunctions.kt

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@ import streams.service.StreamsSinkEntity
1515
import java.time.temporal.TemporalAccessor
1616

1717
/**
 * Builds a [StreamsSinkEntity] from this record's key and value.
 *
 * Both parts go through `convertData`; only the key is converted with
 * `stringWhenFailure = true`, the value uses the default (`false`).
 */
fun SinkRecord.toStreamsSinkEntity(): StreamsSinkEntity {
    val convertedKey = convertData(key(), stringWhenFailure = true)
    val convertedValue = convertData(value())
    return StreamsSinkEntity(convertedKey, convertedValue)
}
2021

2122
// Shared converter instance; convertData applies it to payloads that arrive as a Struct.
private val converter = Neo4jValueConverter()
2223

23-
private fun convertData(data: Any?, stringWhenFailure :Boolean = false) = when (data) {
24+
private fun convertData(data: Any?, stringWhenFailure: Boolean = false) = when (data) {
2425
is Struct -> converter.convert(data)
2526
null -> null
2627
else -> JSONUtils.readValue<Any>(data, stringWhenFailure)
@@ -44,6 +45,7 @@ fun Record.asStruct(): Struct {
4445
return struct
4546
}
4647

48+
4749
private fun neo4jToKafka(schema: Schema, value: Any?): Any? = if (value == null) {
4850
null
4951
} else {
@@ -53,10 +55,12 @@ private fun neo4jToKafka(schema: Schema, value: Any?): Any? = if (value == null)
5355
is Array<*> -> value.map { neo4jToKafka(schema.valueSchema(), it) }.toTypedArray()
5456
else -> throw IllegalArgumentException("For Schema.Type.ARRAY we support only Collection and Array")
5557
}
58+
5659
Schema.Type.MAP -> when (value) {
5760
is Map<*, *> -> value.mapValues { neo4jToKafka(schema.valueSchema(), it.value) }
5861
else -> throw IllegalArgumentException("For Schema.Type.MAP we support only Map")
5962
}
63+
6064
Schema.Type.STRUCT -> when (value) {
6165
is Map<*, *> -> {
6266
val struct = Struct(schema)
@@ -68,25 +72,31 @@ private fun neo4jToKafka(schema: Schema, value: Any?): Any? = if (value == null)
6872
}
6973
struct
7074
}
75+
7176
is Point -> {
7277
val map = JSONUtils.readValue<Map<String, Any>>(value)
7378
neo4jToKafka(schema, map)
7479
}
80+
7581
is Node -> {
7682
val map = value.asStreamsMap()
7783
neo4jToKafka(schema, map)
7884
}
85+
7986
is Relationship -> {
8087
val map = value.asStreamsMap()
8188
neo4jToKafka(schema, map)
8289
}
90+
8391
else -> throw IllegalArgumentException("For Schema.Type.STRUCT we support only Map and Point")
8492
}
93+
8594
else -> when (value) {
8695
is TemporalAccessor -> {
8796
val temporalValue = JSONUtils.readValue<String>(value)
8897
neo4jToKafka(schema, temporalValue)
8998
}
99+
90100
else -> when {
91101
Schema.Type.STRING == schema.type() && value !is String -> value.toString()
92102
else -> value
@@ -95,28 +105,47 @@ private fun neo4jToKafka(schema: Schema, value: Any?): Any? = if (value == null)
95105
}
96106
}
97107

108+
// Placeholder schema (an optional struct with no fields) returned by neo4jValueSchema when the value is
// null or when a map yields no usable fields, so callers always receive a schema instead of null.
private val NULL_SCHEMA = SchemaBuilder.struct().optional().build()
109+
110+
/**
 * Tells whether the receiver carries at least one usable value.
 *
 * - `null` is never usable.
 * - A [Collection], an [Array] or a [Map] is usable only when at least one of its elements
 *   (for maps: one of its values) is itself usable. The check recurses into nested containers,
 *   so a list holding only `null`s, empty lists or empty maps is not usable.
 * - Every other non-null value is usable, including an empty string.
 *
 * @return `true` when a usable value is found, `false` otherwise.
 */
private fun Any?.notNullOrEmpty(): Boolean = when (this) {
    null -> false
    // `any` already yields false for an empty container, so no separate emptiness check is needed.
    is Collection<*> -> this.any { it.notNullOrEmpty() }
    is Array<*> -> this.any { it.notNullOrEmpty() }
    is Map<*, *> -> this.values.any { it.notNullOrEmpty() }
    else -> true
}
118+
98119
private fun neo4jValueSchema(value: Any?): Schema? = when (value) {
99-
null -> null
120+
null -> NULL_SCHEMA
100121
is Long -> Schema.OPTIONAL_INT64_SCHEMA
101122
is Double -> Schema.OPTIONAL_FLOAT64_SCHEMA
102123
is Boolean -> Schema.OPTIONAL_BOOLEAN_SCHEMA
103124
is Collection<*> -> {
104-
val schema = value.firstNotNullOfOrNull { neo4jValueSchema(it) }
105-
if (schema == null) null else SchemaBuilder.array(schema).optional()
125+
// locate the first element that is a good candidate to derive the schema from, i.e. one that is
126+
// not null and, if it is a container, holds at least one non-null, non-empty element
127+
val first = value.firstOrNull { it.notNullOrEmpty() }
128+
val schema = neo4jValueSchema(first)
129+
SchemaBuilder.array(schema).optional().build()
106130
}
131+
107132
is Array<*> -> {
108-
val schema = value.firstNotNullOfOrNull { neo4jValueSchema(it) }
109-
if (schema == null) null else SchemaBuilder.array(schema).optional()
133+
// locate the first element that is a good candidate to derive the schema from, i.e. one that is
134+
// not null and, if it is a container, holds at least one non-null, non-empty element
135+
val first = value.firstOrNull { it.notNullOrEmpty() }
136+
val schema = neo4jValueSchema(first)
137+
SchemaBuilder.array(schema).optional().build()
110138
}
139+
111140
is Map<*, *> -> {
112141
if (value.isEmpty()) {
113142
SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)
114143
.optional()
115144
.build()
116145
} else {
117146
val valueTypes = value.values
118-
.mapNotNull { elem -> elem?.let{ it::class.java.simpleName } }
119-
.filter { !it.lowercase().startsWith("empty") }
147+
.filter { it.notNullOrEmpty() }
148+
.mapNotNull { it!!.javaClass.name }
120149
.toSet()
121150
if (valueTypes.size == 1) {
122151
neo4jValueSchema(value.values.first())
@@ -129,17 +158,17 @@ private fun neo4jValueSchema(value: Any?): Schema? = when (value) {
129158
val structMap = SchemaBuilder
130159
.struct()
131160
.optional()
132-
value.forEach {
133-
val entry = it
161+
value.forEach { entry ->
134162
neo4jValueSchema(entry.value)?.let {
135163
structMap.field(entry.key.toString(), it)
136164
}
137165
}
138-
if (structMap.fields().isEmpty()) null
166+
if (structMap.fields().isEmpty()) NULL_SCHEMA
139167
else structMap.build()
140168
}
141169
}
142170
}
171+
143172
is Point -> neo4jValueSchema(JSONUtils.readValue<Map<String, Any>>(value))
144173
is Node -> neo4jValueSchema(value.asStreamsMap())
145174
is Relationship -> neo4jValueSchema(value.asStreamsMap())

kafka-connect-neo4j/src/test/kotlin/streams/kafka/connect/source/Neo4jSourceTaskTest.kt

Lines changed: 65 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import org.junit.Test
1515
import org.mockito.Mockito
1616
import org.neo4j.driver.Driver
1717
import org.neo4j.driver.Session
18-
import org.neo4j.driver.types.Node
1918
import org.neo4j.function.ThrowingSupplier
2019
import streams.Assert
2120
import streams.Neo4jContainerExtension
@@ -66,20 +65,20 @@ class Neo4jSourceTaskTest {
6665
val sourceTaskContextMock = Mockito.mock(SourceTaskContext::class.java)
6766
val offsetStorageReader = Mockito.mock(OffsetStorageReader::class.java)
6867
Mockito.`when`(sourceTaskContextMock.offsetStorageReader())
69-
.thenReturn(offsetStorageReader)
68+
.thenReturn(offsetStorageReader)
7069
Mockito.`when`(offsetStorageReader.offset(Mockito.anyMap<String, Any>()))
71-
.thenReturn(emptyMap())
70+
.thenReturn(emptyMap())
7271
task.initialize(sourceTaskContextMock)
7372
}
7473

7574
/**
 * Converts a Connect [Struct] into a plain map keyed by field name.
 *
 * Nested [Struct] values are converted recursively; every other value is copied as-is.
 * The tests use the result to compare emitted records against ordinary Kotlin maps.
 */
private fun structToMap(struct: Struct): Map<String, Any?> =
    struct.schema().fields().associate { field ->
        val fieldName = field.name()
        val rawValue = struct[fieldName]
        val converted = if (rawValue is Struct) structToMap(rawValue) else rawValue
        fieldName to converted
    }
8382

8483
/** Extension shorthand that delegates to [structToMap] for this struct. */
fun Struct.toMap() = structToMap(this)
8584

@@ -88,7 +87,7 @@ class Neo4jSourceTaskTest {
8887
val props = mutableMapOf<String, String>()
8988
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
9089
props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString()
91-
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "10"
90+
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000"
9291
props[Neo4jSourceConnectorConfig.STREAMING_PROPERTY] = "timestamp"
9392
props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery()
9493
props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
@@ -110,7 +109,7 @@ class Neo4jSourceTaskTest {
110109
val props = mutableMapOf<String, String>()
111110
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
112111
props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString()
113-
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "10"
112+
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000"
114113
props[Neo4jSourceConnectorConfig.ENFORCE_SCHEMA] = "true"
115114
props[Neo4jSourceConnectorConfig.STREAMING_PROPERTY] = "timestamp"
116115
props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery()
@@ -134,7 +133,7 @@ class Neo4jSourceTaskTest {
134133
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
135134
props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString()
136135
props[Neo4jSourceConnectorConfig.STREAMING_FROM] = "ALL"
137-
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "10"
136+
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000"
138137
props[Neo4jSourceConnectorConfig.STREAMING_PROPERTY] = "timestamp"
139138
props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery()
140139
props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
@@ -157,7 +156,7 @@ class Neo4jSourceTaskTest {
157156
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
158157
props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString()
159158
props[Neo4jSourceConnectorConfig.STREAMING_FROM] = "ALL"
160-
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "10"
159+
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000"
161160
props[Neo4jSourceConnectorConfig.ENFORCE_SCHEMA] = "true"
162161
props[Neo4jSourceConnectorConfig.STREAMING_PROPERTY] = "timestamp"
163162
props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery()
@@ -177,7 +176,8 @@ class Neo4jSourceTaskTest {
177176

178177
private fun insertRecords(totalRecords: Int, longToInt: Boolean = false) = session.beginTransaction().use { tx ->
179178
val elements = (1..totalRecords).map {
180-
val result = tx.run("""
179+
val result = tx.run(
180+
"""
181181
|CREATE (n:Test{
182182
| name: 'Name ' + $it,
183183
| timestamp: timestamp(),
@@ -196,11 +196,12 @@ class Neo4jSourceTaskTest {
196196
| key2: "value2"
197197
| } AS map,
198198
| n AS node
199-
""".trimMargin())
199+
""".trimMargin()
200+
)
200201
val next = result.next()
201202
val map = next.asMap().toMutableMap()
202203
map["array"] = next["array"].asList()
203-
.map { if (longToInt) (it as Long).toInt() else it }
204+
.map { if (longToInt) (it as Long).toInt() else it }
204205
map["point"] = JSONUtils.readValue<Map<String, Any>>(map["point"]!!)
205206
map["datetime"] = next["datetime"].asLocalDateTime().toString()
206207
val node = next["node"].asNode()
@@ -223,7 +224,7 @@ class Neo4jSourceTaskTest {
223224
val props = mutableMapOf<String, String>()
224225
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
225226
props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString()
226-
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "10"
227+
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000"
227228
props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery()
228229
props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
229230

@@ -244,7 +245,7 @@ class Neo4jSourceTaskTest {
244245
val props = mutableMapOf<String, String>()
245246
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
246247
props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString()
247-
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "10"
248+
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000"
248249
props[Neo4jSourceConnectorConfig.ENFORCE_SCHEMA] = "true"
249250
props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = getSourceQuery()
250251
props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
@@ -282,7 +283,7 @@ class Neo4jSourceTaskTest {
282283
val props = mutableMapOf<String, String>()
283284
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
284285
props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString()
285-
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "10"
286+
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000"
286287
props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = "WRONG QUERY".trimMargin()
287288
props[Neo4jConnectorConfig.AUTHENTICATION_TYPE] = AuthenticationType.NONE.toString()
288289

@@ -308,7 +309,7 @@ class Neo4jSourceTaskTest {
308309
val props = mutableMapOf<String, String>()
309310
props[Neo4jConnectorConfig.SERVER_URI] = neo4j.boltUrl
310311
props[Neo4jSourceConnectorConfig.TOPIC] = UUID.randomUUID().toString()
311-
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "10"
312+
props[Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL] = "1000"
312313
props[Neo4jSourceConnectorConfig.ENFORCE_SCHEMA] = "true"
313314
props[Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY] = """
314315
|WITH
@@ -328,8 +329,6 @@ class Neo4jSourceTaskTest {
328329
val totalRecords = 10
329330
insertRecords(totalRecords)
330331

331-
val list = mutableListOf<SourceRecord>()
332-
333332
val expected = mapOf(
334333
"id" to "ROOT_ID",
335334
"data" to mapOf(
@@ -343,9 +342,49 @@ class Neo4jSourceTaskTest {
343342
)
344343

345344
Assert.assertEventually(ThrowingSupplier {
346-
task.poll()?.let { list.addAll(it) }
347-
val actualList = list.map { (it.value() as Struct).toMap() }
348-
actualList.first() == expected
349-
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
345+
task.poll()?.map { (it.value() as Struct).toMap() }?.first()
346+
}, Matchers.equalTo(expected), 30, TimeUnit.SECONDS)
347+
}
348+
349+
/**
 * Starts the task with `ENFORCE_SCHEMA` set to "true" and a query whose result contains `null`
 * at the top level of a map, nested inside a map and inside a list, then waits (up to 30 seconds)
 * for the first polled record to equal the expected structure with those nulls preserved.
 */
@Test
fun `should support null values returned from query`() {
    val sourceQuery = """
        |RETURN {
        | prop1: 1,
        | prop2: "string",
        | prop3: true,
        | prop4: null,
        | prop5: {
        | prop: null
        | },
        | prop6: [1],
        | prop7: [null]
        |} AS data, 1717773205 AS timestamp
    """.trimMargin()

    // Same entries, in the same insertion order, as the step-by-step assignments used elsewhere in this class.
    val props = mutableMapOf<String, String>(
        Neo4jConnectorConfig.SERVER_URI to neo4j.boltUrl,
        Neo4jSourceConnectorConfig.TOPIC to UUID.randomUUID().toString(),
        Neo4jSourceConnectorConfig.STREAMING_POLL_INTERVAL to "1000",
        Neo4jSourceConnectorConfig.ENFORCE_SCHEMA to "true",
        Neo4jSourceConnectorConfig.SOURCE_TYPE_QUERY to sourceQuery,
        Neo4jConnectorConfig.AUTHENTICATION_TYPE to AuthenticationType.NONE.toString()
    )

    task.start(props)

    val expected = mapOf(
        "data" to mapOf(
            "prop1" to 1L,
            "prop2" to "string",
            "prop3" to true,
            "prop4" to null,
            "prop5" to mapOf("prop" to null),
            "prop6" to listOf(1L),
            "prop7" to listOf<Any?>(null)
        ), "timestamp" to 1717773205L
    )

    // poll() may return null before any record is available; the supplier then yields null and is retried.
    Assert.assertEventually(ThrowingSupplier {
        task.poll()?.map { (it.value() as Struct).toMap() }?.first()
    }, Matchers.equalTo(expected), 30, TimeUnit.SECONDS)
}
351390
}

0 commit comments

Comments
 (0)