diff --git a/.gitignore b/.gitignore index f73044b6..575429a5 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ captures Pods/ dialect/bin .build +.vscode diff --git a/CHANGELOG.md b/CHANGELOG.md index 134f481a..b22b7608 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-BETA27 + +* Improved watch query internals. Added the ability to throttle watched queries. + ## 1.0.0-BETA26 * Support bucket priorities and partial syncs. diff --git a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt index 0ca5d9ea..a6a90eff 100644 --- a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt +++ b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt @@ -1,19 +1,18 @@ package com.powersync -import androidx.test.platform.app.InstrumentationRegistry import androidx.test.ext.junit.runners.AndroidJUnit4 +import androidx.test.platform.app.InstrumentationRegistry import app.cash.turbine.turbineScope import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow +import kotlinx.coroutines.* import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import org.junit.After - -import org.junit.Test -import org.junit.runner.RunWith - import org.junit.Assert.* import org.junit.Before +import org.junit.Test +import org.junit.runner.RunWith @RunWith(AndroidJUnit4::class) class AndroidDatabaseTest { @@ -91,4 +90,4 @@ class AndroidDatabaseTest { query.cancel() } } -} \ No newline at end of file +} diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index a1ec0bfb..77c6f658 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -1,23 +1,21 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver +import com.powersync.utils.AtomicMutableSet import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch internal class PsSqlDriver( private val driver: SqlDriver, private val scope: CoroutineScope, ) : SqlDriver by driver { // MutableSharedFlow to emit batched table updates - private val tableUpdatesFlow = MutableSharedFlow>(replay = 0) + private val tableUpdatesFlow = MutableSharedFlow>(replay = 0) // In-memory buffer to store table names before flushing - private val pendingUpdates = mutableSetOf() + private val pendingUpdates = AtomicMutableSet() fun updateTable(tableName: String) { pendingUpdates.add(tableName) @@ -27,20 +25,13 @@ internal class PsSqlDriver( pendingUpdates.clear() } - // Flows on table updates - fun tableUpdates(): Flow> = tableUpdatesFlow.asSharedFlow() + // Flows on any table change + // This specifically returns a SharedFlow for downstream timing considerations + fun updatesOnTables(): SharedFlow> = + tableUpdatesFlow + .asSharedFlow() - // Flows on table updates containing a specific table - fun updatesOnTable(tableName: String): Flow = tableUpdates().filter { it.contains(tableName) }.map { } - - fun fireTableUpdates() { - val updates = pendingUpdates.toList() - if (updates.isEmpty()) { - return - } - scope.launch { - tableUpdatesFlow.emit(updates) - } - pendingUpdates.clear() + suspend fun fireTableUpdates() { + tableUpdatesFlow.emit(pendingUpdates.toSetAndClear()) } } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index ec61a209..2bdd340b 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -18,7 +18,6 @@ internal class BucketStorageImpl( private val db: InternalDatabase, private val logger: Logger, ) : BucketStorage { - private val tableNames: MutableSet = mutableSetOf() private var hasCompletedSync = AtomicBoolean(false) private var pendingBucketDeletes = AtomicBoolean(false) @@ -32,18 +31,6 @@ internal class BucketStorageImpl( const val COMPACT_OPERATION_INTERVAL = 1_000 } - init { - readTableNames() - } - - private fun readTableNames() { - tableNames.clear() - // Query to get existing table names - val names = db.getExistingTableNames("ps_data_*") - - tableNames.addAll(names) - } - override fun getMaxOpId(): String = MAX_OP_ID override suspend fun getClientId(): String { diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 2a3abfdb..b5bb5b2f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -22,13 +22,14 @@ import com.powersync.sync.SyncStatusData import com.powersync.sync.SyncStream import com.powersync.utils.JsonParam import com.powersync.utils.JsonUtil +import com.powersync.utils.throttle import com.powersync.utils.toJsonObject import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking @@ -141,9 +142,13 @@ internal class PowerSyncDatabaseImpl( uploadJob = scope.launch { - internalDb.updatesOnTable(InternalTable.CRUD.toString()).debounce(crudThrottleMs).collect { - syncStream!!.triggerCrudUpload() - } + internalDb + .updatesOnTables() + .filter { it.contains(InternalTable.CRUD.toString()) } + .throttle(crudThrottleMs) + .collect { + syncStream!!.triggerCrudUpload() + } } } @@ -233,8 +238,9 @@ internal class PowerSyncDatabaseImpl( override fun watch( sql: String, parameters: List?, + throttleMs: Long?, mapper: (SqlCursor) -> RowType, - ): Flow> = internalDb.watch(sql, parameters, mapper) + ): Flow> = internalDb.watch(sql, parameters, throttleMs, mapper) override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) @@ -280,7 +286,11 @@ internal class PowerSyncDatabaseImpl( syncStream = null } - currentStatus.update(connected = false, connecting = false, lastSyncedAt = currentStatus.lastSyncedAt) + currentStatus.update( + connected = false, + connecting = false, + lastSyncedAt = currentStatus.lastSyncedAt, + ) } override suspend fun disconnectAndClear(clearLocal: Boolean) { diff --git a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt index f400297c..ba1a0063 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt @@ -54,6 +54,10 @@ public interface Queries { public fun watch( sql: String, parameters: List? = listOf(), + /** + * Specify the minimum interval, in milliseconds, between queries. + */ + throttleMs: Long? = null, mapper: (SqlCursor) -> RowType, ): Flow> diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index 54a9cd16..92362904 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -4,7 +4,7 @@ import app.cash.sqldelight.db.Closeable import com.persistence.PowersyncQueries import com.powersync.db.Queries import com.powersync.persistence.PsDatabase -import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharedFlow internal interface InternalDatabase : Queries, @@ -12,7 +12,5 @@ internal interface InternalDatabase : val transactor: PsDatabase val queries: PowersyncQueries - fun getExistingTableNames(tableGlob: String): List - - fun updatesOnTable(tableName: String): Flow + fun updatesOnTables(): SharedFlow> } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 91cad180..4d37cecc 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -1,9 +1,6 @@ package com.powersync.db.internal import app.cash.sqldelight.ExecutableQuery -import app.cash.sqldelight.Query -import app.cash.sqldelight.coroutines.asFlow -import app.cash.sqldelight.coroutines.mapToList import app.cash.sqldelight.db.QueryResult import app.cash.sqldelight.db.SqlPreparedStatement import com.persistence.PowersyncQueries @@ -13,15 +10,17 @@ import com.powersync.db.SqlCursor import com.powersync.db.runWrapped import com.powersync.persistence.PsDatabase import com.powersync.utils.JsonUtil +import com.powersync.utils.throttle import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.debounce -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.onSubscription import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext @@ -35,14 +34,10 @@ internal class InternalDatabaseImpl( override val transactor: PsDatabase = PsDatabase(driver) override val queries: PowersyncQueries = transactor.powersyncQueries - // Register callback for table updates - private fun tableUpdates(): Flow> = driver.tableUpdates() - - // Debounced by transaction completion - private val tableUpdatesMutex = Mutex() - // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. private val dbContext = Dispatchers.IO + private val writeLock = Mutex() + private val transaction = object : PowerSyncTransaction { override fun execute( @@ -70,43 +65,19 @@ internal class InternalDatabaseImpl( } companion object { - const val POWERSYNC_TABLE_MATCH = "(^ps_data__|^ps_data_local__)" const val DEFAULT_WATCH_THROTTLE_MS = 30L } - init { - scope.launch { - val accumulatedUpdates = mutableSetOf() - // Store table changes in an accumulated array which will be (debounced) emitted on transaction end - tableUpdates() - .onEach { tables -> - val dataTables = - tables - .map { toFriendlyTableName(it) } - .filter { it.isNotBlank() } - tableUpdatesMutex.withLock { - accumulatedUpdates.addAll(dataTables) - } - } - // debounce ignores events inside the throttle. Debouncing needs to be done after accumulation - .debounce(DEFAULT_WATCH_THROTTLE_MS) - .collect { _ -> - tableUpdatesMutex.withLock { - driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) - accumulatedUpdates.clear() - } - } - } - } - override suspend fun execute( sql: String, parameters: List?, ): Long = - withContext(dbContext) { - val r = executeSync(sql, parameters) - driver.fireTableUpdates() - r + writeLock.withLock { + withContext(dbContext) { + executeSync(sql, parameters) + }.also { + driver.fireTableUpdates() + } } private fun executeSync( @@ -189,21 +160,37 @@ internal class InternalDatabaseImpl( override fun watch( sql: String, parameters: List?, + throttleMs: Long?, mapper: (SqlCursor) -> RowType, - ): Flow> { - val tables = - getSourceTables(sql, parameters) - .map { toFriendlyTableName(it) } - .filter { it.isNotBlank() } - .toSet() - return watchQuery( - query = sql, - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), - mapper = mapper, - tables = tables, - ).asFlow().mapToList(scope.coroutineContext) - } + ): Flow> = + // Use a channel flow here since we throttle (buffer used under the hood) + // This causes some emissions to be from different scopes. + channelFlow { + // Fetch the tables asynchronously with getAll + val tables = + getSourceTables(sql, parameters) + .filter { it.isNotBlank() } + .toSet() + + updatesOnTables() + // onSubscription here is very important. + // This ensures that the initial result and all updates are emitted. + .onSubscription { + send(getAll(sql, parameters = parameters, mapper = mapper)) + }.filter { + // Only trigger updates on relevant tables + it.intersect(tables).isNotEmpty() + } + // Throttling here is a feature which prevents watch queries from spamming updates. + // Throttling by design discards and delays events within the throttle window. Discarded events + // still trigger a trailing edge update. + // Backpressure is avoided on the throttling and consumer level by buffering the last upstream value. + // Note that the buffered upstream "value" only serves to trigger the getAll query. We don't buffer watch results. + .throttle(throttleMs ?: DEFAULT_WATCH_THROTTLE_MS) + .collect { + send(getAll(sql, parameters = parameters, mapper = mapper)) + } + } private fun createQuery( query: String, @@ -218,28 +205,6 @@ internal class InternalDatabaseImpl( } } - private fun watchQuery( - query: String, - mapper: (SqlCursor) -> T, - parameters: Int = 0, - binders: (SqlPreparedStatement.() -> Unit)? = null, - tables: Set = setOf(), - ): Query = - object : Query(wrapperMapper(mapper)) { - override fun execute(mapper: (app.cash.sqldelight.db.SqlCursor) -> QueryResult): QueryResult = - runWrapped { - driver.executeQuery(null, query, mapper, parameters, binders) - } - - override fun addListener(listener: Listener) { - driver.addListener(queryKeys = tables.toTypedArray(), listener = listener) - } - - override fun removeListener(listener: Listener) { - driver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) - } - } - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = withContext(dbContext) { transactor.transactionWithResult(noEnclosing = true) { @@ -254,8 +219,8 @@ internal class InternalDatabaseImpl( } override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = - withContext(dbContext) { - val r = + writeLock.withLock { + withContext(dbContext) { transactor.transactionWithResult(noEnclosing = true) { runWrapped { val result = callback.execute(transaction) @@ -265,31 +230,24 @@ internal class InternalDatabaseImpl( result } } - // Trigger watched queries - driver.fireTableUpdates() - r + }.also { + // Trigger watched queries + // Fire updates inside the write lock + driver.fireTableUpdates() + } } // Register callback for table updates on a specific table - override fun updatesOnTable(tableName: String): Flow = driver.updatesOnTable(tableName) - - private fun toFriendlyTableName(tableName: String): String { - val regex = POWERSYNC_TABLE_MATCH.toRegex() - if (regex.containsMatchIn(tableName)) { - return tableName.replace(regex, "") - } - return tableName - } + override fun updatesOnTables(): SharedFlow> = driver.updatesOnTables() - private fun getSourceTables( + private suspend fun getSourceTables( sql: String, parameters: List?, ): Set { val rows = - createQuery( - query = "EXPLAIN $sql", - parameters = parameters?.size ?: 0, - binders = getBindersFromParams(parameters), + getAll( + "EXPLAIN $sql", + parameters = parameters, mapper = { ExplainQueryResult( addr = it.getString(0)!!, @@ -299,7 +257,7 @@ internal class InternalDatabaseImpl( p3 = it.getLong(4)!!, ) }, - ).executeAsList() + ) val rootPages = mutableListOf() for (row in rows) { @@ -309,33 +267,15 @@ internal class InternalDatabaseImpl( } val params = listOf(JsonUtil.json.encodeToString(rootPages)) val tableRows = - createQuery( + getAll( "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", - parameters = params.size, - binders = { - bindString(0, params[0]) - }, + parameters = params, mapper = { it.getString(0)!! }, - ).executeAsList() + ) return tableRows.toSet() } - override fun getExistingTableNames(tableGlob: String): List { - val existingTableNames = - createQuery( - "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB ?", - parameters = 1, - binders = { - bindString(0, tableGlob) - }, - mapper = { cursor -> - cursor.getString(0)!! - }, - ).executeAsList() - return existingTableNames - } - override fun close() { runWrapped { this.driver.close() } } diff --git a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt new file mode 100644 index 00000000..536ddd1a --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt @@ -0,0 +1,26 @@ +package com.powersync.utils + +import kotlinx.atomicfu.locks.SynchronizedObject +import kotlinx.atomicfu.locks.synchronized + +public class AtomicMutableSet : SynchronizedObject() { + private val set = mutableSetOf() + + public fun add(element: T): Boolean = + synchronized(this) { + return set.add(element) + } + + // Synchronized clear method + public fun clear(): Unit = + synchronized(this) { + set.clear() + } + + public fun toSetAndClear(): Set = + synchronized(this) { + val copied = set.toList().toSet() + set.clear() + copied + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt new file mode 100644 index 00000000..711b2cb4 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt @@ -0,0 +1,31 @@ +package com.powersync.utils + +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.flow + +/** + * Throttles a flow with emissions on the leading and trailing edge. + * Events, from the incoming flow, during the throttle window are discarded. + * Events are discarded by using a conflated buffer. + * This throttle method acts as a slow consumer, but backpressure is not a concern + * due to the conflated buffer dropping events during the throttle window. + */ +internal fun Flow.throttle(windowMs: Long): Flow = + flow { + // Use a buffer before throttle (ensure only the latest event is kept) + val bufferedFlow = this@throttle.buffer(Channel.CONFLATED) + + bufferedFlow.collect { value -> + // Emit the event immediately (leading edge) + emit(value) + + // Delay for the throttle window to avoid emitting too frequently + delay(windowMs) + + // The next incoming event will be provided from the buffer. + // The next collect will emit the trailing edge + } + } diff --git a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt index 946b8580..0347d967 100644 --- a/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/bucket/BucketStorageTest.kt @@ -5,7 +5,6 @@ import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import com.powersync.db.internal.InternalDatabase import dev.mokkery.answering.returns -import dev.mokkery.every import dev.mokkery.everySuspend import dev.mokkery.matcher.any import dev.mokkery.mock @@ -24,9 +23,7 @@ class BucketStorageTest { @Test fun testGetMaxOpId() { mockDb = - mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") - } + mock {} bucketStorage = BucketStorageImpl(mockDb, Logger) assertEquals("9223372036854775807", bucketStorage.getMaxOpId()) } @@ -36,7 +33,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional( any(), @@ -55,7 +51,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional( any(), @@ -88,7 +83,6 @@ class BucketStorageTest { ) mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(), any(), any()) } returns mockCrudEntry } bucketStorage = BucketStorageImpl(mockDb, Logger) @@ -102,7 +96,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(), any(), any()) } returns null } bucketStorage = BucketStorageImpl(mockDb, Logger) @@ -116,7 +109,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(), any(), any()) } returns 1L } bucketStorage = BucketStorageImpl(mockDb, Logger) @@ -129,7 +121,6 @@ class BucketStorageTest { runTest { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional(any(), any(), any()) } returns null } bucketStorage = BucketStorageImpl(mockDb, Logger) @@ -142,7 +133,6 @@ class BucketStorageTest { runBlocking { mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional( any(), @@ -164,7 +154,6 @@ class BucketStorageTest { val mockBucketStates = listOf(BucketState("bucket1", "op1"), BucketState("bucket2", "op2")) mockDb = mock { - every { getExistingTableNames("ps_data_*") } returns listOf("list_1", "list_2") everySuspend { getOptional( any(), diff --git a/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt b/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt index 0157012f..aa2611e1 100644 --- a/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt @@ -1,5 +1,10 @@ package com.powersync.utils +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonNull import kotlinx.serialization.json.JsonObject @@ -36,6 +41,28 @@ class JsonTest { assertEquals("test", jsonElement.content) } + @Test + fun testThrottle() { + runTest { + val t = + flow { + emit(1) + delay(10) + emit(2) + delay(20) + emit(3) + delay(100) + emit(4) + }.throttle(100) + .map { + // Adding a delay here to simulate a slow consumer + delay(1000) + it + }.toList() + assertEquals(t, listOf(1, 4)) + } + } + @Test fun testBooleanToJsonElement() { val boolean = JsonParam.Boolean(true) diff --git a/demos/supabase-todolist/androidApp/src/androidMain/AndroidManifest.xml b/demos/supabase-todolist/androidApp/src/androidMain/AndroidManifest.xml index 89050814..11d32bb4 100644 --- a/demos/supabase-todolist/androidApp/src/androidMain/AndroidManifest.xml +++ b/demos/supabase-todolist/androidApp/src/androidMain/AndroidManifest.xml @@ -10,7 +10,9 @@ android:roundIcon="@mipmap/ic_launcher_round" android:supportsRtl="true" android:theme="@style/Theme.AppCompat.Light.NoActionBar" - android:enableOnBackInvokedCallback="true"> + android:enableOnBackInvokedCallback="true" + android:networkSecurityConfig="@xml/network_security_config" + > + + + localhost + +