From 73b8ea678a97e5a85a182352f8f80b70e8569da1 Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Fri, 21 Feb 2025 14:48:24 +0200 Subject: [PATCH 1/6] fix: swift watch query issue --- CHANGELOG.md | 4 ++++ .../kotlin/com/powersync/PsSqlDriver.kt | 15 +++++++++++---- gradle.properties | 2 +- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4972f5e2..61e9d74a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-BETA26 + +* Fix issue where event emissions were being suspended for an indeterminate amount of time which resulted in Swift watch queries not being re-run timeously. + ## 1.0.0-BETA25 * JVM: Lower minimum supported version from 17 to 8. diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index a1ec0bfb..3fb7637a 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -1,7 +1,10 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver +import co.touchlab.kermit.Logger import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.asSharedFlow @@ -14,7 +17,7 @@ internal class PsSqlDriver( private val scope: CoroutineScope, ) : SqlDriver by driver { // MutableSharedFlow to emit batched table updates - private val tableUpdatesFlow = MutableSharedFlow>(replay = 0) + private val tableUpdatesFlow = MutableSharedFlow>(replay = 0, extraBufferCapacity = 64) // In-memory buffer to store table names before flushing private val pendingUpdates = mutableSetOf() @@ -38,9 +41,13 @@ internal class PsSqlDriver( if (updates.isEmpty()) { return } - scope.launch { - tableUpdatesFlow.emit(updates) + + scope.launch(Dispatchers.IO) { + if (!tableUpdatesFlow.tryEmit(updates)) { + Logger.w("Failed to emit table updates") + } else { + pendingUpdates.clear() + } } - pendingUpdates.clear() } } diff --git a/gradle.properties b/gradle.properties index e219cfed..f495ba79 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,7 +17,7 @@ development=true RELEASE_SIGNING_ENABLED=true # Library config GROUP=com.powersync -LIBRARY_VERSION=1.0.0-BETA25 +LIBRARY_VERSION=1.0.0-BETA26 GITHUB_REPO=https://github.com/powersync-ja/powersync-kotlin.git # POM POM_URL=https://github.com/powersync-ja/powersync-kotlin/ From 88d486d0986f3de86f5212fe0f71729b273efc25 Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Mon, 24 Feb 2025 14:02:06 +0200 Subject: [PATCH 2/6] chore: convert to suspend --- .../kotlin/com/powersync/PsSqlDriver.kt | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index 3fb7637a..c13bdf75 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -1,26 +1,25 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver -import co.touchlab.kermit.Logger import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.IO +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch internal class PsSqlDriver( private val driver: SqlDriver, private val scope: CoroutineScope, ) : SqlDriver by driver { // MutableSharedFlow to emit batched table updates - private val tableUpdatesFlow = MutableSharedFlow>(replay = 0, extraBufferCapacity = 64) + private val tableUpdatesFlow = + MutableSharedFlow>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) // In-memory buffer to store table names before flushing private val pendingUpdates = mutableSetOf() + private val currentUpdates = mutableSetOf() fun updateTable(tableName: String) { pendingUpdates.add(tableName) @@ -36,18 +35,14 @@ internal class PsSqlDriver( // Flows on table updates containing a specific table fun updatesOnTable(tableName: String): Flow = tableUpdates().filter { it.contains(tableName) }.map { } - fun fireTableUpdates() { + suspend fun fireTableUpdates() { val updates = pendingUpdates.toList() + if (updates.isEmpty()) { return } + tableUpdatesFlow.emit(updates) - scope.launch(Dispatchers.IO) { - if (!tableUpdatesFlow.tryEmit(updates)) { - Logger.w("Failed to emit table updates") - } else { - pendingUpdates.clear() - } - } + pendingUpdates.clear() } } From bfe64f132fb8621a0d875d99e58cea0697317569 Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Mon, 24 Feb 2025 17:06:21 +0200 Subject: [PATCH 3/6] chore: revert --- .../commonMain/kotlin/com/powersync/PsSqlDriver.kt | 13 +++++++------ .../powersync/db/internal/InternalDatabaseImpl.kt | 1 - 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index c13bdf75..6a5996cf 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -1,8 +1,8 @@ package com.powersync import app.cash.sqldelight.db.SqlDriver +import co.touchlab.kermit.Logger import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.asSharedFlow @@ -15,11 +15,10 @@ internal class PsSqlDriver( ) : SqlDriver by driver { // MutableSharedFlow to emit batched table updates private val tableUpdatesFlow = - MutableSharedFlow>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + MutableSharedFlow>(replay = 0, extraBufferCapacity = 64) // In-memory buffer to store table names before flushing private val pendingUpdates = mutableSetOf() - private val currentUpdates = mutableSetOf() fun updateTable(tableName: String) { pendingUpdates.add(tableName) @@ -35,13 +34,15 @@ internal class PsSqlDriver( // Flows on table updates containing a specific table fun updatesOnTable(tableName: String): Flow = tableUpdates().filter { it.contains(tableName) }.map { } - suspend fun fireTableUpdates() { + fun fireTableUpdates() { val updates = pendingUpdates.toList() - if (updates.isEmpty()) { return } - tableUpdatesFlow.emit(updates) + Logger.i(pendingUpdates.toString()) + if (!tableUpdatesFlow.tryEmit(updates)) { + Logger.i("Failed to emit table updates") + } pendingUpdates.clear() } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 43abf0f5..91cad180 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -20,7 +20,6 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce -import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex From 0c079d65adc4ffab9785eecefef54cc4b79b887b Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Tue, 25 Feb 2025 15:45:14 +0200 Subject: [PATCH 4/6] chore: try add a read driver --- .../DatabaseDriverFactory.android.kt | 4 +++ .../com/powersync/DatabaseDriverFactory.kt | 2 ++ .../com/powersync/db/PowerSyncDatabaseImpl.kt | 3 ++- .../db/internal/InternalDatabaseImpl.kt | 9 ++++--- .../powersync/DatabaseDriverFactory.ios.kt | 27 +++++++++++++++++++ .../powersync/DatabaseDriverFactory.jvm.kt | 2 ++ 6 files changed, 43 insertions(+), 4 deletions(-) diff --git a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt index 090be5e1..5ddbd039 100644 --- a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt +++ b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt @@ -76,6 +76,7 @@ public actual class DatabaseDriverFactory( ), ) setupSqliteBinding() + return this.driver as PsSqlDriver } @@ -84,4 +85,7 @@ public actual class DatabaseDriverFactory( System.loadLibrary("powersync-sqlite") } } + + // Do not have the same issues with Android so can be looked at later + internal actual fun getReadDriver(): PsSqlDriver = this.driver ?: throw IllegalStateException("Driver not initialized") } diff --git a/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt b/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt index ab469ff0..4a05313e 100644 --- a/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt +++ b/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt @@ -8,4 +8,6 @@ public expect class DatabaseDriverFactory { scope: CoroutineScope, dbFilename: String, ): PsSqlDriver + + internal fun getReadDriver(): PsSqlDriver } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index b7a7d5dc..103d869a 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -50,7 +50,8 @@ internal class PowerSyncDatabaseImpl( val logger: Logger = Logger, driver: PsSqlDriver = factory.createDriver(scope, dbFilename), ) : PowerSyncDatabase { - private val internalDb = InternalDatabaseImpl(driver, scope) + private val readDriver = factory.getReadDriver() + private val internalDb = InternalDatabaseImpl(driver, scope, readDriver) private val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) /** diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 91cad180..c2f264ae 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -31,6 +31,7 @@ import kotlinx.serialization.encodeToString internal class InternalDatabaseImpl( private val driver: PsSqlDriver, private val scope: CoroutineScope, + private val readDriver: PsSqlDriver, ) : InternalDatabase { override val transactor: PsDatabase = PsDatabase(driver) override val queries: PowersyncQueries = transactor.powersyncQueries @@ -87,6 +88,8 @@ internal class InternalDatabaseImpl( tableUpdatesMutex.withLock { accumulatedUpdates.addAll(dataTables) } + + readDriver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) } // debounce ignores events inside the throttle. Debouncing needs to be done after accumulation .debounce(DEFAULT_WATCH_THROTTLE_MS) @@ -228,15 +231,15 @@ internal class InternalDatabaseImpl( object : Query(wrapperMapper(mapper)) { override fun execute(mapper: (app.cash.sqldelight.db.SqlCursor) -> QueryResult): QueryResult = runWrapped { - driver.executeQuery(null, query, mapper, parameters, binders) + readDriver.executeQuery(null, query, mapper, parameters, binders) } override fun addListener(listener: Listener) { - driver.addListener(queryKeys = tables.toTypedArray(), listener = listener) + readDriver.addListener(queryKeys = tables.toTypedArray(), listener = listener) } override fun removeListener(listener: Listener) { - driver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) + readDriver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) } } diff --git a/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt b/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt index a12f7974..13278a2c 100644 --- a/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt +++ b/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt @@ -23,6 +23,7 @@ import kotlinx.coroutines.CoroutineScope @OptIn(ExperimentalForeignApi::class) public actual class DatabaseDriverFactory { private var driver: PsSqlDriver? = null + private var readDriver: PsSqlDriver? = null init { init_powersync_sqlite_extension() @@ -97,6 +98,30 @@ public actual class DatabaseDriverFactory { ), ), ) + + // Create a separate read-only driver + this.readDriver = + PsSqlDriver( + scope = scope, + driver = + NativeSqliteDriver( + configuration = + DatabaseConfiguration( + name = dbFilename, + version = schema.version.toInt(), + create = { }, // No need to create schema again + loggingConfig = Logging(logger = sqlLogger), + lifecycleConfig = + DatabaseConfiguration.Lifecycle( + onCreateConnection = { connection -> + // Set connection to read-only mode + connection.rawExecSql("PRAGMA query_only = 1;") + }, + ), + ), + ), + ) + return this.driver as PsSqlDriver } @@ -149,4 +174,6 @@ public actual class DatabaseDriverFactory { null, ) } + + internal actual fun getReadDriver(): PsSqlDriver = this.readDriver ?: throw IllegalStateException("Read driver not initialized") } diff --git a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt index 6b0a09ba..cacd7106 100644 --- a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt +++ b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt @@ -8,6 +8,7 @@ import kotlin.io.path.absolutePathString @Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING", "SqlNoDataSourceInspection") public actual class DatabaseDriverFactory { private var driver: PsSqlDriver? = null + private var readDriver: PsSqlDriver? = null private external fun setupSqliteBinding() @@ -57,6 +58,7 @@ public actual class DatabaseDriverFactory { return this.driver!! } + internal actual fun getReadDriver(): PsSqlDriver = driver ?: throw IllegalStateException("Read driver not initialized") public companion object { private val jniExtension: Path From e55ecbefdbf7ba84adce1daa73cf72e6a83c6b75 Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Wed, 26 Feb 2025 15:10:31 +0200 Subject: [PATCH 5/6] chore: revert readDriver and implement 3 streams --- .../DatabaseDriverFactory.android.kt | 3 - .../com/powersync/DatabaseDriverFactory.kt | 2 - .../kotlin/com/powersync/PsSqlDriver.kt | 54 ++++++++++-- .../db/internal/InternalDatabaseImpl.kt | 83 +++++++++++++++---- .../powersync/DatabaseDriverFactory.ios.kt | 25 ------ .../powersync/DatabaseDriverFactory.jvm.kt | 1 - 6 files changed, 114 insertions(+), 54 deletions(-) diff --git a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt index 5ddbd039..483184ee 100644 --- a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt +++ b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt @@ -85,7 +85,4 @@ public actual class DatabaseDriverFactory( System.loadLibrary("powersync-sqlite") } } - - // Do not have the same issues with Android so can be looked at later - internal actual fun getReadDriver(): PsSqlDriver = this.driver ?: throw IllegalStateException("Driver not initialized") } diff --git a/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt b/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt index 4a05313e..ab469ff0 100644 --- a/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt +++ b/core/src/commonMain/kotlin/com/powersync/DatabaseDriverFactory.kt @@ -8,6 +8,4 @@ public expect class DatabaseDriverFactory { scope: CoroutineScope, dbFilename: String, ): PsSqlDriver - - internal fun getReadDriver(): PsSqlDriver } diff --git a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt index 6a5996cf..6c2fc057 100644 --- a/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt +++ b/core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt @@ -8,13 +8,19 @@ import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch internal class PsSqlDriver( private val driver: SqlDriver, private val scope: CoroutineScope, ) : SqlDriver by driver { - // MutableSharedFlow to emit batched table updates - private val tableUpdatesFlow = + private val otherTableUpdatesFlow = + MutableSharedFlow>(replay = 0, extraBufferCapacity = 64) + + private val uiTableUpdatesFlow = + MutableSharedFlow>(replay = 0, extraBufferCapacity = 64) + + private val crudTableUpdatesFlow = MutableSharedFlow>(replay = 0, extraBufferCapacity = 64) // In-memory buffer to store table names before flushing @@ -28,22 +34,52 @@ internal class PsSqlDriver( pendingUpdates.clear() } - // Flows on table updates - fun tableUpdates(): Flow> = tableUpdatesFlow.asSharedFlow() + fun crudTableUpdates(): Flow> = crudTableUpdatesFlow.asSharedFlow() - // Flows on table updates containing a specific table - fun updatesOnTable(tableName: String): Flow = tableUpdates().filter { it.contains(tableName) }.map { } + fun updatesOnCrudTable(tableName: String): Flow = crudTableUpdates().filter { it.contains(tableName) }.map { } + + fun uiTableUpdates(): Flow> = uiTableUpdatesFlow.asSharedFlow() + + fun otherTableUpdates(): Flow> = otherTableUpdatesFlow.asSharedFlow() fun fireTableUpdates() { val updates = pendingUpdates.toList() if (updates.isEmpty()) { return } - Logger.i(pendingUpdates.toString()) - if (!tableUpdatesFlow.tryEmit(updates)) { - Logger.i("Failed to emit table updates") + val otherUpdates = updates.filter { !isDataTable(it) && !isCrudTable(it) } + val uiUpdates = updates.filter { isDataTable(it) } + val crudUpdates = updates.filter { isCrudTable(it) } + + if (!otherTableUpdatesFlow.tryEmit(otherUpdates)) { + Logger.i("Failed to emit other table updates") + } else { + Logger.i(("RUNNING OTHER UPDATES")) + Logger.i(otherUpdates.toString()) + } + + if (!crudTableUpdatesFlow.tryEmit(crudUpdates)) { + Logger.i("Failed to emit CRUD table updates will try normal emit and suspend") + scope.launch { + crudTableUpdatesFlow.emit(crudUpdates) + } + } else if (crudUpdates.isNotEmpty()) { + Logger.i(("RUNNING CRUD UPDATES")) + Logger.i(crudUpdates.toString()) + } + + if (!uiTableUpdatesFlow.tryEmit(uiUpdates)) { + Logger.i("Failed to emit ui table updates") + } else if (uiUpdates.isNotEmpty()) { + Logger.i(("RUNNING UI UPDATES")) + Logger.i(uiUpdates.toString()) } pendingUpdates.clear() } + + // Determine if a table requires high-priority processing + private fun isDataTable(tableName: String): Boolean = tableName.startsWith("ps_data") + + private fun isCrudTable(tableName: String): Boolean = tableName.startsWith("ps_crud") } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index c2f264ae..a5f8b26e 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -6,6 +6,7 @@ import app.cash.sqldelight.coroutines.asFlow import app.cash.sqldelight.coroutines.mapToList import app.cash.sqldelight.db.QueryResult import app.cash.sqldelight.db.SqlPreparedStatement +import co.touchlab.kermit.Logger import com.persistence.PowersyncQueries import com.powersync.PowerSyncException import com.powersync.PsSqlDriver @@ -20,6 +21,8 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex @@ -37,10 +40,16 @@ internal class InternalDatabaseImpl( override val queries: PowersyncQueries = transactor.powersyncQueries // Register callback for table updates - private fun tableUpdates(): Flow> = driver.tableUpdates() + private fun crudTableUpdates(): Flow> = driver.crudTableUpdates() + + private fun uiTableUpdates(): Flow> = driver.uiTableUpdates() + + private fun otherTableUpdates(): Flow> = driver.otherTableUpdates() // Debounced by transaction completion private val tableUpdatesMutex = Mutex() + private val tableUpdatesMutex2 = Mutex() + private val tableUpdatesMutex3 = Mutex() // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. private val dbContext = Dispatchers.IO @@ -76,25 +85,70 @@ internal class InternalDatabaseImpl( } init { - scope.launch { + scope.launch(Dispatchers.IO) { val accumulatedUpdates = mutableSetOf() // Store table changes in an accumulated array which will be (debounced) emitted on transaction end - tableUpdates() - .onEach { tables -> - val dataTables = - tables - .map { toFriendlyTableName(it) } - .filter { it.isNotBlank() } - tableUpdatesMutex.withLock { + uiTableUpdates() + .map { tables -> + tables + .map { toFriendlyTableName(it) } + .filter { it.isNotBlank() } + }.filter { dataTables -> dataTables.isNotEmpty() } + .onEach { dataTables -> + tableUpdatesMutex2.withLock { + accumulatedUpdates.addAll(dataTables) + } + }.debounce(DEFAULT_WATCH_THROTTLE_MS) + .collect { _ -> + tableUpdatesMutex2.withLock { + Logger.i("Notifying Listeners of UI Updates") + driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) + accumulatedUpdates.clear() + } + } + } + + scope.launch(Dispatchers.IO) { + val accumulatedUpdates = mutableSetOf() + // Store table changes in an accumulated array which will be (debounced) emitted on transaction end + otherTableUpdates() + .map { tables -> + tables + .map { toFriendlyTableName(it) } + .filter { it.isNotBlank() } + }.filter { dataTables -> dataTables.isNotEmpty() } + .onEach { dataTables -> + tableUpdatesMutex3.withLock { accumulatedUpdates.addAll(dataTables) } + }.debounce(DEFAULT_WATCH_THROTTLE_MS) + .collect { _ -> + tableUpdatesMutex3.withLock { + Logger.i("Notifying Listeners of Other Table Updates") + driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) + accumulatedUpdates.clear() + } + } + } - readDriver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) + scope.launch(Dispatchers.IO) { + val accumulatedUpdates = mutableSetOf() + crudTableUpdates() + .map { tables -> + tables + .map { toFriendlyTableName(it) } + .filter { it.isNotBlank() } + }.filter { dataTables -> dataTables.isNotEmpty() } + .onEach { tables -> + tableUpdatesMutex.withLock { + accumulatedUpdates.addAll(tables) + } } // debounce ignores events inside the throttle. Debouncing needs to be done after accumulation .debounce(DEFAULT_WATCH_THROTTLE_MS) .collect { _ -> tableUpdatesMutex.withLock { + Logger.i("Notifying Listeners of Crud Updates") driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray()) accumulatedUpdates.clear() } @@ -199,6 +253,7 @@ internal class InternalDatabaseImpl( .map { toFriendlyTableName(it) } .filter { it.isNotBlank() } .toSet() + // Explore how this works internally return watchQuery( query = sql, parameters = parameters?.size ?: 0, @@ -231,15 +286,15 @@ internal class InternalDatabaseImpl( object : Query(wrapperMapper(mapper)) { override fun execute(mapper: (app.cash.sqldelight.db.SqlCursor) -> QueryResult): QueryResult = runWrapped { - readDriver.executeQuery(null, query, mapper, parameters, binders) + driver.executeQuery(null, query, mapper, parameters, binders) } override fun addListener(listener: Listener) { - readDriver.addListener(queryKeys = tables.toTypedArray(), listener = listener) + driver.addListener(queryKeys = tables.toTypedArray(), listener = listener) } override fun removeListener(listener: Listener) { - readDriver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) + driver.removeListener(queryKeys = tables.toTypedArray(), listener = listener) } } @@ -274,7 +329,7 @@ internal class InternalDatabaseImpl( } // Register callback for table updates on a specific table - override fun updatesOnTable(tableName: String): Flow = driver.updatesOnTable(tableName) + override fun updatesOnTable(tableName: String): Flow = driver.updatesOnCrudTable(tableName) private fun toFriendlyTableName(tableName: String): String { val regex = POWERSYNC_TABLE_MATCH.toRegex() diff --git a/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt b/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt index 13278a2c..08ea4c53 100644 --- a/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt +++ b/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt @@ -99,29 +99,6 @@ public actual class DatabaseDriverFactory { ), ) - // Create a separate read-only driver - this.readDriver = - PsSqlDriver( - scope = scope, - driver = - NativeSqliteDriver( - configuration = - DatabaseConfiguration( - name = dbFilename, - version = schema.version.toInt(), - create = { }, // No need to create schema again - loggingConfig = Logging(logger = sqlLogger), - lifecycleConfig = - DatabaseConfiguration.Lifecycle( - onCreateConnection = { connection -> - // Set connection to read-only mode - connection.rawExecSql("PRAGMA query_only = 1;") - }, - ), - ), - ), - ) - return this.driver as PsSqlDriver } @@ -174,6 +151,4 @@ public actual class DatabaseDriverFactory { null, ) } - - internal actual fun getReadDriver(): PsSqlDriver = this.readDriver ?: throw IllegalStateException("Read driver not initialized") } diff --git a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt index cacd7106..d01de086 100644 --- a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt +++ b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt @@ -58,7 +58,6 @@ public actual class DatabaseDriverFactory { return this.driver!! } - internal actual fun getReadDriver(): PsSqlDriver = driver ?: throw IllegalStateException("Read driver not initialized") public companion object { private val jniExtension: Path From a6fa6b4c4f2d31d93b6fce7dd3021d33f278fba1 Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Wed, 26 Feb 2025 15:11:50 +0200 Subject: [PATCH 6/6] chore: revert readDriver and implement 3 streams --- .../kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt | 3 +-- .../kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 103d869a..b7a7d5dc 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -50,8 +50,7 @@ internal class PowerSyncDatabaseImpl( val logger: Logger = Logger, driver: PsSqlDriver = factory.createDriver(scope, dbFilename), ) : PowerSyncDatabase { - private val readDriver = factory.getReadDriver() - private val internalDb = InternalDatabaseImpl(driver, scope, readDriver) + private val internalDb = InternalDatabaseImpl(driver, scope) private val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) /** diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index a5f8b26e..3f1f250c 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -34,7 +34,6 @@ import kotlinx.serialization.encodeToString internal class InternalDatabaseImpl( private val driver: PsSqlDriver, private val scope: CoroutineScope, - private val readDriver: PsSqlDriver, ) : InternalDatabase { override val transactor: PsDatabase = PsDatabase(driver) override val queries: PowersyncQueries = transactor.powersyncQueries