Skip to content

Commit bc1f73c

Browse files
authored
sink performance improvements (#369)
1 parent ccb21dc commit bc1f73c

File tree

50 files changed

+464
-362
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+464
-362
lines changed

common/src/main/kotlin/streams/config/StreamsConfig.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import java.io.FileInputStream
88
import java.io.FileNotFoundException
99
import java.util.Properties
1010
import java.util.concurrent.ConcurrentHashMap
11+
import java.util.concurrent.TimeUnit
1112

1213
data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementService) : LifecycleAdapter() {
1314

@@ -33,6 +34,7 @@ data class StreamsConfig(private val log: Log, private val dbms: DatabaseManagem
3334
const val CHECK_WRITEABLE_INSTANCE_INTERVAL = "streams.check.writeable.instance.interval"
3435
const val SYSTEM_DB_WAIT_TIMEOUT = "streams.systemdb.wait.timeout"
3536
const val SYSTEM_DB_WAIT_TIMEOUT_VALUE = 10000L
37+
const val POLL_INTERVAL = "streams.sink.poll.interval"
3638
private var afterInitListeners = mutableListOf<((MutableMap<String, String>) -> Unit)>()
3739

3840
fun registerListener(after: (MutableMap<String, String>) -> Unit) {

common/src/main/kotlin/streams/events/StreamsEvent.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,15 @@ data class Schema(val properties: Map<String, String> = emptyMap(),
5555

5656
open class StreamsEvent(open val payload: Any)
5757
data class StreamsTransactionEvent(val meta: Meta, override val payload: Payload, val schema: Schema): StreamsEvent(payload)
58+
59+
/**
 * Transaction event whose payload is restricted to a [NodePayload].
 *
 * Holds the same three components as [StreamsTransactionEvent] and can be
 * turned into one through [toStreamsTransactionEvent].
 */
data class StreamsTransactionNodeEvent(
    val meta: Meta,
    val payload: NodePayload,
    val schema: Schema
) {
    /** Builds a [StreamsTransactionEvent] from this event's meta, payload and schema. */
    fun toStreamsTransactionEvent(): StreamsTransactionEvent {
        return StreamsTransactionEvent(meta, payload, schema)
    }
}
64+
/**
 * Transaction event whose payload is restricted to a [RelationshipPayload].
 *
 * Holds the same three components as [StreamsTransactionEvent] and can be
 * turned into one through [toStreamsTransactionEvent].
 */
data class StreamsTransactionRelationshipEvent(
    val meta: Meta,
    val payload: RelationshipPayload,
    val schema: Schema
) {
    /** Builds a [StreamsTransactionEvent] from this event's meta, payload and schema. */
    fun toStreamsTransactionEvent(): StreamsTransactionEvent {
        return StreamsTransactionEvent(meta, payload, schema)
    }
}
69+

common/src/main/kotlin/streams/extensions/CommonExtensions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
99
import org.apache.kafka.clients.consumer.OffsetAndMetadata
1010
import org.apache.kafka.common.TopicPartition
1111
import org.neo4j.graphdb.Node
12-
import streams.serialization.JSONUtils
12+
import streams.utils.JSONUtils
1313
import streams.service.StreamsSinkEntity
1414
import java.nio.ByteBuffer
1515
import java.util.*
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package streams.extensions
2+
3+
import kotlinx.coroutines.Deferred
4+
import kotlinx.coroutines.ExperimentalCoroutinesApi
5+
import kotlinx.coroutines.ObsoleteCoroutinesApi
6+
import kotlinx.coroutines.channels.ticker
7+
import kotlinx.coroutines.selects.whileSelect
8+
import java.util.concurrent.CopyOnWriteArraySet
9+
import java.util.concurrent.TimeoutException
10+
11+
12+
// taken from https://stackoverflow.com/questions/52192752/kotlin-how-to-run-n-coroutines-and-wait-for-first-m-results-or-timeout
/**
 * Awaits every [Deferred] in this list, giving up once the timeout ticker fires.
 *
 * Values are appended as each deferred completes, so the returned list is in
 * completion order, which may differ from the order of the receiver list.
 *
 * @param timeoutMs delay, in milliseconds, passed to the timeout ticker
 * @return the values produced by the deferreds, in completion order; an empty
 *         list when the receiver is empty
 * @throws TimeoutException if the ticker fires before all deferreds have
 *         completed; the deferreds still pending at that point are cancelled first
 */
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
suspend fun <T> List<Deferred<T>>.awaitAll(timeoutMs: Long): List<T> {
    // Nothing to wait for. Without this guard the select below would register only
    // the timeout clause and always end in a TimeoutException after timeoutMs.
    if (isEmpty()) return emptyList()

    val jobs = CopyOnWriteArraySet<Deferred<T>>(this)
    val result = ArrayList<T>(size)
    val timeout = ticker(timeoutMs)

    try {
        whileSelect {
            jobs.forEach { deferred ->
                deferred.onAwait {
                    jobs.remove(deferred)
                    result.add(it)
                    // keep selecting until every deferred has delivered its value
                    result.size != size
                }
            }

            timeout.onReceive {
                jobs.forEach { it.cancel() }
                throw TimeoutException("Tasks $size cancelled after timeout of $timeoutMs ms.")
            }
        }
    } finally {
        // A ticker channel keeps producing until it is cancelled; release it on
        // every exit path (success, timeout, or a failing deferred).
        timeout.cancel()
    }

    return result
}
37+
38+
/**
 * Reports the failure state of this [Deferred] without suspending.
 *
 * @return the completion exception when the deferred has completed (`null` if it
 *         completed normally); a `CancellationException` when it has been cancelled
 *         but has not finished completing; a [RuntimeException] when it is still
 *         active; `null` otherwise
 */
@ExperimentalCoroutinesApi
fun <T> Deferred<T>.errors() = when {
    isCompleted -> getCompletionExceptionOrNull()
    // Reached only while cancellation is still in progress (cancelled, not yet
    // completed). getCompletionExceptionOrNull() throws IllegalStateException on a
    // deferred that has not completed, so report the cancellation directly.
    isCancelled -> java.util.concurrent.CancellationException("Job $this is being cancelled")
    isActive -> RuntimeException("Job $this still active")
    else -> null
}

common/src/main/kotlin/streams/service/errors/ErrorService.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import org.apache.avro.generic.GenericRecord
44
import org.apache.kafka.clients.consumer.ConsumerRecord
55
import org.apache.kafka.common.record.RecordBatch
66
import streams.extensions.toMap
7-
import streams.serialization.JSONUtils
7+
import streams.utils.JSONUtils
88
import java.util.*
99

1010

common/src/main/kotlin/streams/service/sink/strategy/CUDIngestionStrategy.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package streams.service.sink.strategy
22

33
import streams.events.EntityType
44
import streams.extensions.quote
5-
import streams.serialization.JSONUtils
5+
import streams.utils.JSONUtils
66
import streams.service.StreamsSinkEntity
77
import streams.service.sink.strategy.CUDIngestionStrategy.Companion.FROM_KEY
88
import streams.service.sink.strategy.CUDIngestionStrategy.Companion.TO_KEY

common/src/main/kotlin/streams/service/sink/strategy/NodePatternIngestionStrategy.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package streams.service.sink.strategy
22

33
import streams.extensions.flatten
4-
import streams.serialization.JSONUtils
4+
import streams.utils.JSONUtils
55
import streams.service.StreamsSinkEntity
66
import streams.utils.IngestionUtils.containsProp
77
import streams.utils.IngestionUtils.getLabelsAsString

common/src/main/kotlin/streams/service/sink/strategy/RelationshipPatternIngestionStrategy.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package streams.service.sink.strategy
22

33
import streams.extensions.flatten
4-
import streams.serialization.JSONUtils
4+
import streams.utils.JSONUtils
55
import streams.service.StreamsSinkEntity
66
import streams.utils.IngestionUtils.containsProp
77
import streams.utils.IngestionUtils.getLabelsAsString

common/src/main/kotlin/streams/service/sink/strategy/SourceIdIngestionStrategy.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package streams.service.sink.strategy
22

33
import streams.events.*
44
import streams.extensions.quote
5-
import streams.serialization.JSONUtils
65
import streams.service.StreamsSinkEntity
76
import streams.utils.IngestionUtils.getLabelsAsString
87
import streams.utils.SchemaUtils

common/src/main/kotlin/streams/utils/JSONUtils.kt

Lines changed: 42 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package streams.serialization
1+
package streams.utils
22

33
import com.fasterxml.jackson.core.JsonGenerator
44
import com.fasterxml.jackson.core.JsonParseException
@@ -12,7 +12,6 @@ import org.neo4j.driver.internal.value.PointValue
1212
import org.neo4j.graphdb.spatial.Point
1313
import org.neo4j.values.storable.CoordinateReferenceSystem
1414
import streams.events.*
15-
import streams.utils.StreamsUtils
1615
import java.io.IOException
1716
import java.time.temporal.TemporalAccessor
1817
import kotlin.reflect.full.isSubclassOf
@@ -35,8 +34,7 @@ fun Point.toStreamsPoint(): StreamsPoint {
3534

3635
fun PointValue.toStreamsPoint(): StreamsPoint {
3736
val point = this.asPoint()
38-
val crsType = point.srid()
39-
return when (crsType) {
37+
return when (val crsType = point.srid()) {
4038
CoordinateReferenceSystem.Cartesian.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian.name, point.x(), point.y())
4139
CoordinateReferenceSystem.Cartesian_3D.code -> StreamsPointCartesian(CoordinateReferenceSystem.Cartesian_3D.name, point.x(), point.y(), point.z())
4240
CoordinateReferenceSystem.WGS84.code -> StreamsPointWgs(CoordinateReferenceSystem.WGS84.name, point.x(), point.y())
@@ -82,20 +80,22 @@ class TemporalAccessorSerializer : JsonSerializer<TemporalAccessor>() {
8280
object JSONUtils {
8381

8482
private val OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
83+
private val STRICT_OBJECT_MAPPER: ObjectMapper = jacksonObjectMapper()
8584

8685
// Builds the "Neo4jKafkaSerializer" module (Point, PointValue and TemporalAccessor
// serializers) and registers it on the object mappers. Only OBJECT_MAPPER has
// FAIL_ON_EMPTY_BEANS and FAIL_ON_UNKNOWN_PROPERTIES disabled; in the added
// revision STRICT_OBJECT_MAPPER receives the module but keeps those features as-is.
init {
8786
val module = SimpleModule("Neo4jKafkaSerializer")
88-
StreamsUtils.ignoreExceptions({ module.addSerializer(Point::class.java, PointSerializer()) }, NoClassDefFoundError::class.java, UnsupportedClassVersionError::class.java) // in case is loaded from
89-
StreamsUtils.ignoreExceptions({ module.addSerializer(PointValue::class.java, PointValueSerializer()) }, NoClassDefFoundError::class.java, UnsupportedClassVersionError::class.java) // in case is loaded from
87+
StreamsUtils.ignoreExceptions({ module.addSerializer(Point::class.java, PointSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
88+
StreamsUtils.ignoreExceptions({ module.addSerializer(PointValue::class.java, PointValueSerializer()) }, NoClassDefFoundError::class.java) // in case is loaded from
9089
module.addSerializer(TemporalAccessor::class.java, TemporalAccessorSerializer())
9190
OBJECT_MAPPER.registerModule(module)
9291
OBJECT_MAPPER.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
9392
OBJECT_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
93+
STRICT_OBJECT_MAPPER.registerModule(module)
9494
}
9595

96-
fun getObjectMapper(): ObjectMapper {
97-
return OBJECT_MAPPER
98-
}
96+
/** Returns the shared [OBJECT_MAPPER] instance. */
fun getObjectMapper(): ObjectMapper = OBJECT_MAPPER
97+
98+
/** Returns the shared [STRICT_OBJECT_MAPPER] instance. */
fun getStrictObjectMapper(): ObjectMapper = STRICT_OBJECT_MAPPER
9999

100100
fun asMap(any: Any): Map<String, Any?> {
101101
return OBJECT_MAPPER.convertValue(any, Map::class.java)
@@ -114,60 +114,52 @@ object JSONUtils {
114114
return getObjectMapper().readValue(value, T::class.java)
115115
}
116116

117-
inline fun <reified T> readValue(value: Any, stringWhenFailure:Boolean = false): T {
118-
val strValue = when (value) {
119-
is String -> value
120-
is ByteArray -> String(value)
121-
else -> getObjectMapper().writeValueAsString(value)
122-
}
117+
/**
 * Deserializes [value] into [T] with [objectMapper] (added revision of this hunk).
 *
 * A String or ByteArray is parsed with `objectMapper.readValue`; any other value
 * goes through `objectMapper.convertValue`.
 *
 * When a [JsonParseException] is raised, [stringWhenFailure] is true and String is
 * a subclass of [T], the text form of [value] is returned with leading whitespace
 * trimmed, unless that text starts with `{` or `[`, in which case the exception is
 * rethrown. In every other case the exception is rethrown.
 *
 * @param value the input to deserialize
 * @param stringWhenFailure whether to fall back to the raw text on a parse failure
 * @param objectMapper the mapper to use; defaults to [getObjectMapper]
 */
inline fun <reified T> readValue(value: Any,
118+
stringWhenFailure: Boolean = false,
119+
objectMapper: ObjectMapper = getObjectMapper()): T {
123120
return try {
124-
getObjectMapper().readValue(strValue)
125-
} catch(e: JsonParseException) {
121+
when (value) {
122+
is String -> objectMapper.readValue(value)
123+
is ByteArray -> objectMapper.readValue(value)
124+
else -> objectMapper.convertValue(value)
125+
}
126+
} catch (e: JsonParseException) {
126127
if (stringWhenFailure && String::class.isSubclassOf(T::class)) {
128+
val strValue = when (value) {
129+
is ByteArray -> String(value)
130+
// NOTE(review): value is declared as non-null Any, so this branch looks unreachable
130+
null -> ""
131+
else -> value.toString()
132+
}
127133
strValue.trimStart().let {
128-
if (it.get(0) == '{' || it.get(0) == '[') throw e
// NOTE(review): it[0] would fail on empty/blank text; confirm such input cannot reach this point
134+
if (it[0] == '{' || it[0] == '[') throw e
129135
else it as T
130136
}
131137
}
132138
else throw e
133139
}
134140
}
135141

136-
inline fun <reified T> convertValue(value: Any): T {
137-
return getObjectMapper().convertValue(value)
142+
/**
 * Converts [value] into [T] by delegating to `objectMapper.convertValue`.
 *
 * @param value the object to convert
 * @param objectMapper the mapper to use; defaults to [getObjectMapper]
 */
inline fun <reified T> convertValue(value: Any, objectMapper: ObjectMapper = getObjectMapper()): T {
143+
return objectMapper.convertValue(value)
138144
}
139145

140-
@Suppress("UNCHECKED_CAST")
141146
/**
 * Converts [obj] into a [StreamsTransactionEvent].
 *
 * Added revision of this hunk: [obj] is first read as a
 * [StreamsTransactionNodeEvent] with STRICT_OBJECT_MAPPER (String/ByteArray via
 * readValue, anything else via convertValue). If that raises any Exception, the
 * same input is read as a [StreamsTransactionRelationshipEvent]. The successful
 * result is turned into a [StreamsTransactionEvent] via toStreamsTransactionEvent().
 *
 * Removed revision: returned [obj] unchanged when it already was a
 * [StreamsTransactionEvent], otherwise built meta, schema and payload by hand
 * from a map, choosing the payload class from the "type" entry.
 *
 * NOTE(review): the added revision has no pass-through for an [obj] that is
 * already a [StreamsTransactionEvent]; confirm callers do not rely on it.
 */
fun asStreamsTransactionEvent(obj: Any): StreamsTransactionEvent {
142-
if (obj is StreamsTransactionEvent) {
143-
return obj
144-
}
145-
val value = when (obj) {
146-
is Map<*, *> -> obj as Map<String, Map<String, Any>>
147-
is String -> readValue(obj)
148-
is ByteArray -> readValue(obj)
149-
else -> convertValue(obj)
150-
}
151-
val meta = convertValue<Meta>(value.getValue("meta"))
152-
153-
val schema = convertValue<Schema>(value.getValue("schema"))
154-
155-
val payloadMap = value.getValue("payload")
156-
val type = payloadMap.getValue("type").toString()
157-
val id = payloadMap.getValue("id").toString()
158-
159-
val payload = if (type == "node") {
160-
val before = if (payloadMap["before"] != null) convertValue<NodeChange>(payloadMap["before"]!!) else null
161-
val after = if (payloadMap["after"] != null) convertValue<NodeChange>(payloadMap["after"]!!) else null
162-
NodePayload(id, before, after)
163-
} else {
164-
val label= payloadMap.getValue("label").toString()
165-
val start = convertValue<RelationshipNodeChange>(payloadMap.getValue("start"))
166-
val end = convertValue<RelationshipNodeChange>(payloadMap.getValue("end"))
167-
val before = if (payloadMap["before"] != null) convertValue<RelationshipChange>(payloadMap["before"]!!) else null
168-
val after = if (payloadMap["after"] != null) convertValue<RelationshipChange>(payloadMap["after"]!!) else null
169-
RelationshipPayload(id, start, end, before, after, label)
147+
return try {
148+
val evt = when (obj) {
149+
is String, is ByteArray -> readValue<StreamsTransactionNodeEvent>(value = obj,
150+
objectMapper = STRICT_OBJECT_MAPPER)
151+
else -> convertValue<StreamsTransactionNodeEvent>(value = obj,
152+
objectMapper = STRICT_OBJECT_MAPPER)
153+
}
154+
evt.toStreamsTransactionEvent()
// NOTE(review): every Exception from the node attempt is discarded here, so a
// failure unrelated to the node/relationship distinction surfaces only as the
// error raised by the relationship attempt below.
155+
} catch (e: Exception) {
156+
val evt = when (obj) {
157+
is String, is ByteArray -> readValue<StreamsTransactionRelationshipEvent>(value = obj,
158+
objectMapper = STRICT_OBJECT_MAPPER)
159+
else -> convertValue<StreamsTransactionRelationshipEvent>(value = obj,
160+
objectMapper = STRICT_OBJECT_MAPPER)
161+
}
162+
evt.toStreamsTransactionEvent()
170163
}
171-
return StreamsTransactionEvent(meta, payload, schema)
172164
}
173165
}

0 commit comments

Comments
 (0)