Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ captures
Pods/
dialect/bin
.build
.vscode
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.0.0-BETA27

* Improved watch query internals. Added the ability to throttle watched queries.

## 1.0.0-BETA26

* Support bucket priorities and partial syncs.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
package com.powersync

import androidx.test.platform.app.InstrumentationRegistry
import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.platform.app.InstrumentationRegistry
import app.cash.turbine.turbineScope
import com.powersync.db.schema.Schema
import com.powersync.testutils.UserRow
import kotlinx.coroutines.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.After

import org.junit.Test
import org.junit.runner.RunWith

import org.junit.Assert.*
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith

@RunWith(AndroidJUnit4::class)
class AndroidDatabaseTest {
Expand Down Expand Up @@ -91,4 +90,4 @@ class AndroidDatabaseTest {
query.cancel()
}
}
}
}
31 changes: 11 additions & 20 deletions core/src/commonMain/kotlin/com/powersync/PsSqlDriver.kt
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
package com.powersync

import app.cash.sqldelight.db.SqlDriver
import com.powersync.utils.AtomicMutableSet
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch

internal class PsSqlDriver(
private val driver: SqlDriver,
private val scope: CoroutineScope,
) : SqlDriver by driver {
// MutableSharedFlow to emit batched table updates
private val tableUpdatesFlow = MutableSharedFlow<List<String>>(replay = 0)
private val tableUpdatesFlow = MutableSharedFlow<Set<String>>(replay = 0)

// In-memory buffer to store table names before flushing
private val pendingUpdates = mutableSetOf<String>()
private val pendingUpdates = AtomicMutableSet<String>()

fun updateTable(tableName: String) {
pendingUpdates.add(tableName)
Expand All @@ -27,20 +25,13 @@ internal class PsSqlDriver(
pendingUpdates.clear()
}

// Flows on table updates
fun tableUpdates(): Flow<List<String>> = tableUpdatesFlow.asSharedFlow()
// Flows on any table change
// This specifically returns a SharedFlow for downstream timing considerations
fun updatesOnTables(): SharedFlow<Set<String>> =
tableUpdatesFlow
.asSharedFlow()

// Flows on table updates containing a specific table
fun updatesOnTable(tableName: String): Flow<Unit> = tableUpdates().filter { it.contains(tableName) }.map { }

fun fireTableUpdates() {
val updates = pendingUpdates.toList()
if (updates.isEmpty()) {
return
}
scope.launch {
tableUpdatesFlow.emit(updates)
}
pendingUpdates.clear()
suspend fun fireTableUpdates() {
tableUpdatesFlow.emit(pendingUpdates.toSetAndClear())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ internal class BucketStorageImpl(
private val db: InternalDatabase,
private val logger: Logger,
) : BucketStorage {
private val tableNames: MutableSet<String> = mutableSetOf()
private var hasCompletedSync = AtomicBoolean(false)
private var pendingBucketDeletes = AtomicBoolean(false)

Expand All @@ -32,18 +31,6 @@ internal class BucketStorageImpl(
const val COMPACT_OPERATION_INTERVAL = 1_000
}

init {
readTableNames()
}

private fun readTableNames() {
tableNames.clear()
// Query to get existing table names
val names = db.getExistingTableNames("ps_data_*")

tableNames.addAll(names)
}

override fun getMaxOpId(): String = MAX_OP_ID

override suspend fun getClientId(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import com.powersync.sync.SyncStatusData
import com.powersync.sync.SyncStream
import com.powersync.utils.JsonParam
import com.powersync.utils.JsonUtil
import com.powersync.utils.throttle
import com.powersync.utils.toJsonObject
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -141,9 +142,13 @@ internal class PowerSyncDatabaseImpl(

uploadJob =
scope.launch {
internalDb.updatesOnTable(InternalTable.CRUD.toString()).debounce(crudThrottleMs).collect {
syncStream!!.triggerCrudUpload()
}
internalDb
.updatesOnTables()
.filter { it.contains(InternalTable.CRUD.toString()) }
.throttle(crudThrottleMs)
.collect {
syncStream!!.triggerCrudUpload()
}
}
}

Expand Down Expand Up @@ -233,8 +238,9 @@ internal class PowerSyncDatabaseImpl(
override fun <RowType : Any> watch(
sql: String,
parameters: List<Any?>?,
throttleMs: Long?,
mapper: (SqlCursor) -> RowType,
): Flow<List<RowType>> = internalDb.watch(sql, parameters, mapper)
): Flow<List<RowType>> = internalDb.watch(sql, parameters, throttleMs, mapper)

override suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R = internalDb.writeTransaction(callback)

Expand Down Expand Up @@ -280,7 +286,11 @@ internal class PowerSyncDatabaseImpl(
syncStream = null
}

currentStatus.update(connected = false, connecting = false, lastSyncedAt = currentStatus.lastSyncedAt)
currentStatus.update(
connected = false,
connecting = false,
lastSyncedAt = currentStatus.lastSyncedAt,
)
}

override suspend fun disconnectAndClear(clearLocal: Boolean) {
Expand Down
4 changes: 4 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/db/Queries.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public interface Queries {
public fun <RowType : Any> watch(
sql: String,
parameters: List<Any?>? = listOf(),
/**
* Specify the minimum interval, in milliseconds, between queries.
*/
throttleMs: Long? = null,
mapper: (SqlCursor) -> RowType,
): Flow<List<RowType>>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import app.cash.sqldelight.db.Closeable
import com.persistence.PowersyncQueries
import com.powersync.db.Queries
import com.powersync.persistence.PsDatabase
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow

internal interface InternalDatabase :
Queries,
Closeable {
val transactor: PsDatabase
val queries: PowersyncQueries

fun getExistingTableNames(tableGlob: String): List<String>

fun updatesOnTable(tableName: String): Flow<Unit>
fun updatesOnTables(): SharedFlow<Set<String>>
}
Loading
Loading