Skip to content

Commit 25af500

Browse files
authored
Wenxi/replace flush upload with channel api (#53)
* suspendable http client * minimize the number of times of open/close file * add cache to query of current file * init pipeline * file api update * connect pipeline with segment destination * connect httpclient with settings * replace jdk 11 httpclient with ktor * make flush thread safe * bug fix * settings bug fix * bug fix * replace the httpclient with the old implementation * optimization * make file writing thread safe * address comments * nit * fix broken unit tests * add unit tests for event pipeline * add comment to rollover method * update sovran dependency
1 parent 43a6501 commit 25af500

File tree

20 files changed

+571
-341
lines changed

20 files changed

+571
-341
lines changed

android/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ android {
4646
dependencies {
4747
// MAIN DEPS
4848
api project(':core')
49-
api 'com.github.segmentio:sovran-kotlin:1.1.0'
49+
api 'com.github.segmentio:sovran-kotlin:1.2.0'
5050
api "org.jetbrains.kotlinx:kotlinx-serialization-json:1.2.2"
5151
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2'
5252
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.5.2'

android/src/main/java/com/segment/analytics/kotlin/android/Storage.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class AndroidStorage(
4646
)
4747
}
4848

49-
override fun write(key: Storage.Constants, value: String) {
49+
override suspend fun write(key: Storage.Constants, value: String) {
5050
when (key) {
5151
Storage.Constants.Events -> {
5252
if (value.length < MAX_PAYLOAD_SIZE) {
@@ -92,6 +92,10 @@ class AndroidStorage(
9292
override fun removeFile(filePath: String): Boolean {
9393
return eventsFile.remove(filePath)
9494
}
95+
96+
override suspend fun rollover() {
97+
eventsFile.rollover()
98+
}
9599
}
96100

97101
object AndroidStorageProvider : StorageProvider {

android/src/main/java/com/segment/analytics/kotlin/android/plugins/AndroidLifecyclePlugin.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class AndroidLifecyclePlugin() : Application.ActivityLifecycleCallbacks, Default
7272
}
7373
}
7474

75-
private fun runOnAnalyticsThread(block: () -> Unit) = with(analytics) {
75+
private fun runOnAnalyticsThread(block: suspend () -> Unit) = with(analytics) {
7676
analyticsScope.launch(analyticsDispatcher) {
7777
block()
7878
}
@@ -264,8 +264,10 @@ class AndroidLifecyclePlugin() : Application.ActivityLifecycleCallbacks, Default
264264
}
265265

266266
// Update the recorded version.
267-
storage.write(Storage.Constants.AppVersion, currentVersion)
268-
storage.write(Storage.Constants.AppBuild, currentBuild)
267+
runOnAnalyticsThread {
268+
storage.write(Storage.Constants.AppVersion, currentVersion)
269+
storage.write(Storage.Constants.AppBuild, currentBuild)
270+
}
269271
}
270272

271273
fun unregisterListeners() {

android/src/test/java/com/segment/analytics/kotlin/android/AndroidContextCollectorTests.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.segment.analytics.kotlin.android.plugins.AndroidContextPlugin
88
import com.segment.analytics.kotlin.android.utils.MemorySharedPreferences
99
import io.mockk.every
1010
import io.mockk.spyk
11+
import kotlinx.coroutines.runBlocking
1112
import kotlinx.serialization.json.*
1213
import org.junit.Assert.*
1314
import org.junit.Test
@@ -95,7 +96,7 @@ class AndroidContextCollectorTests {
9596
}
9697

9798
@Test
98-
fun `getDeviceId returns anonId when disabled`() {
99+
fun `getDeviceId returns anonId when disabled`() = runBlocking {
99100
analytics.storage.write(Storage.Constants.AnonymousId, "anonId")
100101
val contextCollector = AndroidContextPlugin()
101102
contextCollector.setup(analytics)

android/src/test/java/com/segment/analytics/kotlin/android/AndroidLifecyclePluginTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ class AndroidLifecyclePluginTests {
206206
}
207207

208208
@Test
209-
fun `application updated is tracked`() {
209+
fun `application updated is tracked`() = runBlocking {
210210
analytics.configuration.trackApplicationLifecycleEvents = true
211211
analytics.configuration.trackDeepLinks = false
212212
analytics.configuration.useLifecycleObserver = false

android/src/test/java/com/segment/analytics/kotlin/android/EventsFileTests.kt

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import com.segment.analytics.kotlin.core.utilities.EncodeDefaultsJson
77
import com.segment.analytics.kotlin.core.utilities.EventsFileManager
88
import io.mockk.every
99
import io.mockk.mockkStatic
10+
import kotlinx.coroutines.runBlocking
1011
import kotlinx.serialization.encodeToString
1112
import kotlinx.serialization.json.Json
1213
import kotlinx.serialization.json.buildJsonObject
@@ -39,7 +40,7 @@ class EventsFileTests {
3940
}
4041

4142
@Test
42-
fun `check if event is stored correctly and creates new file`() {
43+
fun `check if event is stored correctly and creates new file`() = runBlocking {
4344
val file = EventsFileManager(directory, "123", kvStore)
4445
val trackEvent = TrackEvent(
4546
event = "clicked",
@@ -62,7 +63,7 @@ class EventsFileTests {
6263
}
6364

6465
@Test
65-
fun `storeEvent stores in existing file if available`() {
66+
fun `storeEvent stores in existing file if available`() = runBlocking {
6667
val file = EventsFileManager(directory, "123", kvStore)
6768
val trackEvent = TrackEvent(
6869
event = "clicked",
@@ -86,7 +87,7 @@ class EventsFileTests {
8687
}
8788

8889
@Test
89-
fun `storeEvent creates new file when at capacity and closes other file`() {
90+
fun `storeEvent creates new file when at capacity and closes other file`() = runBlocking {
9091
val file = EventsFileManager(directory, "123", kvStore)
9192
val trackEvent = TrackEvent(
9293
event = "clicked",
@@ -117,13 +118,14 @@ class EventsFileTests {
117118
}
118119

119120
@Test
120-
fun `read returns empty list when no events stored`() {
121+
fun `read returns empty list when no events stored`() = runBlocking {
121122
val file = EventsFileManager(directory, "123", kvStore)
123+
file.rollover()
122124
assertTrue(file.read().isEmpty())
123125
}
124126

125127
@Test
126-
fun `read finishes open file and lists it`() {
128+
fun `read finishes open file and lists it`() = runBlocking {
127129
val file = EventsFileManager(directory, "123", kvStore)
128130
val trackEvent = TrackEvent(
129131
event = "clicked",
@@ -138,6 +140,7 @@ class EventsFileTests {
138140
val eventString = EncodeDefaultsJson.encodeToString(trackEvent)
139141
file.storeEvent(eventString)
140142

143+
file.rollover()
141144
val fileUrls = file.read()
142145
assertEquals(1, fileUrls.size)
143146
val expectedContents = """ {"batch":[${eventString}],"sentAt":"$epochTimestamp"} """.trim()
@@ -148,7 +151,7 @@ class EventsFileTests {
148151
}
149152

150153
@Test
151-
fun `multiple reads doesnt create extra files`() {
154+
fun `multiple reads doesnt create extra files`() = runBlocking {
152155
val file = EventsFileManager(directory, "123", kvStore)
153156
val trackEvent = TrackEvent(
154157
event = "clicked",
@@ -163,6 +166,7 @@ class EventsFileTests {
163166
val eventString = EncodeDefaultsJson.encodeToString(trackEvent)
164167
file.storeEvent(eventString)
165168

169+
file.rollover()
166170
file.read().let {
167171
assertEquals(1, it.size)
168172
val expectedContents =
@@ -173,14 +177,15 @@ class EventsFileTests {
173177
assertEquals(expectedContents, actualContents)
174178
}
175179
// second read is a no-op
180+
file.rollover()
176181
file.read().let {
177182
assertEquals(1, it.size)
178183
assertEquals(1, directory.list()!!.size)
179184
}
180185
}
181186

182187
@Test
183-
fun `read lists all available files for writekey`() {
188+
fun `read lists all available files for writekey`() = runBlocking {
184189
val trackEvent = TrackEvent(
185190
event = "clicked",
186191
properties = buildJsonObject { put("behaviour", "good") })
@@ -199,6 +204,9 @@ class EventsFileTests {
199204
file1.storeEvent(eventString)
200205
file2.storeEvent(eventString)
201206

207+
file1.rollover()
208+
file2.rollover()
209+
202210
assertEquals(listOf("/tmp/analytics-android-test/123-0"), file1.read())
203211
assertEquals(listOf("/tmp/analytics-android-test/qwerty-0"), file2.read())
204212
}
@@ -233,7 +241,7 @@ class EventsFileTests {
233241
// }
234242

235243
@Test
236-
fun `remove deletes file`() {
244+
fun `remove deletes file`() = runBlocking {
237245
val file = EventsFileManager(directory, "123", kvStore)
238246
val trackEvent = TrackEvent(
239247
event = "clicked",
@@ -248,6 +256,7 @@ class EventsFileTests {
248256
val eventString = EncodeDefaultsJson.encodeToString(trackEvent)
249257
file.storeEvent(eventString)
250258

259+
file.rollover()
251260
val list = file.read()
252261
file.remove(list[0])
253262

android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ class StorageTests {
129129
val map = getWorkingMap(mockContext.getSharedPreferences("", 0))
130130

131131
@Test
132-
fun `write updates sharedPreferences`() {
132+
fun `write updates sharedPreferences`() = runBlocking {
133133
androidStorage.write(Storage.Constants.AppVersion, "100")
134134
assertEquals("100", map["segment.app.version"])
135135
}
@@ -151,7 +151,7 @@ class StorageTests {
151151
inner class EventsStorage() {
152152

153153
@Test
154-
fun `writing events writes to eventsFile`() {
154+
fun `writing events writes to eventsFile`() = runBlocking {
155155
val event = TrackEvent(
156156
event = "clicked",
157157
properties = buildJsonObject { put("behaviour", "good") })
@@ -164,26 +164,33 @@ class StorageTests {
164164
}
165165
val stringified: String = Json.encodeToString(event)
166166
androidStorage.write(Storage.Constants.Events, stringified)
167+
androidStorage.eventsFile.rollover()
167168
val storagePath = androidStorage.eventsFile.read()[0]
168169
val storageContents = File(storagePath).readText()
169170
val jsonFormat = Json.decodeFromString(JsonObject.serializer(), storageContents)
170171
assertEquals(1, jsonFormat["batch"]!!.jsonArray.size)
171172
}
172173

173174
@Test
174-
fun `cannot write more than 32kb as event`() {
175+
fun `cannot write more than 32kb as event`() = runBlocking {
175176
val stringified: String = "A".repeat(32002)
176-
assertThrows(Exception::class.java) {
177+
val exception = try {
177178
androidStorage.write(
178179
Storage.Constants.Events,
179180
stringified
180181
)
182+
null
181183
}
184+
catch (e: Exception) {
185+
e
186+
}
187+
assertNotNull(exception)
188+
androidStorage.eventsFile.rollover()
182189
assertTrue(androidStorage.eventsFile.read().isEmpty())
183190
}
184191

185192
@Test
186-
fun `reading events returns a non-null file handle with correct events`() {
193+
fun `reading events returns a non-null file handle with correct events`() = runBlocking {
187194
val event = TrackEvent(
188195
event = "clicked",
189196
properties = buildJsonObject { put("behaviour", "good") })
@@ -197,6 +204,7 @@ class StorageTests {
197204
val stringified: String = Json.encodeToString(event)
198205
androidStorage.write(Storage.Constants.Events, stringified)
199206

207+
androidStorage.eventsFile.rollover()
200208
val fileUrl = androidStorage.read(Storage.Constants.Events)
201209
assertNotNull(fileUrl)
202210
fileUrl!!.let {
@@ -216,13 +224,14 @@ class StorageTests {
216224
}
217225

218226
@Test
219-
fun `reading events with empty storage return empty list`() {
227+
fun `reading events with empty storage return empty list`() = runBlocking {
228+
androidStorage.eventsFile.rollover()
220229
val fileUrls = androidStorage.read(Storage.Constants.Events)
221230
assertTrue(fileUrls!!.isEmpty())
222231
}
223232

224233
@Test
225-
fun `can write and read multiple events`() {
234+
fun `can write and read multiple events`() = runBlocking {
226235
val event1 = TrackEvent(
227236
event = "clicked",
228237
properties = buildJsonObject { put("behaviour", "good") })
@@ -248,6 +257,7 @@ class StorageTests {
248257
androidStorage.write(Storage.Constants.Events, stringified1)
249258
androidStorage.write(Storage.Constants.Events, stringified2)
250259

260+
androidStorage.rollover()
251261
val fileUrl = androidStorage.read(Storage.Constants.Events)
252262
assertNotNull(fileUrl)
253263
fileUrl!!.let {

core/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ test {
1616
dependencies {
1717
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
1818
// MAIN DEPS
19-
api 'com.github.segmentio:sovran-kotlin:1.1.0'
19+
api 'com.github.segmentio:sovran-kotlin:1.2.0'
2020
api "org.jetbrains.kotlinx:kotlinx-serialization-json:1.2.2"
2121
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2'
2222

core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,7 @@ class Analytics internal constructor(
8181
checkSettings()
8282

8383
if (configuration.autoAddSegmentDestination) {
84-
add(
85-
SegmentDestination(
86-
configuration.writeKey,
87-
configuration.flushAt,
88-
configuration.flushInterval * 1000L,
89-
configuration.apiHost
90-
)
91-
)
84+
add(SegmentDestination())
9285
}
9386
}
9487
}

core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ suspend fun Analytics.checkSettings() {
5353
val writeKey = configuration.writeKey
5454
val cdnHost = configuration.cdnHost
5555

56+
// check current system state to determine whether it's initial or refresh
57+
val systemState = store.currentState(System::class)
58+
val hasSettings = systemState?.settings?.integrations != null &&
59+
systemState.settings?.plan != null
60+
val updateType = if (hasSettings) Plugin.UpdateType.Refresh else Plugin.UpdateType.Initial
61+
5662
// stop things; queue in case our settings have changed.
5763
store.dispatch(System.ToggleRunningAction(running = false), System::class)
5864

@@ -68,23 +74,15 @@ suspend fun Analytics.checkSettings() {
6874
log(message = "${ex.message}: failed to fetch settings", type = LogType.ERROR)
6975
null
7076
}
71-
settingsObj?.let {
72-
log("Dispatching update settings on ${Thread.currentThread().name}")
73-
74-
// check current system state to determine whether it's initial or refresh
75-
val systemState = store.currentState(System::class)
76-
val hasSettings = systemState?.settings?.integrations != null &&
77-
systemState.settings?.plan != null
78-
val updateType = if (hasSettings) Plugin.UpdateType.Refresh else Plugin.UpdateType.Initial
7977

80-
withContext(analyticsDispatcher) {
78+
withContext(analyticsDispatcher) {
79+
settingsObj?.let {
80+
log("Dispatching update settings on ${Thread.currentThread().name}")
8181
store.dispatch(System.UpdateSettingsAction(settingsObj), System::class)
82+
update(settingsObj, updateType)
8283
}
83-
update(settingsObj, updateType)
84-
}
8584

86-
// we're good to go back to a running state.
87-
withContext(analyticsDispatcher) {
85+
// we're good to go back to a running state.
8886
store.dispatch(System.ToggleRunningAction(running = true), System::class)
8987
}
9088
}

0 commit comments

Comments
 (0)