diff --git a/CHANGELOG.md b/CHANGELOG.md index 4972f5e2..61e9d74a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-BETA26 + +* Fix issue where event emissions were being suspended for an indeterminate amount of time which resulted in Swift watch queries not being re-run timeously. + ## 1.0.0-BETA25 * JVM: Lower minimum supported version from 17 to 8. diff --git a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt index 090be5e1..483184ee 100644 --- a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt +++ b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt @@ -76,6 +76,7 @@ public actual class DatabaseDriverFactory( ), ) setupSqliteBinding() + return this.driver as PsSqlDriver } diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index a1ec0bfb..6c2fc057 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -1,6 +1,7 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver +import co.touchlab.kermit.Logger import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow @@ -13,8 +14,14 @@ internal class PsSqlDriver( private val driver: SqlDriver, private val scope: CoroutineScope, ) : SqlDriver by driver { - // MutableSharedFlow to emit batched table updates - private val tableUpdatesFlow = MutableSharedFlow>(replay = 0) + private val otherTableUpdatesFlow = + MutableSharedFlow>(replay = 0, extraBufferCapacity = 64) + + private val uiTableUpdatesFlow = + MutableSharedFlow>(replay = 0, extraBufferCapacity = 64) + + private val crudTableUpdatesFlow = + MutableSharedFlow>(replay = 0, extraBufferCapacity = 64) // In-memory buffer to store table names before flushing private val pendingUpdates = mutableSetOf() @@ -27,20 +34,52 @@ internal class PsSqlDriver( pendingUpdates.clear() } - // Flows on table updates - fun tableUpdates(): Flow> = tableUpdatesFlow.asSharedFlow() + fun crudTableUpdates(): Flow> = crudTableUpdatesFlow.asSharedFlow() + + fun updatesOnCrudTable(tableName: String): Flow = crudTableUpdates().filter { it.contains(tableName) }.map { } - // Flows on table updates containing a specific table - fun updatesOnTable(tableName: String): Flow = tableUpdates().filter { it.contains(tableName) }.map { } + fun uiTableUpdates(): Flow> = uiTableUpdatesFlow.asSharedFlow() + + fun otherTableUpdates(): Flow> = otherTableUpdatesFlow.asSharedFlow() fun fireTableUpdates() { val updates = pendingUpdates.toList() if (updates.isEmpty()) { return } - scope.launch { - tableUpdatesFlow.emit(updates) + val otherUpdates = updates.filter { !isDataTable(it) && !isCrudTable(it) } + val uiUpdates = updates.filter { isDataTable(it) } + val crudUpdates = updates.filter { isCrudTable(it) } + + if (!otherTableUpdatesFlow.tryEmit(otherUpdates)) { + Logger.i("Failed to emit other table updates") + } else { + Logger.i(("RUNNING OTHER UPDATES")) + Logger.i(otherUpdates.toString()) + } + + if (!crudTableUpdatesFlow.tryEmit(crudUpdates)) { + Logger.i("Failed to emit CRUD table updates will try normal emit and suspend") + scope.launch { + crudTableUpdatesFlow.emit(crudUpdates) + } + } else if (crudUpdates.isNotEmpty()) { + Logger.i(("RUNNING CRUD UPDATES")) + Logger.i(crudUpdates.toString()) + } + + if (!uiTableUpdatesFlow.tryEmit(uiUpdates)) { + Logger.i("Failed to emit ui table updates") + } else if (uiUpdates.isNotEmpty()) { + Logger.i(("RUNNING UI UPDATES")) + Logger.i(uiUpdates.toString()) } + pendingUpdates.clear() } + + // Determine if a table requires high-priority processing + private fun isDataTable(tableName: String): Boolean = tableName.startsWith("ps_data") + + private fun isCrudTable(tableName: String): Boolean = tableName.startsWith("ps_crud") } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 43abf0f5..3f1f250c 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -6,6 +6,7 @@ import app.cash.sqldelight.coroutines.asFlow import app.cash.sqldelight.coroutines.mapToList import app.cash.sqldelight.db.QueryResult import app.cash.sqldelight.db.SqlPreparedStatement +import co.touchlab.kermit.Logger import com.persistence.PowersyncQueries import com.powersync.PowerSyncException import com.powersync.PsSqlDriver @@ -21,6 +22,7 @@ import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex @@ -37,10 +39,16 @@ internal class InternalDatabaseImpl( override val queries: PowersyncQueries = transactor.powersyncQueries // Register callback for table updates - private fun tableUpdates(): Flow> = driver.tableUpdates() + private fun crudTableUpdates(): Flow> = driver.crudTableUpdates() + + private fun uiTableUpdates(): Flow> = driver.uiTableUpdates() + + private fun otherTableUpdates(): Flow> = driver.otherTableUpdates() // Debounced by transaction completion private val tableUpdatesMutex = Mutex() + private val tableUpdatesMutex2 = Mutex() + private val tableUpdatesMutex3 = Mutex() // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. private val dbContext = Dispatchers.IO @@ -76,23 +84,70 @@ internal class InternalDatabaseImpl( } init { - scope.launch { + scope.launch(Dispatchers.IO) { + val accumulatedUpdates = mutableSetOf() + // Store table changes in an accumulated array which will be (debounced) emitted on transaction end + uiTableUpdates() + .map { tables -> + tables + .map { toFriendlyTableName(it) } + .filter { it.isNotBlank() } + }.filter { dataTables -> dataTables.isNotEmpty() } + .onEach { dataTables -> + tableUpdatesMutex2.withLock { + accumulatedUpdates.addAll(dataTables) + } + }.debounce(DEFAULT_WATCH_THROTTLE_MS) + .collect { _ -> + tableUpdatesMutex2.withLock { + Logger.i("Notifying Listeners of UI Updates") + driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) + accumulatedUpdates.clear() + } + } + } + + scope.launch(Dispatchers.IO) { val accumulatedUpdates = mutableSetOf() // Store table changes in an accumulated array which will be (debounced) emitted on transaction end - tableUpdates() + otherTableUpdates() + .map { tables -> + tables + .map { toFriendlyTableName(it) } + .filter { it.isNotBlank() } + }.filter { dataTables -> dataTables.isNotEmpty() } + .onEach { dataTables -> + tableUpdatesMutex3.withLock { + accumulatedUpdates.addAll(dataTables) + } + }.debounce(DEFAULT_WATCH_THROTTLE_MS) + .collect { _ -> + tableUpdatesMutex3.withLock { + Logger.i("Notifying Listeners of Other Table Updates") + driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) + accumulatedUpdates.clear() + } + } + } + + scope.launch(Dispatchers.IO) { + val accumulatedUpdates = mutableSetOf() + crudTableUpdates() + .map { tables -> + tables + .map { toFriendlyTableName(it) } + .filter { it.isNotBlank() } + }.filter { dataTables -> dataTables.isNotEmpty() } .onEach { tables -> - val dataTables = - tables - .map { toFriendlyTableName(it) } - .filter { it.isNotBlank() } tableUpdatesMutex.withLock { - accumulatedUpdates.addAll(dataTables) + accumulatedUpdates.addAll(tables) } } // debounce ignores events inside the throttle. Debouncing needs to be done after accumulation .debounce(DEFAULT_WATCH_THROTTLE_MS) .collect { _ -> tableUpdatesMutex.withLock { + Logger.i("Notifying Listeners of Crud Updates") driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) accumulatedUpdates.clear() } @@ -197,6 +252,7 @@ internal class InternalDatabaseImpl( .map { toFriendlyTableName(it) } .filter { it.isNotBlank() } .toSet() + // Explore how this works internally return watchQuery( query = sql, parameters = parameters?.size ?: 0, @@ -272,7 +328,7 @@ internal class InternalDatabaseImpl( } // Register callback for table updates on a specific table - override fun updatesOnTable(tableName: String): Flow = driver.updatesOnTable(tableName) + override fun updatesOnTable(tableName: String): Flow = driver.updatesOnCrudTable(tableName) private fun toFriendlyTableName(tableName: String): String { val regex = POWERSYNC_TABLE_MATCH.toRegex() diff --git a/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt b/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt index a12f7974..08ea4c53 100644 --- a/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt +++ b/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt @@ -23,6 +23,7 @@ import kotlinx.coroutines.CoroutineScope @OptIn(ExperimentalForeignApi::class) public actual class DatabaseDriverFactory { private var driver: PsSqlDriver? = null + private var readDriver: PsSqlDriver? = null init { init_powersync_sqlite_extension() @@ -97,6 +98,7 @@ public actual class DatabaseDriverFactory { ), ), ) + return this.driver as PsSqlDriver } diff --git a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt index 6b0a09ba..d01de086 100644 --- a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt +++ b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt @@ -8,6 +8,7 @@ import kotlin.io.path.absolutePathString @Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING", "SqlNoDataSourceInspection") public actual class DatabaseDriverFactory { private var driver: PsSqlDriver? = null + private var readDriver: PsSqlDriver? = null private external fun setupSqliteBinding() diff --git a/gradle.properties b/gradle.properties index e219cfed..f495ba79 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,7 +17,7 @@ development=true RELEASE_SIGNING_ENABLED=true # Library config GROUP=com.powersync -LIBRARY_VERSION=1.0.0-BETA25 +LIBRARY_VERSION=1.0.0-BETA26 GITHUB_REPO=https://github.com/powersync-ja/powersync-kotlin.git # POM POM_URL=https://github.com/powersync-ja/powersync-kotlin/