Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.0.0-BETA26

* Fix issue where event emissions were being suspended for an indeterminate amount of time which resulted in Swift watch queries not being re-run timeously.

## 1.0.0-BETA25

* JVM: Lower minimum supported version from 17 to 8.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public actual class DatabaseDriverFactory(
),
)
setupSqliteBinding()

return this.driver as PsSqlDriver
}

Expand Down
55 changes: 47 additions & 8 deletions core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.powersync

import app.cash.sqldelight.db.SqlDriver
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
Expand All @@ -13,8 +14,14 @@ internal class PsSqlDriver(
private val driver: SqlDriver,
private val scope: CoroutineScope,
) : SqlDriver by driver {
// MutableSharedFlow to emit batched table updates
private val tableUpdatesFlow = MutableSharedFlow<List<String>>(replay = 0)
private val otherTableUpdatesFlow =
MutableSharedFlow<List<String>>(replay = 0, extraBufferCapacity = 64)

private val uiTableUpdatesFlow =
MutableSharedFlow<List<String>>(replay = 0, extraBufferCapacity = 64)

private val crudTableUpdatesFlow =
MutableSharedFlow<List<String>>(replay = 0, extraBufferCapacity = 64)

// In-memory buffer to store table names before flushing
private val pendingUpdates = mutableSetOf<String>()
Expand All @@ -27,20 +34,52 @@ internal class PsSqlDriver(
pendingUpdates.clear()
}

// Flows on table updates
fun tableUpdates(): Flow<List<String>> = tableUpdatesFlow.asSharedFlow()
fun crudTableUpdates(): Flow<List<String>> = crudTableUpdatesFlow.asSharedFlow()

fun updatesOnCrudTable(tableName: String): Flow<Unit> = crudTableUpdates().filter { it.contains(tableName) }.map { }

// Flows on table updates containing a specific table
fun updatesOnTable(tableName: String): Flow<Unit> = tableUpdates().filter { it.contains(tableName) }.map { }
fun uiTableUpdates(): Flow<List<String>> = uiTableUpdatesFlow.asSharedFlow()

fun otherTableUpdates(): Flow<List<String>> = otherTableUpdatesFlow.asSharedFlow()

fun fireTableUpdates() {
val updates = pendingUpdates.toList()
if (updates.isEmpty()) {
return
}
scope.launch {
tableUpdatesFlow.emit(updates)
val otherUpdates = updates.filter { !isDataTable(it) && !isCrudTable(it) }
val uiUpdates = updates.filter { isDataTable(it) }
val crudUpdates = updates.filter { isCrudTable(it) }

if (!otherTableUpdatesFlow.tryEmit(otherUpdates)) {
Logger.i("Failed to emit other table updates")
} else {
Logger.i(("RUNNING OTHER UPDATES"))
Logger.i(otherUpdates.toString())
}

if (!crudTableUpdatesFlow.tryEmit(crudUpdates)) {
Logger.i("Failed to emit CRUD table updates will try normal emit and suspend")
scope.launch {
crudTableUpdatesFlow.emit(crudUpdates)
}
} else if (crudUpdates.isNotEmpty()) {
Logger.i(("RUNNING CRUD UPDATES"))
Logger.i(crudUpdates.toString())
}

if (!uiTableUpdatesFlow.tryEmit(uiUpdates)) {
Logger.i("Failed to emit ui table updates")
} else if (uiUpdates.isNotEmpty()) {
Logger.i(("RUNNING UI UPDATES"))
Logger.i(uiUpdates.toString())
}

pendingUpdates.clear()
}

// Determine if a table requires high-priority processing
private fun isDataTable(tableName: String): Boolean = tableName.startsWith("ps_data")

private fun isCrudTable(tableName: String): Boolean = tableName.startsWith("ps_crud")
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import app.cash.sqldelight.coroutines.asFlow
import app.cash.sqldelight.coroutines.mapToList
import app.cash.sqldelight.db.QueryResult
import app.cash.sqldelight.db.SqlPreparedStatement
import co.touchlab.kermit.Logger
import com.persistence.PowersyncQueries
import com.powersync.PowerSyncException
import com.powersync.PsSqlDriver
Expand All @@ -21,6 +22,7 @@ import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
Expand All @@ -37,10 +39,16 @@ internal class InternalDatabaseImpl(
override val queries: PowersyncQueries = transactor.powersyncQueries

// Register callback for table updates
private fun tableUpdates(): Flow<List<String>> = driver.tableUpdates()
private fun crudTableUpdates(): Flow<List<String>> = driver.crudTableUpdates()

private fun uiTableUpdates(): Flow<List<String>> = driver.uiTableUpdates()

private fun otherTableUpdates(): Flow<List<String>> = driver.otherTableUpdates()

// Debounced by transaction completion
private val tableUpdatesMutex = Mutex()
private val tableUpdatesMutex2 = Mutex()
private val tableUpdatesMutex3 = Mutex()

// Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss.
private val dbContext = Dispatchers.IO
Expand Down Expand Up @@ -76,23 +84,70 @@ internal class InternalDatabaseImpl(
}

init {
scope.launch {
scope.launch(Dispatchers.IO) {
val accumulatedUpdates = mutableSetOf<String>()
// Store table changes in an accumulated array which will be (debounced) emitted on transaction end
uiTableUpdates()
.map { tables ->
tables
.map { toFriendlyTableName(it) }
.filter { it.isNotBlank() }
}.filter { dataTables -> dataTables.isNotEmpty() }
.onEach { dataTables ->
tableUpdatesMutex2.withLock {
accumulatedUpdates.addAll(dataTables)
}
}.debounce(DEFAULT_WATCH_THROTTLE_MS)
.collect { _ ->
tableUpdatesMutex2.withLock {
Logger.i("Notifying Listeners of UI Updates")
driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray())
accumulatedUpdates.clear()
}
}
}

scope.launch(Dispatchers.IO) {
val accumulatedUpdates = mutableSetOf<String>()
// Store table changes in an accumulated array which will be (debounced) emitted on transaction end
tableUpdates()
otherTableUpdates()
.map { tables ->
tables
.map { toFriendlyTableName(it) }
.filter { it.isNotBlank() }
}.filter { dataTables -> dataTables.isNotEmpty() }
.onEach { dataTables ->
tableUpdatesMutex3.withLock {
accumulatedUpdates.addAll(dataTables)
}
}.debounce(DEFAULT_WATCH_THROTTLE_MS)
.collect { _ ->
tableUpdatesMutex3.withLock {
Logger.i("Notifying Listeners of Other Table Updates")
driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray())
accumulatedUpdates.clear()
}
}
}

scope.launch(Dispatchers.IO) {
val accumulatedUpdates = mutableSetOf<String>()
crudTableUpdates()
.map { tables ->
tables
.map { toFriendlyTableName(it) }
.filter { it.isNotBlank() }
}.filter { dataTables -> dataTables.isNotEmpty() }
.onEach { tables ->
val dataTables =
tables
.map { toFriendlyTableName(it) }
.filter { it.isNotBlank() }
tableUpdatesMutex.withLock {
accumulatedUpdates.addAll(dataTables)
accumulatedUpdates.addAll(tables)
}
}
// debounce ignores events inside the throttle. Debouncing needs to be done after accumulation
.debounce(DEFAULT_WATCH_THROTTLE_MS)
.collect { _ ->
tableUpdatesMutex.withLock {
Logger.i("Notifying Listeners of Crud Updates")
driver.notifyListeners(queryKeys = accumulatedUpdates.toTypedArray())
accumulatedUpdates.clear()
}
Expand Down Expand Up @@ -197,6 +252,7 @@ internal class InternalDatabaseImpl(
.map { toFriendlyTableName(it) }
.filter { it.isNotBlank() }
.toSet()
// Explore how this works internally
return watchQuery(
query = sql,
parameters = parameters?.size ?: 0,
Expand Down Expand Up @@ -272,7 +328,7 @@ internal class InternalDatabaseImpl(
}

// Register callback for table updates on a specific table
override fun updatesOnTable(tableName: String): Flow<Unit> = driver.updatesOnTable(tableName)
override fun updatesOnTable(tableName: String): Flow<Unit> = driver.updatesOnCrudTable(tableName)

private fun toFriendlyTableName(tableName: String): String {
val regex = POWERSYNC_TABLE_MATCH.toRegex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import kotlinx.coroutines.CoroutineScope
@OptIn(ExperimentalForeignApi::class)
public actual class DatabaseDriverFactory {
private var driver: PsSqlDriver? = null
private var readDriver: PsSqlDriver? = null

init {
init_powersync_sqlite_extension()
Expand Down Expand Up @@ -97,6 +98,7 @@ public actual class DatabaseDriverFactory {
),
),
)

return this.driver as PsSqlDriver
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlin.io.path.absolutePathString
@Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING", "SqlNoDataSourceInspection")
public actual class DatabaseDriverFactory {
private var driver: PsSqlDriver? = null
private var readDriver: PsSqlDriver? = null

private external fun setupSqliteBinding()

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ development=true
RELEASE_SIGNING_ENABLED=true
# Library config
GROUP=com.powersync
LIBRARY_VERSION=1.0.0-BETA25
LIBRARY_VERSION=1.0.0-BETA26
GITHUB_REPO=https://github.com/powersync-ja/powersync-kotlin.git
# POM
POM_URL=https://github.com/powersync-ja/powersync-kotlin/
Expand Down