Skip to content

Commit 962b43a

Browse files
authored
Startup Queue Logic (#10)
* initial changes * init StartupQueue.kt * refactor state and settings update logc * remove unwanted logs
1 parent 1b701e8 commit 962b43a

File tree

5 files changed

+111
-27
lines changed

5 files changed

+111
-27
lines changed

analytics-kotlin/src/main/java/com/segment/analytics/Analytics.kt

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.segment.analytics
33
import com.segment.analytics.platform.DestinationPlugin
44
import com.segment.analytics.platform.Plugin
55
import com.segment.analytics.platform.Timeline
6+
import com.segment.analytics.platform.plugins.StartupQueue
67
import com.segment.analytics.platform.plugins.log
78
import kotlinx.coroutines.CoroutineDispatcher
89
import kotlinx.coroutines.CoroutineScope
@@ -60,22 +61,10 @@ class Analytics(internal val configuration: Configuration) : Subscriber {
6061

6162
// subscribe to store after state is provided
6263
storage.subscribeToStore()
63-
64-
// check settings
65-
// subscribe to ensure plugins get settings updates
66-
it.subscribe(
67-
this,
68-
initialState = true,
69-
stateClazz = System::class,
70-
queue = processingDispatcher
71-
) { state: System ->
72-
state.settings?.let { settings ->
73-
timeline.applyClosure { plugin -> plugin.update(settings) }
74-
}
75-
}
7664
}
7765

7866
checkSettings()
67+
add(StartupQueue())
7968
if (configuration.autoAddSegmentDestination) {
8069
add(
8170
SegmentDestination(
@@ -362,7 +351,6 @@ class Analytics(internal val configuration: Configuration) : Subscriber {
362351
* @param plugin [Plugin] to be added
363352
*/
364353
fun add(plugin: Plugin): Analytics {
365-
// could be done in background thread
366354
this.timeline.add(plugin)
367355
if (plugin is DestinationPlugin && plugin.name != "Segment.io") {
368356
analyticsScope.launch(ioDispatcher) {

analytics-kotlin/src/main/java/com/segment/analytics/Settings.kt

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import kotlinx.serialization.decodeFromString
88
import kotlinx.serialization.json.Json
99
import kotlinx.serialization.json.JsonObject
1010
import java.io.BufferedReader
11-
import java.util.zip.GZIPInputStream
1211

1312
@Serializable
1413
data class Settings(
@@ -17,25 +16,42 @@ data class Settings(
1716
var edgeFunction: JsonObject = emptyJsonObject,
1817
)
1918

19+
internal fun Analytics.update(settings: Settings) {
20+
timeline.applyClosure { plugin ->
21+
// tell all top level plugins to update.
22+
// For destination plugins they auto-handle propagation to sub-plugins
23+
plugin.update(settings)
24+
}
25+
}
26+
2027
/**
2128
* Make analytics client call into Segment's settings API, to refresh certain configurations.
2229
*/
2330
fun Analytics.checkSettings() {
2431
val writeKey = configuration.writeKey
25-
val defaultSettings = configuration.defaultSettings
32+
33+
// stop things; queue in case our settings have changed.
34+
store.dispatch(System.ToggleRunningAction(running = false), System::class)
35+
2636
analyticsScope.launch(ioDispatcher) {
2737
log("Fetching settings on ${Thread.currentThread().name}")
28-
val settingsObj: Settings = try {
38+
val settingsObj: Settings? = try {
2939
val connection = HTTPClient().settings(writeKey)
3040
val settingsString =
3141
connection.inputStream?.bufferedReader()?.use(BufferedReader::readText) ?: ""
3242
log("Fetched Settings: $settingsString")
3343
Json { ignoreUnknownKeys = true }.decodeFromString(settingsString)
3444
} catch (ex: Exception) {
3545
log(message = "${ex.message}: failed to fetch settings", type = LogType.ERROR)
36-
defaultSettings
46+
null
47+
}
48+
settingsObj?.let {
49+
log("Dispatching update settings on ${Thread.currentThread().name}")
50+
store.dispatch(System.UpdateSettingsAction(settingsObj), System::class)
51+
update(settingsObj)
3752
}
38-
log("Dispatching update settings on ${Thread.currentThread().name}")
39-
store.dispatch(System.UpdateSettingsAction(settingsObj), System::class)
53+
54+
// we're good to go back to a running state.
55+
store.dispatch(System.ToggleRunningAction(running = true), System::class)
4056
}
4157
}

analytics-kotlin/src/main/java/com/segment/analytics/State.kt

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import kotlin.collections.set
1515
data class System(
1616
var configuration: Configuration = Configuration(""),
1717
var integrations: Integrations?,
18-
var settings: Settings?
18+
var settings: Settings?,
19+
var running: Boolean
1920
) : State {
2021

2122
companion object {
@@ -32,6 +33,7 @@ data class System(
3233
configuration = configuration,
3334
integrations = emptyJsonObject,
3435
settings = settings,
36+
running = false
3537
)
3638
}
3739
}
@@ -41,7 +43,8 @@ data class System(
4143
return System(
4244
state.configuration,
4345
state.integrations,
44-
settings
46+
settings,
47+
state.running
4548
)
4649
}
4750
}
@@ -58,7 +61,8 @@ data class System(
5861
return System(
5962
state.configuration,
6063
JsonObject(newIntegrations),
61-
state.settings
64+
state.settings,
65+
state.running
6266
)
6367
}
6468
return state
@@ -73,12 +77,24 @@ data class System(
7377
return System(
7478
state.configuration,
7579
JsonObject(newIntegrations),
76-
state.settings
80+
state.settings,
81+
state.running
7782
)
7883
}
7984
return state
8085
}
8186
}
87+
88+
class ToggleRunningAction(var running: Boolean): Action<System> {
89+
override fun reduce(state: System): System {
90+
return System(
91+
state.configuration,
92+
state.integrations,
93+
state.settings,
94+
running
95+
)
96+
}
97+
}
8298
}
8399

84100
data class UserInfo(
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.segment.analytics.platform.plugins
2+
3+
import com.segment.analytics.Analytics
4+
import com.segment.analytics.BaseEvent
5+
import com.segment.analytics.platform.Plugin
6+
import com.segment.analytics.System
7+
import sovran.kotlin.Subscriber
8+
import java.util.Queue
9+
import java.util.concurrent.ConcurrentLinkedQueue
10+
import java.util.concurrent.atomic.AtomicBoolean
11+
12+
class StartupQueue(): Plugin, Subscriber {
13+
override val type: Plugin.Type = Plugin.Type.Before
14+
override val name: String = "Segment_StartupQueue"
15+
override lateinit var analytics: Analytics
16+
17+
private val maxSize = 1000
18+
private val started: AtomicBoolean = AtomicBoolean(false)
19+
private val queuedEvents: Queue<BaseEvent> = ConcurrentLinkedQueue()
20+
21+
override fun setup(analytics: Analytics) {
22+
super.setup(analytics)
23+
analytics.store.subscribe(
24+
subscriber = this,
25+
stateClazz = System::class,
26+
initialState = true,
27+
handler = this::runningUpdate
28+
)
29+
}
30+
31+
override fun execute(event: BaseEvent): BaseEvent? {
32+
if (!started.get()) {
33+
analytics.log("$name queueing event", event = event)
34+
// timeline hasn't started, so queue it up.
35+
if (queuedEvents.size >= maxSize) {
36+
// if we've exceeded the max queue size start dropping events
37+
queuedEvents.remove()
38+
}
39+
queuedEvents.offer(event)
40+
return null
41+
}
42+
// the timeline has started, so let the event pass.
43+
return event
44+
}
45+
46+
// Handler to manage system update
47+
private fun runningUpdate(state: System) {
48+
analytics.log("Analytics starting = ${state.running}")
49+
started.set(state.running)
50+
if (started.get()) {
51+
replayEvents()
52+
}
53+
}
54+
55+
private fun replayEvents() {
56+
// replay the queued events to the instance of Analytics we're working with.
57+
for (event in queuedEvents) {
58+
analytics.process(event)
59+
}
60+
queuedEvents.clear()
61+
}
62+
}

analytics-kotlin/src/test/java/com/segment/analytics/main/StorageTests.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.segment.analytics.main
33
import android.content.Context
44
import android.content.SharedPreferences
55
import com.segment.analytics.*
6+
import com.segment.analytics.System
67
import com.segment.analytics.main.utils.MemorySharedPreferences
78
import com.segment.analytics.main.utils.mockAnalytics
89
import com.segment.analytics.main.utils.mockContext
@@ -20,7 +21,6 @@ import java.util.*
2021
import kotlinx.serialization.encodeToString
2122
import kotlinx.serialization.json.*
2223
import org.junit.jupiter.api.Assertions.*
23-
import java.io.IOException
2424

2525
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
2626
class StorageTests {
@@ -47,7 +47,8 @@ class StorageTests {
4747
System(
4848
configuration = Configuration("123"),
4949
integrations = emptyJsonObject,
50-
settings = Settings()
50+
settings = Settings(),
51+
false
5152
)
5253
)
5354

@@ -103,7 +104,8 @@ class StorageTests {
103104
},
104105
plan = emptyJsonObject,
105106
edgeFunction = emptyJsonObject
106-
)
107+
),
108+
false
107109
)
108110
}
109111
}

0 commit comments

Comments
 (0)