diff --git a/store/api/android/store.api b/store/api/android/store.api index 99c14d7fe..45a91c111 100644 --- a/store/api/android/store.api +++ b/store/api/android/store.api @@ -123,6 +123,15 @@ public final class org/mobilenativefoundation/store/store5/FetcherResult$Error$M public fun toString ()Ljava/lang/String; } +public abstract interface class org/mobilenativefoundation/store/store5/Logger { + public abstract fun debug (Ljava/lang/String;)V + public abstract fun error (Ljava/lang/String;Ljava/lang/Throwable;)V +} + +public final class org/mobilenativefoundation/store/store5/Logger$DefaultImpls { + public static synthetic fun error$default (Lorg/mobilenativefoundation/store/store5/Logger;Ljava/lang/String;Ljava/lang/Throwable;ILjava/lang/Object;)V +} + public final class org/mobilenativefoundation/store/store5/MemoryPolicy { public static final field Companion Lorg/mobilenativefoundation/store/store5/MemoryPolicy$Companion; public static final field DEFAULT_SIZE_POLICY J @@ -612,9 +621,6 @@ public final class org/mobilenativefoundation/store/store5/impl/extensions/Store public static final fun get (Lorg/mobilenativefoundation/store/store5/Store;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } -public abstract interface annotation class org/mobilenativefoundation/store/store5/internal/concurrent/AnyThread : java/lang/annotation/Annotation { -} - public abstract class org/mobilenativefoundation/store/store5/internal/result/EagerConflictResolutionResult { } diff --git a/store/api/jvm/store.api b/store/api/jvm/store.api index b992e2c43..cc92814fe 100644 --- a/store/api/jvm/store.api +++ b/store/api/jvm/store.api @@ -116,6 +116,15 @@ public final class org/mobilenativefoundation/store/store5/FetcherResult$Error$M public fun toString ()Ljava/lang/String; } +public abstract interface class org/mobilenativefoundation/store/store5/Logger { + public abstract fun debug (Ljava/lang/String;)V + public abstract fun error (Ljava/lang/String;Ljava/lang/Throwable;)V +} + +public final class org/mobilenativefoundation/store/store5/Logger$DefaultImpls { + public static synthetic fun error$default (Lorg/mobilenativefoundation/store/store5/Logger;Ljava/lang/String;Ljava/lang/Throwable;ILjava/lang/Object;)V +} + public final class org/mobilenativefoundation/store/store5/MemoryPolicy { public static final field Companion Lorg/mobilenativefoundation/store/store5/MemoryPolicy$Companion; public static final field DEFAULT_SIZE_POLICY J @@ -605,9 +614,6 @@ public final class org/mobilenativefoundation/store/store5/impl/extensions/Store public static final fun get (Lorg/mobilenativefoundation/store/store5/Store;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } -public abstract interface annotation class org/mobilenativefoundation/store/store5/internal/concurrent/AnyThread : java/lang/annotation/Annotation { -} - public abstract class org/mobilenativefoundation/store/store5/internal/result/EagerConflictResolutionResult { } diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/Logger.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/Logger.kt new file mode 100644 index 000000000..d7318b9af --- /dev/null +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/Logger.kt @@ -0,0 +1,24 @@ +package org.mobilenativefoundation.store.store5 + +/** + * A simple logging interface for logging error and debug messages. + */ +interface Logger { + /** + * Logs an error message, optionally with a throwable. + * + * @param message The error message to log. + * @param throwable An optional [Throwable] associated with the error. + */ + fun error( + message: String, + throwable: Throwable? = null, + ) + + /** + * Logs a debug message. + * + * @param message The debug message to log. + */ + fun debug(message: String) +} diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/DefaultLogger.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/DefaultLogger.kt new file mode 100644 index 000000000..9219683c3 --- /dev/null +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/DefaultLogger.kt @@ -0,0 +1,26 @@ +package org.mobilenativefoundation.store.store5.impl + +import co.touchlab.kermit.CommonWriter +import org.mobilenativefoundation.store.store5.Logger + +/** + * Default implementation of [Logger] using the Kermit logging library. + */ +internal class DefaultLogger : Logger { + private val delegate = + co.touchlab.kermit.Logger.apply { + setLogWriters(listOf(CommonWriter())) + setTag("Store") + } + + override fun debug(message: String) { + delegate.d(message) + } + + override fun error( + message: String, + throwable: Throwable?, + ) { + delegate.e(message, throwable) + } +} diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt index d798d57fe..39c2ac833 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt @@ -2,8 +2,6 @@ package org.mobilenativefoundation.store.store5.impl -import co.touchlab.kermit.CommonWriter -import co.touchlab.kermit.Logger import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow @@ -14,6 +12,7 @@ import kotlinx.coroutines.sync.withLock import org.mobilenativefoundation.store.core5.ExperimentalStoreApi import org.mobilenativefoundation.store.store5.Bookkeeper import org.mobilenativefoundation.store.store5.Clear +import org.mobilenativefoundation.store.store5.Logger import org.mobilenativefoundation.store.store5.MutableStore import org.mobilenativefoundation.store.store5.StoreReadRequest import org.mobilenativefoundation.store.store5.StoreReadResponse @@ -22,16 +21,16 @@ import org.mobilenativefoundation.store.store5.StoreWriteResponse import org.mobilenativefoundation.store.store5.Updater import org.mobilenativefoundation.store.store5.UpdaterResult import org.mobilenativefoundation.store.store5.impl.extensions.now -import org.mobilenativefoundation.store.store5.internal.concurrent.AnyThread import org.mobilenativefoundation.store.store5.internal.concurrent.ThreadSafety import org.mobilenativefoundation.store.store5.internal.definition.WriteRequestQueue import org.mobilenativefoundation.store.store5.internal.result.EagerConflictResolutionResult -@ExperimentalStoreApi +@OptIn(ExperimentalStoreApi::class) internal class RealMutableStore( private val delegate: RealStore, private val updater: Updater, private val bookkeeper: Bookkeeper?, + private val logger: Logger = DefaultLogger(), ) : MutableStore, Clear.Key by delegate, Clear.All by delegate { private val storeLock = Mutex() private val keyToWriteRequestQueue = mutableMapOf>() @@ -39,42 +38,60 @@ internal class RealMutableStore stream(request: StoreReadRequest): Flow> = flow { + // Ensure we are ready for this key. safeInitStore(request.key) + // Try to eagerly resolve conflicts before pulling from network. when (val eagerConflictResolutionResult = tryEagerlyResolveConflicts(request.key)) { + // TODO(#678): Many use cases will not want to pull immediately after failing to push local changes. + // We should enable configuration of conflict resolution strategies, such as logging, retrying, canceling. + is EagerConflictResolutionResult.Error.Exception -> { - logger.e(eagerConflictResolutionResult.error.toString()) + logger.error(eagerConflictResolutionResult.error.toString()) } is EagerConflictResolutionResult.Error.Message -> { - logger.e(eagerConflictResolutionResult.message) + logger.error(eagerConflictResolutionResult.message) } is EagerConflictResolutionResult.Success.ConflictsResolved -> { - logger.d(eagerConflictResolutionResult.value.toString()) + logger.debug(eagerConflictResolutionResult.value.toString()) } EagerConflictResolutionResult.Success.NoConflicts -> { - logger.d(eagerConflictResolutionResult.toString()) + logger.debug("No conflicts.") } } + // Now, we can just delegate to the underlying stream. delegate.stream(request).collect { storeReadResponse -> emit(storeReadResponse) } } @ExperimentalStoreApi override fun stream(requestStream: Flow>): Flow = flow { + // Each incoming write request is enqueued. + // Then we try to update the network and delegate. + requestStream .onEach { writeRequest -> + // Prepare per-key data structures. safeInitStore(writeRequest.key) + + // Enqueue the new write request. addWriteRequestToQueue(writeRequest) } .collect { writeRequest -> val storeWriteResponse = try { + // Always write to local first. delegate.write(writeRequest.key, writeRequest.value) - when (val updaterResult = tryUpdateServer(writeRequest)) { + + // Try to sync to network. + val updaterResult = tryUpdateServer(writeRequest) + + // Convert UpdaterResult -> StoreWriteResponse. + when (updaterResult) { is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error) is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message) is UpdaterResult.Success.Typed<*> -> { @@ -103,6 +120,7 @@ internal class RealMutableStore(request.key) if (updaterResult is UpdaterResult.Success) { + // We successfully synced to network, can now clear out any stale writes. updateWriteRequestQueue( key = request.key, created = request.created, @@ -110,14 +128,20 @@ internal class RealMutableStore postLatest(key: Key): UpdaterResult { + // The "latest" is the last item in the queue for this key. val writer = getLatestWriteRequest(key) + return when (val updaterResult = updater.post(key, writer.value)) { is UpdaterResult.Error.Exception -> UpdaterResult.Error.Exception(updaterResult.error) is UpdaterResult.Error.Message -> UpdaterResult.Error.Message(updaterResult.message) @@ -133,7 +157,9 @@ internal class RealMutableStore updateWriteRequestQueue( key: Key, created: Long, @@ -141,10 +167,11 @@ internal class RealMutableStore>() + val remaining = ArrayDeque>() for (writeRequest in this) { if (writeRequest.created <= created) { + // Mark each relevant request as succeeded. updater.onCompletion?.onSuccess?.invoke(updaterResult) val storeWriteResponse = @@ -161,55 +188,64 @@ internal class RealMutableStore StoreWriteResponse.Success.Untyped(updaterResult.value) } + // Notify each on-completion callback. writeRequest.onCompletions?.forEach { onStoreWriteCompletion -> onStoreWriteCompletion.onSuccess(storeWriteResponse) } } else { - outstandingWriteRequests.add(writeRequest) + // Keep requests that happened after created. + remaining.add(writeRequest) } } - outstandingWriteRequests + remaining } - withThreadSafety(key) { + // Update the in-memory map outside the queue's mutex. + storeLock.withLock { keyToWriteRequestQueue[key] = nextWriteRequestQueue } } - @AnyThread + /** + * Locks the queue for [key] and invokes [block]. + */ private suspend fun withWriteRequestQueueLock( key: Key, block: suspend WriteRequestQueue.() -> Result, - ): Result = - withThreadSafety(key) { - writeRequests.lightswitch.lock(writeRequests.mutex) - val writeRequestQueue = requireNotNull(keyToWriteRequestQueue[key]) - val output = writeRequestQueue.block() - writeRequests.lightswitch.unlock(writeRequests.mutex) - output - } - - private suspend fun getLatestWriteRequest(key: Key): StoreWriteRequest = - withThreadSafety(key) { - writeRequests.mutex.lock() - val output = requireNotNull(keyToWriteRequestQueue[key]?.last()) - writeRequests.mutex.unlock() - output + ): Result { + // Acquire the ThreadSafety object for this key without holding storeLock. + val threadSafety = getThreadSafety(key) + + // Now safely lock the queue's own mutex. + threadSafety.writeRequests.lightswitch.lock(threadSafety.writeRequests.mutex) + return try { + val queue = getQueue((key)) + queue.block() + } finally { + threadSafety.writeRequests.lightswitch.unlock(threadSafety.writeRequests.mutex) } + } - @AnyThread - private suspend fun withThreadSafety( - key: Key, - block: suspend ThreadSafety.() -> Output, - ): Output = - storeLock.withLock { - val threadSafety = requireNotNull(keyToThreadSafety[key]) - threadSafety.block() + private suspend fun getLatestWriteRequest(key: Key): StoreWriteRequest { + val threadSafety = getThreadSafety(key) + threadSafety.writeRequests.mutex.lock() + return try { + val queue = getQueue(key) + require(queue.isNotEmpty()) { + "No writes found for key=$key." + } + queue.last() + } finally { + threadSafety.writeRequests.mutex.unlock() } + } + /** + * Checks if we have un-synced writes or a recorded failed sync for [key]. + */ private suspend fun conflictsMightExist(key: Key): Boolean { - val lastFailedSync = bookkeeper?.getLastFailedSync(key) - return lastFailedSync != null || writeRequestsQueueIsEmpty(key).not() + val failed = bookkeeper?.getLastFailedSync(key) + return (failed != null) || !writeRequestsQueueIsEmpty(key) } private fun writeRequestsQueueIsEmpty(key: Key): Boolean = keyToWriteRequestQueue[key].isNullOrEmpty() @@ -219,58 +255,86 @@ internal class RealMutableStore tryEagerlyResolveConflicts(key: Key): EagerConflictResolutionResult = - withThreadSafety(key) { - val latest = delegate.latestOrNull(key) - when { - latest == null || bookkeeper == null || conflictsMightExist(key).not() -> EagerConflictResolutionResult.Success.NoConflicts - else -> { - try { - val updaterResult = - updater.post(key, latest).also { updaterResult -> - if (updaterResult is UpdaterResult.Success) { - updateWriteRequestQueue(key = key, created = now(), updaterResult = updaterResult) - } - } + private suspend fun tryEagerlyResolveConflicts(key: Key): EagerConflictResolutionResult { + // Acquire the ThreadSafety object for this key without holding storeLock. + val threadSafety = getThreadSafety(key) - when (updaterResult) { - is UpdaterResult.Error.Exception -> EagerConflictResolutionResult.Error.Exception(updaterResult.error) - is UpdaterResult.Error.Message -> EagerConflictResolutionResult.Error.Message(updaterResult.message) - is UpdaterResult.Success -> EagerConflictResolutionResult.Success.ConflictsResolved(updaterResult) + // Lock just long enough to check if conflicts exist. + val (latestValue, conflictsExist) = + threadSafety.readCompletions.mutex.withLock { + val latestValue = delegate.latestOrNull(key) + val conflictsExist = latestValue != null && bookkeeper != null && conflictsMightExist(key) + latestValue to conflictsExist + } + + return if (!conflictsExist || latestValue == null) { + EagerConflictResolutionResult.Success.NoConflicts + } else { + try { + val updaterResult = + updater.post(key, latestValue).also { updaterResult -> + if (updaterResult is UpdaterResult.Success) { + // If it succeeds, we want to remove stale requests and clear the bookkeeper. + updateWriteRequestQueue(key = key, created = now(), updaterResult = updaterResult) + + bookkeeper?.clear(key) } - } catch (throwable: Throwable) { - EagerConflictResolutionResult.Error.Exception(throwable) + } + + when (updaterResult) { + is UpdaterResult.Error.Exception -> { + EagerConflictResolutionResult.Error.Exception(updaterResult.error) + } + + is UpdaterResult.Error.Message -> { + EagerConflictResolutionResult.Error.Message(updaterResult.message) + } + + is UpdaterResult.Success -> { + EagerConflictResolutionResult.Success.ConflictsResolved(updaterResult) } } + } catch (error: Throwable) { + EagerConflictResolutionResult.Error.Exception(error) } } + } - private suspend fun safeInitWriteRequestQueue(key: Key) = - withThreadSafety(key) { + /** + * Ensures that [keyToThreadSafety] and [keyToWriteRequestQueue] have entries for [key]. + * We only hold [storeLock] while touching these two maps, then release it immediately. + */ + private suspend fun safeInitStore(key: Key) { + storeLock.withLock { + if (keyToThreadSafety[key] == null) { + keyToThreadSafety[key] = ThreadSafety() + } if (keyToWriteRequestQueue[key] == null) { keyToWriteRequestQueue[key] = ArrayDeque() } } + } - private suspend fun safeInitThreadSafety(key: Key) = - storeLock.withLock { - if (keyToThreadSafety[key] == null) { - keyToThreadSafety[key] = ThreadSafety() + /** + * Retrieves the [ThreadSafety] object for [key] without reinitializing it, since [safeInitStore] handles creation. + * We do a quick [storeLock] read then release it without nesting per-key locks inside [storeLock]. + */ + private suspend fun getThreadSafety(key: Key): ThreadSafety { + return storeLock.withLock { + requireNotNull(keyToThreadSafety[key]) { + "ThreadSafety not initialized for key=$key." } } - - private suspend fun safeInitStore(key: Key) { - safeInitThreadSafety(key) - safeInitWriteRequestQueue(key) } - companion object { - private val logger = - Logger.apply { - setLogWriters(listOf(CommonWriter())) - setTag("Store") + /** + * Helper to retrieve the queue for [key] without re-initialization logic. + */ + private suspend fun getQueue(key: Key): WriteRequestQueue { + return storeLock.withLock { + requireNotNull(keyToWriteRequestQueue[key]) { + "No write request queue found for key=$key." } - private const val UNKNOWN_ERROR = "Unknown error occurred" + } } } diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/internal/concurrent/AnyThread.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/internal/concurrent/AnyThread.kt deleted file mode 100644 index 49dbc123e..000000000 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/internal/concurrent/AnyThread.kt +++ /dev/null @@ -1,3 +0,0 @@ -package org.mobilenativefoundation.store.store5.internal.concurrent - -annotation class AnyThread diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/RealMutableStoreTest.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/RealMutableStoreTest.kt new file mode 100644 index 000000000..18ea7c72a --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/RealMutableStoreTest.kt @@ -0,0 +1,375 @@ +@file:OptIn(ExperimentalCoroutinesApi::class, ExperimentalStoreApi::class) + +package org.mobilenativefoundation.store.store5.mutablestore + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.async +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import org.mobilenativefoundation.store.core5.ExperimentalStoreApi +import org.mobilenativefoundation.store.store5.FetcherResult +import org.mobilenativefoundation.store.store5.StoreReadRequest +import org.mobilenativefoundation.store.store5.StoreReadResponse +import org.mobilenativefoundation.store.store5.StoreWriteRequest +import org.mobilenativefoundation.store.store5.StoreWriteResponse +import org.mobilenativefoundation.store.store5.impl.RealMutableStore +import org.mobilenativefoundation.store.store5.impl.RealStore +import org.mobilenativefoundation.store.store5.mutablestore.util.TestCache +import org.mobilenativefoundation.store.store5.mutablestore.util.TestConverter +import org.mobilenativefoundation.store.store5.mutablestore.util.TestFetcher +import org.mobilenativefoundation.store.store5.mutablestore.util.TestInMemoryBookkeeper +import org.mobilenativefoundation.store.store5.mutablestore.util.TestLogger +import org.mobilenativefoundation.store.store5.mutablestore.util.TestSourceOfTruth +import org.mobilenativefoundation.store.store5.mutablestore.util.TestUpdater +import org.mobilenativefoundation.store.store5.mutablestore.util.TestValidator +import org.mobilenativefoundation.store.store5.mutablestore.util.testStore +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue + +private data class Note(val id: String, val content: String) + +private data class NetworkNote(val id: String, val content: String) + +private data class DatabaseNote(val id: String, val content: String) + +@OptIn(ExperimentalCoroutinesApi::class, ExperimentalStoreApi::class) +class RealMutableStoreTest { + private lateinit var testFetcher: TestFetcher + private lateinit var testConverter: TestConverter + private lateinit var testValidator: TestValidator + private lateinit var testSourceOfTruth: TestSourceOfTruth + private lateinit var testCache: TestCache + + private lateinit var testUpdater: TestUpdater + private lateinit var testBookkeeper: TestInMemoryBookkeeper + private lateinit var testLogger: TestLogger + + private lateinit var delegateStore: RealStore + private lateinit var mutableStore: RealMutableStore + + @BeforeTest + fun setUp() { + testFetcher = TestFetcher() + val defaultLocalValue = DatabaseNote("defaultLocalId", "defaultLocalContent") + testConverter = + TestConverter( + defaultNetworkToLocalConverter = { defaultLocalValue }, + defaultOutputToLocalConverter = { defaultLocalValue }, + ) + testValidator = TestValidator() + testSourceOfTruth = TestSourceOfTruth() + testCache = TestCache() + + testFetcher.whenever("key1") { + flowOf(FetcherResult.Data(NetworkNote("networkId", "networkContent"))) + } + + testUpdater = TestUpdater() + testBookkeeper = TestInMemoryBookkeeper() + testLogger = TestLogger() + + delegateStore = + testStore( + fetcher = testFetcher, + sourceOfTruth = testSourceOfTruth, + converter = testConverter, + validator = testValidator, + memoryCache = testCache, + ) + + mutableStore = + RealMutableStore( + delegate = delegateStore, + updater = testUpdater, + bookkeeper = testBookkeeper, + logger = testLogger, + ) + } + + @Test + fun stream_givenNoConflicts_whenReading_thenEmitsFromDelegate() = + runTest { + // Given + val request = StoreReadRequest.Companion.cached("key1", refresh = true) + + // When + val results = mutableStore.stream(request).take(2).toList() + + // Then + assertTrue(results.size >= 2) + assertIs(results[0]) + assertIs>(results[1]) + } + + @Test + fun stream_givenConflictsAndBookkeeper_whenReading_thenAttemptsEagerConflictResolution() = + runTest { + // Given + val request = StoreReadRequest.Companion.cached("key2", refresh = true) + delegateStore.write("key2", Note("localId", "localContent")) + testBookkeeper.setLastFailedSync("key2") + testUpdater.successValue = NetworkNote("resolvedId", "resolvedContent") + + // When + val results = mutableStore.stream(request).take(2).toList() + + // Then + assertTrue(results.isNotEmpty()) + val foundResolutionLog = + testLogger.debugLogs.any { it.contains("resolvedContent") } || + testLogger.debugLogs.any { it.contains("No conflicts.") } + assertTrue(foundResolutionLog, "Expected conflict resolution attempt in debug logs") + assertEquals(null, testBookkeeper.getLastFailedSync("key2")) + assertIs>(results.last()) + } + + @Test + fun stream_givenConflictResolutionFails_whenReading_thenLogsErrorButContinues() = + runTest { + // Given + val request = StoreReadRequest.Companion.cached("key3", refresh = true) + val errorMessage = "Conflict not resolved" + + delegateStore.write("key3", Note("localId3", "localContent3")) + testBookkeeper.setLastFailedSync("key3") + testUpdater.errorMessage = errorMessage + + // When + val results = mutableStore.stream(request).take(2).toList() + + // Then + assertTrue(results.size >= 2) + assertTrue( + testLogger.errorLogs.any { (msg, _) -> msg.contains(errorMessage) }, + "Expected error logs due to conflict resolution failing", + ) + assertNotNull(testBookkeeper.getLastFailedSync("key3")) + } + + @Test + fun stream_givenWriteFlowAndNoConflicts_whenCollecting_thenLocalAndNetworkAreUpdated() = + runTest { + // Given + val requestsFlow = MutableSharedFlow>(replay = 1) + + // When + val responsesDeferred = + async { + mutableStore.stream(requestsFlow).take(1).toList() + } + + requestsFlow.emit( + StoreWriteRequest.Companion.of( + key = "writeKey1", + value = Note("localNoteId1", "localNoteContent1"), + created = 1111L, + onCompletions = null, + ), + ) + + val responses = responsesDeferred.await() + assertTrue(responses.first() is StoreWriteResponse.Success) + + // Then + val read = delegateStore.latestOrNull("writeKey1") + assertEquals("localNoteContent1", read?.content) + assertEquals(null, testBookkeeper.getLastFailedSync("writeKey1")) + } + + @Test + fun stream_givenWriteFlowAndNetworkFailure_whenCollecting_thenLocalIsUpdatedButConflictRemains() = + runTest { + // Given + val requestsFlow = MutableSharedFlow>(replay = 1) + testUpdater.errorMessage = "Network failure" + + // When + val responsesDeferred = + async { + mutableStore.stream(requestsFlow).take(1).toList() + } + + requestsFlow.emit( + StoreWriteRequest.Companion.of( + key = "writeKey2", + value = Note("localNoteId2", "localNoteContent2"), + created = 1111L, + onCompletions = null, + ), + ) + + val responses = responsesDeferred.await() + + // Then + val firstResponse = responses.first() + assertTrue(firstResponse is StoreWriteResponse.Error.Message) + assertTrue(firstResponse.message.contains("Network failure")) + val read = delegateStore.latestOrNull("writeKey2") + assertEquals("localNoteContent2", read?.content) + assertNotNull(testBookkeeper.getLastFailedSync("writeKey2")) + } + + @Test + fun stream_givenMultipleWritesForSameKey_whenAllSucceed_thenOlderRequestsAreClearedFromQueue() = + runTest { + // Given + val requestsFlow = MutableSharedFlow>(replay = 2) + testUpdater.successValue = NetworkNote("someNetId", "someNetContent") + val responsesDeferred = + async { + mutableStore.stream(requestsFlow).take(2).toList() + } + + // When + requestsFlow.emit( + StoreWriteRequest.Companion.of( + key = "multiKey", + value = Note("first", "firstContent"), + created = 100, + onCompletions = null, + ), + ) + requestsFlow.emit( + StoreWriteRequest.Companion.of( + key = "multiKey", + value = Note("second", "secondContent"), + created = 200, + onCompletions = null, + ), + ) + + // Then + val responses = responsesDeferred.await() + assertTrue(responses[0] is StoreWriteResponse.Success) + assertTrue(responses[1] is StoreWriteResponse.Success) + val read = delegateStore.latestOrNull("multiKey") + assertEquals("secondContent", read?.content) + assertNull(testBookkeeper.getLastFailedSync("multiKey")) + } + + @Test + fun write_givenSingleRequestAndNoNetworkIssues_whenCalled_thenSucceeds() = + runTest { + // Given + val request = + StoreWriteRequest.Companion.of( + key = "singleWriteKey", + value = Note("id", "content"), + created = 9999L, + onCompletions = null, + ) + + // When + val response = mutableStore.write(request) + + // Then + assertIs(response) + assertEquals("content", delegateStore.latestOrNull("singleWriteKey")?.content) + } + + @Test + fun write_givenSingleRequestAndNetworkException_whenCalled_thenFailsButLocalUpdated() = + runTest { + // Given + testUpdater.exception = IllegalStateException("Network error!") + val request = + StoreWriteRequest.Companion.of( + key = "exceptionKey", + value = Note("exceptionId", "contentException"), + created = 2222L, + onCompletions = null, + ) + + // When + val response = mutableStore.write(request) + + // Then + assertIs(response) + assertEquals("contentException", delegateStore.latestOrNull("exceptionKey")?.content) + assertNotNull(testBookkeeper.getLastFailedSync("exceptionKey")) + } + + @Test + fun clearAll_givenSomeKeys_whenCalled_thenDelegateIsCleared() = + runTest { + // Given + delegateStore.write("clearKey1", Note("id1", "content1")) + delegateStore.write("clearKey2", Note("id2", "content2")) + assertNotNull(delegateStore.latestOrNull("clearKey1")) + assertNotNull(delegateStore.latestOrNull("clearKey2")) + + // When + mutableStore.clear() + + // Then + assertNull(delegateStore.latestOrNull("clearKey1")) + assertNull(delegateStore.latestOrNull("clearKey2")) + } + + @Test + fun clear_givenKey_whenCalled_thenDelegateIsClearedForThatKey() = + runTest { + // Given + delegateStore.write("clearKey", Note("idCleared", "contentCleared")) + assertNotNull(delegateStore.latestOrNull("clearKey")) + + // When + mutableStore.clear("clearKey") + + // Then + assertNull(delegateStore.latestOrNull("clearKey")) + } + + @Test + fun stream_givenNoBookkeeper_whenConflictsMightExistIsCalled_thenNoEagerResolutionIsAttempted() = + runTest { + // Given + val storeNoBookkeeper = + RealMutableStore( + delegate = delegateStore, + updater = testUpdater, + bookkeeper = null, + logger = testLogger, + ) + delegateStore.write("keyNoBook", Note("idNoBook", "contentNoBook")) + val request = StoreReadRequest.Companion.cached("keyNoBook", refresh = false) + + // When + val results = storeNoBookkeeper.stream(request).take(2).toList() + + // Then + assertTrue(results.isNotEmpty()) + assertTrue( + testLogger.debugLogs.none { it.contains("ConflictsResolved") }, + "No conflict resolution logs expected because no Bookkeeper", + ) + } + + @Test + fun write_givenKeyNotInitialized_whenCalled_thenStoreIsSafelyInitialized() = + runTest { + // Given + val request = + StoreWriteRequest.Companion.of( + key = "newKey", + value = Note("someId", "someContent"), + created = 777L, + onCompletions = null, + ) + + // When + val response = mutableStore.write(request) + + // Then + assertIs(response) + assertEquals("someContent", delegateStore.latestOrNull("newKey")?.content) + } +} diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestCache.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestCache.kt new file mode 100644 index 000000000..78d22c91f --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestCache.kt @@ -0,0 +1,76 @@ +package org.mobilenativefoundation.store.store5.mutablestore.util + +import org.mobilenativefoundation.store.cache5.Cache + +@Suppress("UNCHECKED_CAST") +class TestCache : Cache { + private val map = HashMap() + var getIfPresentCalls = 0 + var getOrPutCalls = 0 + var getAllPresentCalls = 0 + var putCalls = 0 + var putAllCalls = 0 + var invalidateCalls = 0 + var invalidateAllKeysCalls = 0 + var invalidateAllCalls = 0 + var sizeCalls = 0 + + override fun getIfPresent(key: Key): Value? { + getIfPresentCalls++ + return map[key] + } + + override fun getOrPut( + key: Key, + valueProducer: () -> Value, + ): Value { + getOrPutCalls++ + return map.getOrPut(key, valueProducer) + } + + override fun getAllPresent(keys: List<*>): Map { + getAllPresentCalls++ + return keys.mapNotNull { it as? Key }.associateWithNotNull { key -> map[key] } + } + + override fun put( + key: Key, + value: Value, + ) { + putCalls++ + map[key] = value + } + + override fun putAll(map: Map) { + putAllCalls++ + map.forEach { (k, v) -> put(k, v) } + } + + override fun invalidate(key: Key) { + invalidateCalls++ + map.remove(key) + } + + override fun invalidateAll(keys: List) { + invalidateAllKeysCalls++ + keys.forEach { map.remove(it) } + } + + override fun invalidateAll() { + invalidateAllCalls++ + map.clear() + } + + override fun size(): Long { + sizeCalls++ + return map.size.toLong() + } + + private inline fun Iterable.associateWithNotNull(transform: (K) -> V?): Map { + val destination = mutableMapOf() + for (element in this) { + transform(element)?.let { destination[element] = it } + } + return destination + } +} diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestConverter.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestConverter.kt new file mode 100644 index 000000000..845dcd439 --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestConverter.kt @@ -0,0 +1,34 @@ +package org.mobilenativefoundation.store.store5.mutablestore.util + +import org.mobilenativefoundation.store.store5.Converter + +@Suppress("UNCHECKED_CAST") +class TestConverter( + private val defaultNetworkToLocalConverter: ((Network) -> Local)? = null, + private val defaultOutputToLocalConverter: ((Output) -> Local)? = null, +) : Converter { + private val networkToLocalMap: HashMap = HashMap() + private val outputToLocalMap: HashMap = HashMap() + + fun wheneverNetwork( + network: Network, + block: () -> Local, + ) { + networkToLocalMap[network] = block() + } + + fun wheneverOutput( + output: Output, + block: () -> Local, + ) { + outputToLocalMap[output] = block() + } + + override fun fromNetworkToLocal(network: Network): Local { + return networkToLocalMap[network] ?: defaultNetworkToLocalConverter?.invoke(network) ?: network as Local + } + + override fun fromOutputToLocal(output: Output): Local { + return outputToLocalMap[output] ?: defaultOutputToLocalConverter?.invoke(output) ?: output as Local + } +} diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestFetcher.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestFetcher.kt new file mode 100644 index 000000000..a074424a4 --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestFetcher.kt @@ -0,0 +1,25 @@ +package org.mobilenativefoundation.store.store5.mutablestore.util + +import kotlinx.coroutines.flow.Flow +import org.mobilenativefoundation.store.store5.Fetcher +import org.mobilenativefoundation.store.store5.FetcherResult + +class TestFetcher( + override val name: String? = null, + override val fallback: Fetcher? = null, +) : Fetcher { + private val faked = HashMap>>() + + fun whenever( + key: Key, + block: () -> Flow>, + ) { + faked[key] = block() + } + + override operator fun invoke(key: Key): Flow> { + return requireNotNull(faked[key]) { + "No fetcher result provided for key=$key" + } + } +} diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestInMemoryBookkeeper.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestInMemoryBookkeeper.kt new file mode 100644 index 000000000..1c159c81e --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestInMemoryBookkeeper.kt @@ -0,0 +1,28 @@ +package org.mobilenativefoundation.store.store5.mutablestore.util + +import org.mobilenativefoundation.store.store5.Bookkeeper + +class TestInMemoryBookkeeper : Bookkeeper { + private val failedSyncMap = mutableMapOf() + + override suspend fun getLastFailedSync(key: Key): Long? { + return failedSyncMap[key] + } + + override suspend fun setLastFailedSync( + key: Key, + timestamp: Long, + ): Boolean { + failedSyncMap[key] = timestamp + return true + } + + override suspend fun clear(key: Key): Boolean { + return failedSyncMap.remove(key) != null + } + + override suspend fun clearAll(): Boolean { + failedSyncMap.clear() + return true + } +} diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestLogger.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestLogger.kt new file mode 100644 index 000000000..332ea0023 --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestLogger.kt @@ -0,0 +1,19 @@ +package org.mobilenativefoundation.store.store5.mutablestore.util + +import org.mobilenativefoundation.store.store5.Logger + +class TestLogger : Logger { + val debugLogs = mutableListOf() + val errorLogs = mutableListOf>() + + override fun debug(message: String) { + debugLogs.add(message) + } + + override fun error( + message: String, + throwable: Throwable?, + ) { + errorLogs.add(message to throwable) + } +} diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestSourceOfTruth.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestSourceOfTruth.kt new file mode 100644 index 000000000..3b569e8d0 --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestSourceOfTruth.kt @@ -0,0 +1,67 @@ +package org.mobilenativefoundation.store.store5.mutablestore.util + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import org.mobilenativefoundation.store.store5.SourceOfTruth + +@Suppress("UNCHECKED_CAST") +class TestSourceOfTruth : SourceOfTruth { + private val storage = HashMap() + private val flows = HashMap>() + private var readError: Throwable? = null + private var writeError: Throwable? = null + private var deleteError: Throwable? = null + private var deleteAllError: Throwable? = null + + fun throwOnRead( + key: Key, + block: () -> Throwable, + ) { + readError = block() + } + + fun throwOnWrite( + key: Key, + block: () -> Throwable, + ) { + writeError = block() + } + + fun throwOnDelete( + key: Key?, + block: () -> Throwable, + ) { + if (key != null) deleteError = block() else deleteAllError = block() + } + + override fun reader(key: Key): Flow = + flow { + readError?.let { throw SourceOfTruth.ReadException(key, it) } + val sharedFlow = flows.getOrPut(key) { MutableSharedFlow(replay = 1) } + emit(storage[key] as Output?) + emitAll(sharedFlow) + } + + override suspend fun write( + key: Key, + value: Local, + ) { + writeError?.let { throw SourceOfTruth.WriteException(key, value, it) } + storage[key] = value + flows[key]?.emit(value as Output?) + } + + override suspend fun delete(key: Key) { + deleteError?.let { throw it } + storage.remove(key) + flows.remove(key) + } + + override suspend fun deleteAll() { + deleteAllError?.let { throw it } + storage.clear() + flows.clear() + } +} diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestStore.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestStore.kt new file mode 100644 index 000000000..1a2fb1f5a --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestStore.kt @@ -0,0 +1,29 @@ +package org.mobilenativefoundation.store.store5.mutablestore.util + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import org.mobilenativefoundation.store.cache5.Cache +import org.mobilenativefoundation.store.store5.Converter +import org.mobilenativefoundation.store.store5.Fetcher +import org.mobilenativefoundation.store.store5.SourceOfTruth +import org.mobilenativefoundation.store.store5.Validator +import org.mobilenativefoundation.store.store5.impl.RealStore + +internal fun testStore( + dispatcher: CoroutineDispatcher = Dispatchers.Default, + scope: CoroutineScope = CoroutineScope(dispatcher), + fetcher: Fetcher = TestFetcher(), + sourceOfTruth: SourceOfTruth = TestSourceOfTruth(), + converter: Converter = TestConverter(), + validator: Validator = TestValidator(), + memoryCache: Cache = TestCache(), +): RealStore = + RealStore( + scope = scope, + fetcher = fetcher, + sourceOfTruth = sourceOfTruth, + converter = converter, + validator = validator, + memCache = memoryCache, + ) diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestUpdater.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestUpdater.kt new file mode 100644 index 000000000..802faa34d --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestUpdater.kt @@ -0,0 +1,23 @@ +package org.mobilenativefoundation.store.store5.mutablestore.util + +import org.mobilenativefoundation.store.store5.OnUpdaterCompletion +import org.mobilenativefoundation.store.store5.Updater +import org.mobilenativefoundation.store.store5.UpdaterResult + +class TestUpdater : Updater { + var exception: Throwable? = null + var errorMessage: String? = null + var successValue: Response? = null + + override suspend fun post( + key: Key, + value: Output, + ): UpdaterResult { + exception?.let { return UpdaterResult.Error.Exception(it) } + errorMessage?.let { return UpdaterResult.Error.Message(it) } + successValue?.let { return UpdaterResult.Success.Typed(it) } + return UpdaterResult.Success.Untyped(value) + } + + override val onCompletion: OnUpdaterCompletion? = null +} diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestValidator.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestValidator.kt new file mode 100644 index 000000000..c6e852429 --- /dev/null +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/mutablestore/util/TestValidator.kt @@ -0,0 +1,18 @@ +package org.mobilenativefoundation.store.store5.mutablestore.util + +import org.mobilenativefoundation.store.store5.Validator + +class TestValidator : Validator { + private val map: HashMap = HashMap() + + fun whenever( + item: Output, + block: () -> Boolean, + ) { + map[item] = block() + } + + override suspend fun isValid(item: Output): Boolean { + return map[item] != false + } +}