Skip to content

Commit 6f3d16f

Browse files
authored
fix: offline events storage sync (#498)
1 parent 8b12982 commit 6f3d16f

File tree

5 files changed

+96
-20
lines changed

5 files changed

+96
-20
lines changed

buildSrc/src/main/kotlin/io.customer/android/Versions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ object Versions {
3030
internal const val MOCKITO = "4.8.1"
3131
internal const val MOCKK = "1.12.2"
3232
internal const val MOSHI = "1.14.0"
33-
internal const val SEGMENT = "1.16.3"
33+
internal const val SEGMENT = "1.19.1"
3434
internal const val TIMBER = "5.0.0"
3535
internal const val REDUX_KOTLIN = "0.6.0"
3636
internal const val ROBOLECTRIC = "4.9"

datapipelines/src/main/kotlin/io/customer/datapipelines/extensions/AnalyticsExt.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package io.customer.datapipelines.extensions
22

33
import com.segment.analytics.kotlin.core.Configuration
44
import com.segment.analytics.kotlin.core.ErrorHandler
5+
import com.segment.analytics.kotlin.core.Settings
6+
import com.segment.analytics.kotlin.core.emptyJsonObject
57
import io.customer.datapipelines.config.DataPipelinesModuleConfig
8+
import io.customer.datapipelines.plugins.CUSTOMER_IO_DATA_PIPELINES
9+
import kotlinx.serialization.json.buildJsonObject
610

711
/**
812
* Updates analytics configuration using the provided [DataPipelinesModuleConfig].
@@ -17,6 +21,15 @@ internal fun updateAnalyticsConfig(
1721
// Force set to false as we don't need to forward events to Segment destination
1822
// User can disable CIO destination to achieve same results
1923
this.autoAddSegmentDestination = false
24+
this.defaultSettings = this.defaultSettings?.copy(
25+
integrations = buildJsonObject {
26+
put(CUSTOMER_IO_DATA_PIPELINES, emptyJsonObject)
27+
}
28+
) ?: Settings(
29+
integrations = buildJsonObject {
30+
put(CUSTOMER_IO_DATA_PIPELINES, emptyJsonObject)
31+
}
32+
)
2033
this.trackApplicationLifecycleEvents = moduleConfig.trackApplicationLifecycleEvents
2134
this.apiHost = moduleConfig.apiHost
2235
this.cdnHost = moduleConfig.cdnHost

datapipelines/src/main/kotlin/io/customer/datapipelines/plugins/CustomerIODestination.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ data class CustomerIOSettings(
3333
var apiHost: String? = null
3434
)
3535

36+
internal const val CUSTOMER_IO_DATA_PIPELINES = "Customer.io Data Pipelines"
37+
3638
/**
3739
* CustomerIODestination plugin that is used to send events to Customer IO CDP api, in the choice of region.
3840
* How it works
@@ -44,7 +46,7 @@ class CustomerIODestination : DestinationPlugin(), VersionedPlugin, Subscriber {
4446

4547
private var pipeline: EventPipeline? = null
4648
private var flushPolicies: List<FlushPolicy> = emptyList()
47-
override val key: String = "Customer.io Data Pipelines"
49+
override val key: String = CUSTOMER_IO_DATA_PIPELINES
4850

4951
override fun track(payload: TrackEvent): BaseEvent {
5052
enqueue(payload)
@@ -79,13 +81,11 @@ class CustomerIODestination : DestinationPlugin(), VersionedPlugin, Subscriber {
7981
super.setup(analytics)
8082

8183
// convert flushAt and flushIntervals into FlushPolicies
82-
flushPolicies = if (analytics.configuration.flushPolicies.isEmpty()) {
84+
flushPolicies = analytics.configuration.flushPolicies.ifEmpty {
8385
listOf(
8486
CountBasedFlushPolicy(analytics.configuration.flushAt),
8587
FrequencyFlushPolicy(analytics.configuration.flushInterval * 1000L)
8688
)
87-
} else {
88-
analytics.configuration.flushPolicies
8989
}
9090

9191
// Add DestinationMetadata enrichment plugin

datapipelines/src/test/java/io/customer/datapipelines/DataPipelinesCompatibilityTests.kt

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package io.customer.datapipelines
33
import com.segment.analytics.kotlin.core.Storage
44
import com.segment.analytics.kotlin.core.emptyJsonArray
55
import com.segment.analytics.kotlin.core.emptyJsonObject
6+
import com.segment.analytics.kotlin.core.utilities.StorageImpl
67
import com.segment.analytics.kotlin.core.utilities.getString
78
import io.customer.commontest.config.TestConfig
89
import io.customer.commontest.extensions.random
@@ -22,9 +23,12 @@ import io.customer.sdk.events.Metric
2223
import io.customer.sdk.events.TrackMetric
2324
import io.customer.sdk.util.EventNames
2425
import io.mockk.every
26+
import java.io.File
2527
import kotlinx.coroutines.test.runTest
28+
import kotlinx.serialization.json.Json
2629
import kotlinx.serialization.json.JsonArray
2730
import kotlinx.serialization.json.JsonElement
31+
import kotlinx.serialization.json.JsonObject
2832
import kotlinx.serialization.json.buildJsonObject
2933
import kotlinx.serialization.json.jsonArray
3034
import kotlinx.serialization.json.jsonObject
@@ -43,7 +47,7 @@ class DataPipelinesCompatibilityTests : JUnitTest() {
4347

4448
private lateinit var globalPreferenceStore: GlobalPreferenceStore
4549
private lateinit var deviceStore: DeviceStore
46-
private lateinit var storage: Storage
50+
private lateinit var storage: StorageImpl
4751

4852
override fun setup(testConfig: TestConfig) {
4953
super.setup(
@@ -59,31 +63,25 @@ class DataPipelinesCompatibilityTests : JUnitTest() {
5963
globalPreferenceStore = androidSDKComponent.globalPreferenceStore
6064
deviceStore = androidSDKComponent.deviceStore
6165

62-
storage = analytics.storage
66+
storage = analytics.storage as StorageImpl
6367
}
6468

6569
private suspend fun getQueuedEvents(): JsonArray {
6670
// Rollover to ensure that analytics completes writing to the current file
6771
// and update file contents with valid JSON
6872
storage.rollover()
69-
val cdpApiKey = sdkInstance.moduleConfig.cdpApiKey
7073
// Find all files that contain the CDP API key in the name
7174
// The file we are looking for is named after the CDP API key
7275
// /tmp/analytics-kotlin/{CDP_API_KEY}/events/{CDP_API_KEY}-{N}.tmp before the rollover
7376
// /tmp/analytics-kotlin/{CDP_API_KEY}/events/{CDP_API_KEY}-{N} after the rollover
74-
val eventsFiles = storage.storageDirectory.walk().filter { file ->
75-
file.name.contains(cdpApiKey) && file.isFile && file.extension.isBlank()
76-
}
77-
val result = mutableListOf<JsonElement>()
78-
// Read the contents of each file and extract the JSON array of batched events
79-
eventsFiles.forEach { file ->
80-
val contents = file.readText()
81-
val storedEvents = contents.decodeJson()
82-
val jsonArray = storedEvents["batch"]?.jsonArray ?: emptyJsonArray
83-
result.addAll(jsonArray)
84-
}
77+
78+
val storagePath = storage.read(Storage.Constants.Events)?.let {
79+
it.split(',')[0]
80+
}.shouldNotBeNull()
81+
val storageContents = File(storagePath).readText()
82+
val jsonFormat = Json.decodeFromString(JsonObject.serializer(), storageContents)
8583
// Return the flat list of batched events
86-
return JsonArray(result)
84+
return jsonFormat["batch"]?.jsonArray ?: emptyJsonArray
8785
}
8886

8987
//endregion
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.customer.datapipelines.plugins
2+
3+
import com.segment.analytics.kotlin.core.HTTPClient
4+
import io.customer.commontest.config.TestConfig
5+
import io.customer.datapipelines.testutils.core.JUnitTest
6+
import io.customer.datapipelines.testutils.core.testConfiguration
7+
import io.customer.datapipelines.testutils.utils.OutputReaderPlugin
8+
import io.customer.datapipelines.testutils.utils.trackEvents
9+
import io.mockk.every
10+
import io.mockk.mockkConstructor
11+
import okio.IOException
12+
import org.amshove.kluent.shouldBe
13+
import org.amshove.kluent.shouldNotBe
14+
import org.junit.jupiter.api.Test
15+
16+
/**
17+
* Test for CustomerIODestination plugin, focusing on behavior when network settings fetch fails.
18+
* These tests verify that even when the settings fetch fails, the plugin remains enabled
19+
* and can process events.
20+
*/
21+
class CustomerIODestinationTest : JUnitTest() {
22+
23+
private lateinit var outputReaderPlugin: OutputReaderPlugin
24+
25+
override fun setup(testConfig: TestConfig) {
26+
// Mock HTTP client to throw exception when fetching settings
27+
mockkConstructor(HTTPClient::class)
28+
every { anyConstructed<HTTPClient>().settings(any()) } throws IOException("Network error")
29+
30+
super.setup(
31+
testConfiguration {
32+
sdkConfig {
33+
autoAddCustomerIODestination(true)
34+
}
35+
}
36+
)
37+
38+
outputReaderPlugin = OutputReaderPlugin()
39+
analytics.add(outputReaderPlugin)
40+
}
41+
42+
// process_givenScreenViewUseAnalytics_expectScreenEventWithoutPropertiesProcessed
43+
@Test
44+
fun givenFetchingSettingFails_expectCustomerIODestinationPluginStillBeEnabled() {
45+
// Verify plugin is present in analytics instance
46+
sdkInstance.analytics.find(CustomerIODestination::class) shouldNotBe null
47+
48+
// Track an event to verify it flows through the pipeline
49+
sdkInstance.track("test_event")
50+
51+
// Verify event was processed
52+
outputReaderPlugin.trackEvents.size shouldBe 1
53+
outputReaderPlugin.trackEvents.first().event shouldBe "test_event"
54+
}
55+
56+
@Test
57+
fun givenDefaultSetting_expectConfigurationIntegrationIncludeCustomerIODestination() {
58+
// Verify settings contain CustomerIO destination even with network failure
59+
val configuration = analytics.configuration
60+
configuration.defaultSettings shouldNotBe null
61+
val integrations = configuration.defaultSettings?.integrations
62+
integrations shouldNotBe null
63+
(integrations?.contains(CUSTOMER_IO_DATA_PIPELINES) ?: false) shouldBe true
64+
}
65+
}

0 commit comments

Comments
 (0)