Commit 4cd8aee

Feature/add flush policies (#122)
Add Flush Policies, with two concrete implementations: CountBasedFlushPolicy and FrequencyFlushPolicy.
1 parent 7fc55c4 commit 4cd8aee

14 files changed: +443 -77

core/build.gradle

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@ test {
 dependencies {
     // MAIN DEPS
     api 'com.segment:sovran-kotlin:1.2.1'
-    api "org.jetbrains.kotlinx:kotlinx-serialization-json:1.2.2"
+    api "org.jetbrains.kotlinx:kotlinx-serialization-json:1.4.1"
     implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0'

     // TESTING

core/src/main/java/com/segment/analytics/kotlin/core/Configuration.kt

Lines changed: 2 additions & 0 deletions
@@ -2,6 +2,7 @@ package com.segment.analytics.kotlin.core

 import com.segment.analytics.kotlin.core.Constants.DEFAULT_API_HOST
 import com.segment.analytics.kotlin.core.Constants.DEFAULT_CDN_HOST
+import com.segment.analytics.kotlin.core.platform.policies.FlushPolicy
 import com.segment.analytics.kotlin.core.utilities.ConcreteStorageProvider
 import kotlinx.coroutines.*
 import sovran.kotlin.Store
@@ -31,6 +32,7 @@ data class Configuration(
     var trackDeepLinks: Boolean = false,
     var flushAt: Int = 20,
     var flushInterval: Int = 30,
+    var flushPolicies: List<FlushPolicy> = emptyList<FlushPolicy>(),
     val defaultSettings: Settings = Settings(),
     var autoAddSegmentDestination: Boolean = true,
     var apiHost: String = DEFAULT_API_HOST,
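
The new flushPolicies field lets callers replace the legacy flushAt / flushInterval knobs with their own policies. A minimal sketch of opting in is below; the CountBasedFlushPolicy and FrequencyFlushPolicy constructor arguments mirror how SegmentDestination builds them later in this diff, while the writeKey parameter and the Analytics(Configuration) entry point are assumed from the library's existing public API.

// Sketch only: assumes the existing Analytics(Configuration) entry point and writeKey field.
import com.segment.analytics.kotlin.core.Analytics
import com.segment.analytics.kotlin.core.Configuration
import com.segment.analytics.kotlin.core.platform.policies.CountBasedFlushPolicy
import com.segment.analytics.kotlin.core.platform.policies.FrequencyFlushPolicy

val analytics = Analytics(
    Configuration(
        writeKey = "YOUR_WRITE_KEY",
        flushPolicies = listOf(
            CountBasedFlushPolicy(10),    // flush once 10 events are queued
            FrequencyFlushPolicy(60_000L) // flush every 60 seconds (interval is in milliseconds)
        )
    )
)

When flushPolicies is left empty, behaviour is unchanged: SegmentDestination converts flushAt and flushInterval into the two built-in policies (see below).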

core/src/main/java/com/segment/analytics/kotlin/core/Events.kt

Lines changed: 18 additions & 3 deletions
@@ -1,17 +1,17 @@
 package com.segment.analytics.kotlin.core

+import kotlinx.serialization.DeserializationStrategy
 import kotlinx.serialization.KSerializer
 import kotlinx.serialization.SerialName
 import kotlinx.serialization.Serializable
 import kotlinx.serialization.descriptors.PrimitiveKind
 import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
 import kotlinx.serialization.encoding.Decoder
 import kotlinx.serialization.encoding.Encoder
-import kotlinx.serialization.json.JsonArray
-import kotlinx.serialization.json.JsonObject
+import kotlinx.serialization.json.*
 import sovran.kotlin.Store
 import java.time.Instant
-import java.util.UUID
+import java.util.*

 typealias AnalyticsContext = JsonObject
 typealias Integrations = JsonObject
@@ -66,6 +66,8 @@ enum class EventType {
  * @see GroupEvent
  * @see AliasEvent
  */
+
+@Serializable(with = BaseEventSerializer::class)
 sealed class BaseEvent {
     // The type of event
     abstract var type: EventType
@@ -351,3 +353,16 @@ data class ScreenEvent(
         return result
     }
 }
+
+object BaseEventSerializer : JsonContentPolymorphicSerializer<BaseEvent>(BaseEvent::class) {
+    override fun selectDeserializer(element: JsonElement): DeserializationStrategy<out BaseEvent> {
+        return when (element.jsonObject["type"]?.jsonPrimitive?.content) {
+            "track" -> TrackEvent.serializer()
+            "screen" -> ScreenEvent.serializer()
+            "alias" -> AliasEvent.serializer()
+            "identify" -> IdentifyEvent.serializer()
+            "group" -> GroupEvent.serializer()
+            else -> throw Exception("Unknown Event: key 'type' not found or does not matches any event type")
+        }
+    }
+}
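
BaseEventSerializer uses kotlinx.serialization's JsonContentPolymorphicSerializer to pick the concrete event serializer from the raw JSON "type" field. A self-contained sketch of the same pattern is below; the Message/TrackMessage/ScreenMessage types are hypothetical stand-ins for illustration, not library types.

// Stand-in types illustrating the content-based dispatch used by BaseEventSerializer.
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.*

@Serializable(with = MessageSerializer::class)
sealed class Message

@Serializable
data class TrackMessage(val type: String = "track", val event: String) : Message()

@Serializable
data class ScreenMessage(val type: String = "screen", val name: String) : Message()

object MessageSerializer : JsonContentPolymorphicSerializer<Message>(Message::class) {
    // Peek at the "type" field and delegate to the matching generated serializer.
    override fun selectDeserializer(element: JsonElement): DeserializationStrategy<out Message> =
        when (element.jsonObject["type"]?.jsonPrimitive?.content) {
            "track" -> TrackMessage.serializer()
            "screen" -> ScreenMessage.serializer()
            else -> throw IllegalArgumentException("Unknown message type")
        }
}

fun main() {
    val decoded: Message = Json.decodeFromString("""{"type":"track","event":"Item Purchased"}""")
    println(decoded) // TrackMessage(type=track, event=Item Purchased)
}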

core/src/main/java/com/segment/analytics/kotlin/core/platform/EventPipeline.kt

Lines changed: 44 additions & 28 deletions
@@ -1,31 +1,36 @@
 package com.segment.analytics.kotlin.core.platform

 import com.segment.analytics.kotlin.core.*
-import com.segment.analytics.kotlin.core.platform.plugins.logger.*
-import kotlinx.coroutines.*
+import com.segment.analytics.kotlin.core.platform.plugins.logger.LogFilterKind
+import com.segment.analytics.kotlin.core.platform.plugins.logger.log
+import com.segment.analytics.kotlin.core.platform.plugins.logger.segmentLog
+import com.segment.analytics.kotlin.core.platform.policies.FlushPolicy
+import com.segment.analytics.kotlin.core.utilities.EncodeDefaultsJson
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
 import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import kotlinx.serialization.encodeToString
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.json.encodeToJsonElement
+import kotlinx.serialization.json.jsonObject
+import kotlinx.serialization.json.jsonPrimitive
 import java.io.File
 import java.io.FileInputStream
-import java.lang.Exception
-import java.util.concurrent.atomic.AtomicInteger

 internal class EventPipeline(
     private val analytics: Analytics,
     private val logTag: String,
     apiKey: String,
-    private val flushCount: Int = 20,
-    private val flushIntervalInMillis: Long = 30_000, // 30s
+    private val flushPolicies: List<FlushPolicy>,
     var apiHost: String = Constants.DEFAULT_API_HOST
 ) {

-    private val writeChannel: Channel<String>
+    private val writeChannel: Channel<BaseEvent>

     private val uploadChannel: Channel<String>

-    private val eventCount: AtomicInteger = AtomicInteger(0)
-
     private val httpClient: HTTPClient = HTTPClient(apiKey)

     private val storage get() = analytics.storage
@@ -37,7 +42,7 @@ internal class EventPipeline(

     companion object {
         internal const val FLUSH_POISON = "#!flush"
-
+        internal val FLUSH_EVENT = ScreenEvent(FLUSH_POISON, FLUSH_POISON, emptyJsonObject).apply { messageId = FLUSH_POISON }
         internal const val UPLOAD_SIG = "#!upload"
     }

@@ -50,12 +55,12 @@
         registerShutdownHook()
     }

-    fun put(event: String) {
+    fun put(event: BaseEvent) {
         writeChannel.trySend(event)
     }

     fun flush() {
-        writeChannel.trySend(FLUSH_POISON)
+        writeChannel.trySend(FLUSH_EVENT)
     }

     fun start() {
@@ -68,32 +73,47 @@
     fun stop() {
         uploadChannel.cancel()
         writeChannel.cancel()
+        unschedule()
         running = false
     }

+    fun stringifyBaseEvent(payload: BaseEvent): String {
+        val finalPayload = EncodeDefaultsJson.encodeToJsonElement(payload)
+            .jsonObject.filterNot { (k, v) ->
+                // filter out empty userId and traits values
+                (k == "userId" && v.jsonPrimitive.content.isBlank()) || (k == "traits" && v == emptyJsonObject)
+            }
+
+        val stringVal = Json.encodeToString(finalPayload)
+        return stringVal
+    }
+
     private fun write() = scope.launch(analytics.fileIODispatcher) {
         for (event in writeChannel) {
             // write to storage
-            val isPoison = (event == FLUSH_POISON)
+            val isPoison = (event.messageId == FLUSH_POISON)
             if (!isPoison) try {
-                storage.write(Storage.Constants.Events, event)
+                val stringVal = stringifyBaseEvent(event)
+                analytics.log("$logTag running $stringVal")
+                storage.write(Storage.Constants.Events, stringVal)
+
+                flushPolicies.forEach { flushPolicy -> flushPolicy.updateState(event) }
             }
             catch (e : Exception) {
                 Analytics.segmentLog("Error adding payload: $event", kind = LogFilterKind.ERROR)
            }

            // if flush condition met, generate paths
-            if (eventCount.incrementAndGet() >= flushCount || isPoison) {
-                eventCount.set(0)
+            if (isPoison || flushPolicies.any { it.shouldFlush() }) {
                uploadChannel.trySend(UPLOAD_SIG)
+                flushPolicies.forEach { it.reset() }
            }
        }
    }

    private fun upload() = scope.launch(analytics.networkIODispatcher) {
        uploadChannel.consumeEach {
            analytics.log("$logTag performing flush")
-
            withContext(analytics.fileIODispatcher) {
                storage.rollover()
            }
@@ -130,20 +150,16 @@
        }
    }

-    private fun schedule() = scope.launch(analytics.fileIODispatcher) {
-        if (flushIntervalInMillis > 0) {
-            while (isActive && running) {
-                flush()
+    private fun schedule() {
+        flushPolicies.forEach { it.schedule(analytics) }
+    }

-                // use delay to do periodical task
-                // this is doable in coroutine, since delay only suspends, allowing thread to
-                // do other work and then come back. see:
-                // https://github.com/Kotlin/kotlinx.coroutines/issues/1632#issuecomment-545693827
-                delay(flushIntervalInMillis)
-            }
-        }
+    private fun unschedule() {
+        flushPolicies.forEach { it.unschedule() }
    }

+
+
    private fun handleUploadException(e: Exception, file: File): Boolean {
        var shouldCleanup = false
        if (e is HTTPException) {
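
The JSON stringification that used to live in SegmentDestination.enqueue() now sits in stringifyBaseEvent(), right before the payload is written to storage. The snippet below is a self-contained demo of just the filtering step (blank userId and empty traits entries are dropped); it substitutes buildJsonObject { } for the library's emptyJsonObject so it runs on kotlinx-serialization alone.

// Demo of the userId/traits filtering performed by stringifyBaseEvent().
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.*

fun main() {
    val payload = buildJsonObject {
        put("type", "track")
        put("event", "Item Purchased")
        put("userId", "")                  // blank -> filtered out
        put("traits", buildJsonObject { }) // empty -> filtered out
        put("anonymousId", "anon-123")
    }

    val filtered = payload.filterNot { (k, v) ->
        (k == "userId" && v.jsonPrimitive.content.isBlank()) ||
            (k == "traits" && v == buildJsonObject { })
    }

    println(Json.encodeToString(JsonObject(filtered)))
    // {"type":"track","event":"Item Purchased","anonymousId":"anon-123"}
}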

core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/SegmentDestination.kt

Lines changed: 18 additions & 23 deletions
@@ -1,19 +1,14 @@
 package com.segment.analytics.kotlin.core.platform.plugins

 import com.segment.analytics.kotlin.core.*
-import com.segment.analytics.kotlin.core.Constants.DEFAULT_API_HOST
 import com.segment.analytics.kotlin.core.platform.DestinationPlugin
 import com.segment.analytics.kotlin.core.platform.EventPipeline
 import com.segment.analytics.kotlin.core.platform.Plugin
 import com.segment.analytics.kotlin.core.platform.VersionedPlugin
-import com.segment.analytics.kotlin.core.platform.plugins.logger.log
-import com.segment.analytics.kotlin.core.utilities.EncodeDefaultsJson
+import com.segment.analytics.kotlin.core.platform.policies.CountBasedFlushPolicy
+import com.segment.analytics.kotlin.core.platform.policies.FlushPolicy
+import com.segment.analytics.kotlin.core.platform.policies.FrequencyFlushPolicy
 import kotlinx.serialization.Serializable
-import kotlinx.serialization.encodeToString
-import kotlinx.serialization.json.Json
-import kotlinx.serialization.json.encodeToJsonElement
-import kotlinx.serialization.json.jsonObject
-import kotlinx.serialization.json.jsonPrimitive

 @Serializable
 data class SegmentSettings(
@@ -28,10 +23,10 @@ data class SegmentSettings(
 * - We store events into a file with the batch api format (@link {https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#batch})
 * - We upload events on a dedicated thread using the batch api
 */
-class SegmentDestination : DestinationPlugin(), VersionedPlugin {
+class SegmentDestination: DestinationPlugin(), VersionedPlugin {

     private lateinit var pipeline: EventPipeline
-
+    var flushPolicies: List<FlushPolicy> = emptyList()
     override val key: String = "Segment.io"

     override fun track(payload: TrackEvent): BaseEvent {
@@ -59,23 +54,24 @@ class SegmentDestination : DestinationPlugin(), VersionedPlugin {
         return payload
     }

-    private inline fun <reified T : BaseEvent> enqueue(payload: T) {
-        // needs to be inline reified for encoding using Json
-        val finalPayload = EncodeDefaultsJson.encodeToJsonElement(payload)
-            .jsonObject.filterNot { (k, v) ->
-                // filter out empty userId and traits values
-                (k == "userId" && v.jsonPrimitive.content.isBlank()) || (k == "traits" && v == emptyJsonObject)
-            }
-
-        val stringVal = Json.encodeToString(finalPayload)
-        analytics.log("$key running $stringVal")

-        pipeline.put(stringVal)
+    private fun enqueue(payload: BaseEvent) {
+        pipeline.put(payload)
     }

     override fun setup(analytics: Analytics) {
         super.setup(analytics)

+        // convert flushAt and flushIntervals into FlushPolicies
+        flushPolicies = if (analytics.configuration.flushPolicies.isEmpty()) {
+            listOf(
+                CountBasedFlushPolicy(analytics.configuration.flushAt),
+                FrequencyFlushPolicy(analytics.configuration.flushInterval * 1000L)
+            )
+        } else {
+            analytics.configuration.flushPolicies
+        }
+
         // Add DestinationMetadata enrichment plugin
         add(DestinationMetadataPlugin())

@@ -84,8 +80,7 @@ class SegmentDestination : DestinationPlugin(), VersionedPlugin {
             analytics,
             key,
             configuration.writeKey,
-            configuration.flushAt,
-            configuration.flushInterval * 1000L,
+            flushPolicies,
             configuration.apiHost
         )
         pipeline.start()
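
Backward compatibility note: when flushPolicies is left empty, the setup() above converts the legacy knobs into the two built-in policies. With the Configuration defaults shown earlier (flushAt = 20, flushInterval = 30), the fallback is equivalent to:

// Equivalent of the fallback branch, using the Configuration defaults.
import com.segment.analytics.kotlin.core.platform.policies.CountBasedFlushPolicy
import com.segment.analytics.kotlin.core.platform.policies.FlushPolicy
import com.segment.analytics.kotlin.core.platform.policies.FrequencyFlushPolicy

val defaultPolicies: List<FlushPolicy> = listOf(
    CountBasedFlushPolicy(20),       // configuration.flushAt
    FrequencyFlushPolicy(30 * 1000L) // configuration.flushInterval, converted to milliseconds
)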

core/src/main/java/com/segment/analytics/kotlin/core/platform/policies/CountBasedFlushPolicy.kt

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+package com.segment.analytics.kotlin.core.platform.policies
+
+import com.segment.analytics.kotlin.core.BaseEvent
+
+/**
+ * A Count based Flush Policy that instructs the EventPipeline to flush at the
+ * given @param[flushAt]. The default value is 20. @param[flushAt] values should
+ * be >= 1 or they'll get the default value.
+ */
+class CountBasedFlushPolicy(var flushAt: Int = 20): FlushPolicy {
+
+
+    init {
+        // Make sure to only take valid counts or fallback to our default.
+        flushAt = when {
+            flushAt >= 1 -> flushAt
+            else -> 20
+        }
+    }
+
+    private var count: Int = 0
+
+    override fun shouldFlush(): Boolean {
+        return count >= flushAt
+    }
+
+    override fun updateState(event: BaseEvent) {
+        count++
+    }
+
+    override fun reset() {
+        count = 0
+    }
+}
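
A quick, hedged exercise of the new policy outside the pipeline: the three-argument ScreenEvent constructor and emptyJsonObject are borrowed from the FLUSH_EVENT construction in EventPipeline above, and any BaseEvent would do, since the policy only counts updateState() calls.

// Sketch: drive CountBasedFlushPolicy by hand, the way the write() loop does.
import com.segment.analytics.kotlin.core.ScreenEvent
import com.segment.analytics.kotlin.core.emptyJsonObject
import com.segment.analytics.kotlin.core.platform.policies.CountBasedFlushPolicy

fun main() {
    val policy = CountBasedFlushPolicy(flushAt = 3)
    val event = ScreenEvent("Home", "Main", emptyJsonObject)

    repeat(3) { policy.updateState(event) }
    println(policy.shouldFlush()) // true -> the pipeline would send UPLOAD_SIG

    policy.reset()                // the pipeline resets every policy after a flush
    println(policy.shouldFlush()) // false
}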

core/src/main/java/com/segment/analytics/kotlin/core/platform/policies/FlushPolicy.kt

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+package com.segment.analytics.kotlin.core.platform.policies
+
+import com.segment.analytics.kotlin.core.Analytics
+import com.segment.analytics.kotlin.core.BaseEvent
+import kotlinx.coroutines.CoroutineScope
+
+interface FlushPolicy {
+
+    /**
+     * Called when the policy becomes active. We should start any periodic flushing
+     * we want here.
+     */
+    fun schedule(analytics: Analytics) = Unit
+
+    /**
+     * Called when policy should stop running any scheduled flushes
+     */
+    fun unschedule() = Unit
+
+    /**
+     * Called to check whether or not the events should be flushed.
+     */
+    fun shouldFlush(): Boolean
+
+    /**
+     * Called as events are added to the timeline and JSON Stringified.
+     */
+    fun updateState(event: BaseEvent) = Unit
+
+    /**
+     * Called after the events are flushed.
+     */
+    fun reset() = Unit
+}
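
Every member except shouldFlush() ships with a default implementation, so a custom policy only overrides what it needs. Below is an illustrative sketch, not part of this commit, of a policy that requests a flush whenever a specific event name is tracked; the TrackEvent.event property it reads comes from the core events model, everything else is hypothetical.

// Illustrative only: flush as soon as one particular event is tracked.
import com.segment.analytics.kotlin.core.BaseEvent
import com.segment.analytics.kotlin.core.TrackEvent
import com.segment.analytics.kotlin.core.platform.policies.FlushPolicy

class EventNameFlushPolicy(private val eventName: String) : FlushPolicy {

    private var flushRequested = false

    override fun shouldFlush(): Boolean = flushRequested

    override fun updateState(event: BaseEvent) {
        // Trip the policy when the watched event flows through the pipeline.
        if (event is TrackEvent && event.event == eventName) {
            flushRequested = true
        }
    }

    override fun reset() {
        // The pipeline calls reset() after each upload.
        flushRequested = false
    }
}

Such a policy could be appended to Configuration.flushPolicies alongside the built-in count- and frequency-based ones.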
