diff --git a/android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt b/android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt index 9245434d..7de21037 100644 --- a/android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt +++ b/android/src/test/java/com/segment/analytics/kotlin/android/StorageTests.kt @@ -158,7 +158,7 @@ class StorageTests { fun `system reset action removes system`() = runTest { val action = object : Action { override fun reduce(state: System): System { - return System(state.configuration, null, state.running, state.initializedPlugins, state.enabled) + return System(state.configuration, null, state.running, state.initializedPlugins, state.waitingPlugins, state.enabled) } } store.dispatch(action, System::class) diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt index 62fef554..13926816 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt @@ -84,31 +84,27 @@ suspend fun Analytics.checkSettings() { val writeKey = configuration.writeKey val cdnHost = configuration.cdnHost - store.currentState(System::class) ?: return - store.dispatch(System.ToggleRunningAction(running = false), System::class) + pauseEventProcessing() - withContext(networkIODispatcher) { + val settingsObj = withContext(networkIODispatcher) { log("Fetching settings on ${Thread.currentThread().name}") - val settingsObj: Settings? = fetchSettings(writeKey, cdnHost) - - withContext(analyticsDispatcher) { - - settingsObj?.let { - log("Dispatching update settings on ${Thread.currentThread().name}") - store.dispatch(System.UpdateSettingsAction(settingsObj), System::class) - } + return@withContext fetchSettings(writeKey, cdnHost) + } - store.currentState(System::class)?.let { system -> - system.settings?.let { settings -> - log("Propagating settings on ${Thread.currentThread().name}") - update(settings) - } - } + settingsObj?.let { + log("Dispatching update settings on ${Thread.currentThread().name}") + store.dispatch(System.UpdateSettingsAction(settingsObj), System::class) + } - // we're good to go back to a running state. - store.dispatch(System.ToggleRunningAction(running = true), System::class) + store.currentState(System::class)?.let { system -> + system.settings?.let { settings -> + log("Propagating settings on ${Thread.currentThread().name}") + update(settings) } } + + // we're good to go back to a running state. + resumeEventProcessing() } internal fun Analytics.fetchSettings( diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/State.kt b/core/src/main/java/com/segment/analytics/kotlin/core/State.kt index 374c56c5..9445bd3e 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/State.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/State.kt @@ -18,9 +18,10 @@ import java.util.* data class System( var configuration: Configuration = Configuration(""), var settings: Settings?, - var running: Boolean, - var initializedPlugins: Set, - var enabled: Boolean + var running: Boolean = false, + var initializedPlugins: Set = emptySet(), + var waitingPlugins: Set = emptySet(), + var enabled: Boolean = true ) : State { companion object { @@ -62,6 +63,7 @@ data class System( settings = settings, running = false, initializedPlugins = setOf(), + waitingPlugins = setOf(), enabled = true ) } @@ -74,6 +76,7 @@ data class System( settings, state.running, state.initializedPlugins, + state.waitingPlugins, state.enabled ) } @@ -81,11 +84,29 @@ data class System( class ToggleRunningAction(var running: Boolean) : Action { override fun reduce(state: System): System { + if (running && state.waitingPlugins.isNotEmpty()) { + running = false + } + return System( state.configuration, state.settings, running, state.initializedPlugins, + state.waitingPlugins, + state.enabled + ) + } + } + + class ForceRunningAction : Action { + override fun reduce(state: System): System { + return System( + state.configuration, + state.settings, + true, + state.initializedPlugins, + state.waitingPlugins, state.enabled ) } @@ -105,6 +126,7 @@ data class System( newSettings, state.running, state.initializedPlugins, + state.waitingPlugins, state.enabled ) } @@ -120,6 +142,7 @@ data class System( state.settings, state.running, initializedPlugins, + state.waitingPlugins, state.enabled ) } @@ -132,10 +155,39 @@ data class System( state.settings, state.running, state.initializedPlugins, + state.waitingPlugins, enabled ) } } + + class AddWaitingPlugin(val plugin: Int): Action { + override fun reduce(state: System): System { + val waitingPlugins = state.waitingPlugins + plugin + return System( + state.configuration, + state.settings, + state.running, + state.initializedPlugins, + waitingPlugins, + state.enabled + ) + } + } + + class RemoveWaitingPlugin(val plugin: Int): Action { + override fun reduce(state: System): System { + val waitingPlugins = state.waitingPlugins - plugin + return System( + state.configuration, + state.settings, + state.running, + state.initializedPlugins, + waitingPlugins, + state.enabled + ) + } + } } /** diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Waiting.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Waiting.kt new file mode 100644 index 00000000..9d2bcd52 --- /dev/null +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Waiting.kt @@ -0,0 +1,62 @@ +package com.segment.analytics.kotlin.core + +import com.segment.analytics.kotlin.core.platform.Plugin +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +/** + * An interface that provides functionality of pausing and resuming event processing on Analytics. + * + * By default plugins that implement this interface pauses processing when it is added to + * analytics (via `setup()`) and resumes after 30s. + * + * To customize pausing and resuming, override `setup()` and call `pause()/resumes()` as needed + */ +interface WaitingPlugin: Plugin { + override fun setup(analytics: Analytics) { + super.setup(analytics) + pause() + } + + fun pause() { + analytics.pauseEventProcessing(this) + } + + fun resume() { + analytics.resumeEventProcessing(this) + } +} + +fun Analytics.pauseEventProcessing(plugin: WaitingPlugin) = analyticsScope.launch(analyticsDispatcher) { + store.dispatch(System.AddWaitingPlugin(plugin.hashCode()), System::class) + pauseEventProcessing() +} + + +fun Analytics.resumeEventProcessing(plugin: WaitingPlugin) = analyticsScope.launch(analyticsDispatcher) { + store.dispatch(System.RemoveWaitingPlugin(plugin.hashCode()), System::class) + resumeEventProcessing() +} + +internal suspend fun Analytics.running(): Boolean { + val system = store.currentState(System::class) + return system?.running ?: false +} + +internal suspend fun Analytics.pauseEventProcessing(timeout: Long = 30_000) { + if (!running()) return + + store.dispatch(System.ToggleRunningAction(false), System::class) + startProcessingAfterTimeout(timeout) +} + +internal suspend fun Analytics.resumeEventProcessing() { + if (running()) return + store.dispatch(System.ToggleRunningAction(true), System::class) +} + +internal fun Analytics.startProcessingAfterTimeout(timeout: Long) = analyticsScope.launch(analyticsDispatcher) { + delay(timeout) + store.dispatch(System.ForceRunningAction(), System::class) +} + diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Plugin.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Plugin.kt index 0be7decb..afd959fa 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Plugin.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Plugin.kt @@ -161,7 +161,7 @@ abstract class DestinationPlugin : EventPlugin { final override fun execute(event: BaseEvent): BaseEvent? = process(event) - internal fun isDestinationEnabled(event: BaseEvent?): Boolean { + open fun isDestinationEnabled(event: BaseEvent?): Boolean { // if event payload has integration marked false then its disabled by customer val customerEnabled = event?.integrations?.getBoolean(key) ?: true // default to true when missing diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/WaitingTests.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/WaitingTests.kt new file mode 100644 index 00000000..4a88842e --- /dev/null +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/WaitingTests.kt @@ -0,0 +1,241 @@ +package com.segment.analytics.kotlin.core + +import com.segment.analytics.kotlin.core.platform.EventPlugin +import com.segment.analytics.kotlin.core.platform.Plugin +import com.segment.analytics.kotlin.core.utils.StubDestinationPlugin +import com.segment.analytics.kotlin.core.utils.clearPersistentStorage +import com.segment.analytics.kotlin.core.utils.mockHTTPClient +import com.segment.analytics.kotlin.core.utils.testAnalytics +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.mockkStatic +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Assertions.* + + +class WaitingTests { + + private lateinit var analytics: Analytics + + private val testDispatcher = UnconfinedTestDispatcher() + + private val testScope = TestScope(testDispatcher) + + @BeforeEach + fun setup() { + clearPersistentStorage() + mockHTTPClient() + val config = Configuration( + writeKey = "123", + application = "Test", + autoAddSegmentDestination = false + ) + analytics = testAnalytics(config, testScope, testDispatcher) + } + + @Test + fun `test resume after timeout`() = testScope.runTest { + assertTrue(analytics.running()) + analytics.pauseEventProcessing(1000) + assertFalse(analytics.running()) + advanceTimeBy(2000) + assertTrue(analytics.running()) + } + + @Test + fun `test manual resume`() = testScope.runTest { + assertTrue(analytics.running()) + analytics.pauseEventProcessing() + assertFalse(analytics.running()) + analytics.resumeEventProcessing() + assertTrue(analytics.running()) + } + + + @Test + fun `test pause does not dispatch state if already pause`() { + mockkStatic("com.segment.analytics.kotlin.core.WaitingKt") + coEvery { analytics.startProcessingAfterTimeout(any()) } returns Job() + + testScope.runTest { + analytics.pauseEventProcessing() + analytics.pauseEventProcessing() + analytics.pauseEventProcessing() + coVerify(exactly = 1) { + analytics.startProcessingAfterTimeout(any()) + } + } + } + + @Test + fun `test WaitingPlugin makes analytics to wait`() = testScope.runTest { + assertTrue(analytics.running()) + val waitingPlugin = ExampleWaitingPlugin() + analytics.add(waitingPlugin) + analytics.track("foo") + + assertFalse(analytics.running()) + assertFalse(waitingPlugin.tracked) + + advanceUntilIdle() + advanceTimeBy(6000) + + assertTrue(analytics.running()) + assertTrue(waitingPlugin.tracked) + } + + @Test + fun `test timeout force resume`() = testScope.runTest { + assertTrue(analytics.running()) + val waitingPlugin = ManualResumeWaitingPlugin() + analytics.add(waitingPlugin) + analytics.track("foo") + + assertFalse(analytics.running()) + assertFalse(waitingPlugin.tracked) + + advanceUntilIdle() + advanceTimeBy(6000) + + assertTrue(analytics.running()) + assertTrue(waitingPlugin.tracked) + } + + @Test + fun `test multiple WaitingPlugin`() = testScope.runTest { + assertTrue(analytics.running()) + val plugin1 = ExampleWaitingPlugin() + val plugin2 = ManualResumeWaitingPlugin() + analytics.add(plugin1) + analytics.add(plugin2) + analytics.track("foo") + + assertFalse(analytics.running()) + assertFalse(plugin1.tracked) + assertFalse(plugin2.tracked) + + plugin1.resume() + advanceTimeBy(6000) + + assertFalse(analytics.running()) + assertFalse(plugin1.tracked) + assertFalse(plugin2.tracked) + + plugin2.resume() + advanceUntilIdle() + advanceTimeBy(6000) + + assertTrue(analytics.running()) + assertTrue(plugin1.tracked) + assertTrue(plugin2.tracked) + } + + @Test + fun `test WaitingPlugin makes analytics to wait on DestinationPlugin`() = testScope.runTest { + assertTrue(analytics.running()) + val waitingPlugin = ExampleWaitingPlugin() + val destinationPlugin = StubDestinationPlugin() + analytics.add(destinationPlugin) + destinationPlugin.add(waitingPlugin) + analytics.track("foo") + + assertFalse(analytics.running()) + assertFalse(waitingPlugin.tracked) + + advanceUntilIdle() + advanceTimeBy(6000) + + assertTrue(analytics.running()) + assertTrue(waitingPlugin.tracked) + } + + @Test + fun `test timeout force resume on DestinationPlugin`() = testScope.runTest { + assertTrue(analytics.running()) + val waitingPlugin = ManualResumeWaitingPlugin() + val destinationPlugin = StubDestinationPlugin() + analytics.add(destinationPlugin) + destinationPlugin.add(waitingPlugin) + analytics.track("foo") + + assertFalse(analytics.running()) + assertFalse(waitingPlugin.tracked) + + advanceUntilIdle() + advanceTimeBy(6000) + + assertTrue(analytics.running()) + assertTrue(waitingPlugin.tracked) + } + + @Test + fun `test multiple WaitingPlugin on DestinationPlugin`() = testScope.runTest { + assertTrue(analytics.running()) + val destinationPlugin = StubDestinationPlugin() + analytics.add(destinationPlugin) + val plugin1 = ExampleWaitingPlugin() + val plugin2 = ManualResumeWaitingPlugin() + destinationPlugin.add(plugin1) + destinationPlugin.add(plugin2) + analytics.track("foo") + + assertFalse(analytics.running()) + assertFalse(plugin1.tracked) + assertFalse(plugin2.tracked) + + plugin1.resume() + advanceTimeBy(6000) + + assertFalse(analytics.running()) + assertFalse(plugin1.tracked) + assertFalse(plugin2.tracked) + + plugin2.resume() + advanceUntilIdle() + advanceTimeBy(6000) + + assertTrue(analytics.running()) + assertTrue(plugin1.tracked) + assertTrue(plugin2.tracked) + } + + class ExampleWaitingPlugin: EventPlugin, WaitingPlugin { + override val type: Plugin.Type = Plugin.Type.Enrichment + override lateinit var analytics: Analytics + var tracked = false + + override fun update(settings: Settings, type: Plugin.UpdateType) { + if (type == Plugin.UpdateType.Initial) { + analytics.analyticsScope.launch(analytics.analyticsDispatcher) { + delay(3000) + resume() + } + } + } + + override fun track(payload: TrackEvent): BaseEvent? { + tracked = true + return super.track(payload) + } + } + + class ManualResumeWaitingPlugin: EventPlugin, WaitingPlugin { + override val type: Plugin.Type = Plugin.Type.Enrichment + override lateinit var analytics: Analytics + var tracked = false + + override fun track(payload: TrackEvent): BaseEvent? { + tracked = true + return super.track(payload) + } + } +} \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/InMemoryStorageTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/InMemoryStorageTest.kt index aafe2c8d..7196c7b4 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/InMemoryStorageTest.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/InMemoryStorageTest.kt @@ -136,7 +136,7 @@ internal class InMemoryStorageTest { fun `system reset action removes system`() = runTest { val action = object : Action { override fun reduce(state: System): System { - return System(state.configuration, null, state.running, state.initializedPlugins, state.enabled) + return System(state.configuration, null, state.running, state.initializedPlugins, state.waitingPlugins, state.enabled) } } store.dispatch(action, System::class) diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/StorageImplTest.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/StorageImplTest.kt index f8f8efa1..d1e1f370 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/StorageImplTest.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utilities/StorageImplTest.kt @@ -161,7 +161,7 @@ internal class StorageImplTest { fun `system reset action removes system`() = runTest { val action = object : Action { override fun reduce(state: System): System { - return System(state.configuration, null, state.running, state.initializedPlugins, state.enabled) + return System(state.configuration, null, state.running, state.initializedPlugins, state.waitingPlugins, state.enabled) } } store.dispatch(action, System::class) diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utils/Plugins.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utils/Plugins.kt index 1ed795bf..9cf95e89 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utils/Plugins.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utils/Plugins.kt @@ -73,4 +73,9 @@ open class StubPlugin : EventPlugin { open class StubAfterPlugin : EventPlugin { override val type: Plugin.Type = Plugin.Type.After override lateinit var analytics: Analytics +} + +open class StubDestinationPlugin : DestinationPlugin() { + override val key: String = "StubDestination" + override fun isDestinationEnabled(event: BaseEvent?) = true } \ No newline at end of file