Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.0.0-BETA32

* Added `onChange` method to the PowerSync client. This allows for observing table changes.

## 1.0.0-BETA31

* Added helpers for Attachment syncing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,25 @@ class DatabaseTest {
}
}

@Test
fun testTableChangesUpdates() =
databaseTest {
turbineScope {
val query = database.onChange(tables = setOf("users")).testIn(this)

database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("Test", "[email protected]"),
)

val changeSet = query.awaitItem()
changeSet.count() shouldBe 1
changeSet.contains("users") shouldBe true

query.cancel()
}
}

@Test
fun testClosingReadPool() =
databaseTest {
Expand Down Expand Up @@ -373,11 +392,20 @@ class DatabaseTest {
@Test
fun testCrudTransaction() =
databaseTest {
database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("a", "[email protected]"))
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("a", "[email protected]"),
)

database.writeTransaction {
it.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("b", "[email protected]"))
it.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("c", "[email protected]"))
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("b", "[email protected]"),
)
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("c", "[email protected]"),
)
}

var transaction = database.getNextCrudTransaction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,17 @@ internal class PowerSyncDatabaseImpl(
return internalDb.getOptional(sql, parameters, mapper)
}

override fun onChange(
tables: Set<String>,
throttleMs: Long?,
): Flow<Set<String>> =
flow {
waitReady()
emitAll(
internalDb.onChange(tables, throttleMs),
)
}

override fun <RowType : Any> watch(
sql: String,
parameters: List<Any?>?,
Expand Down
115 changes: 83 additions & 32 deletions core/src/commonMain/kotlin/com/powersync/db/Queries.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ public fun interface ThrowableLockCallback<R> {

public interface Queries {
/**
* Execute a write query (INSERT, UPDATE, DELETE)
* Executes a write query (INSERT, UPDATE, DELETE).
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @return The number of rows affected by the query.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun execute(
Expand All @@ -27,9 +33,14 @@ public interface Queries {
): Long

/**
* Execute a read-only (SELECT) query and return a single result.
* If there is no result, throws an [IllegalArgumentException].
* See [getOptional] for queries where the result might be empty.
* Executes a read-only (SELECT) query and returns a single result.
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @param mapper A function to map the result set to the desired type.
* @return The single result of the query.
* @throws PowerSyncException If a database error occurs or no result is found.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <RowType : Any> get(
Expand All @@ -39,7 +50,14 @@ public interface Queries {
): RowType

/**
* Execute a read-only (SELECT) query and return the results.
* Executes a read-only (SELECT) query and returns all results.
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @param mapper A function to map the result set to the desired type.
* @return A list of results.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <RowType : Any> getAll(
Expand All @@ -49,7 +67,14 @@ public interface Queries {
): List<RowType>

/**
* Execute a read-only (SELECT) query and return a single optional result.
* Executes a read-only (SELECT) query and returns a single optional result.
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @param mapper A function to map the result set to the desired type.
* @return The single result of the query, or null if no result is found.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <RowType : Any> getOptional(
Expand All @@ -59,65 +84,91 @@ public interface Queries {
): RowType?

/**
* Execute a read-only (SELECT) query every time the source tables are modified and return the results as a List in [Flow].
* Returns a [Flow] that emits whenever the source tables are modified.
*
* @param tables The set of tables to monitor for changes.
* @param throttleMs The minimum interval, in milliseconds, between queries. Defaults to null.
* @return A [Flow] emitting the set of modified tables.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public fun onChange(
tables: Set<String>,
throttleMs: Long? = null,
): Flow<Set<String>>

/**
* Executes a read-only (SELECT) query every time the source tables are modified and returns the results as a [Flow] of lists.
*
* @param sql The SQL query to execute.
* @param parameters The parameters for the query, or an empty list if none.
* @param throttleMs The minimum interval, in milliseconds, between queries. Defaults to null.
* @param mapper A function to map the result set to the desired type.
* @return A [Flow] emitting lists of results.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public fun <RowType : Any> watch(
sql: String,
parameters: List<Any?>? = listOf(),
/**
* Specify the minimum interval, in milliseconds, between queries.
*/
throttleMs: Long? = null,
mapper: (SqlCursor) -> RowType,
): Flow<List<RowType>>

/**
* Takes a global lock, without starting a transaction.
*
* This takes a global lock - only one write transaction can execute against
* the database at a time. This applies even when constructing separate
* database instances for the same database file.
* Takes a global lock without starting a transaction.
*
* Locks for separate database instances on the same database file
* may be held concurrently.
* This lock ensures that only one write transaction can execute against the database at a time, even across separate database instances for the same file.
*
* In most cases, [writeTransaction] should be used instead.
*
* @param callback The callback to execute while holding the lock.
* @return The result of the callback.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <R> writeLock(callback: ThrowableLockCallback<R>): R

/**
* Open a read-write transaction.
* Opens a read-write transaction.
*
* This takes a global lock, ensuring that only one write transaction can execute against the database at a time, even across separate database instances for the same file.
*
* This takes a global lock - only one write transaction can execute against
* the database at a time. This applies even when constructing separate
* database instances for the same database file.
* Statements within the transaction must be done on the provided [PowerSyncTransaction] - attempting statements on the database instance will error cause a dead-lock.
*
* Statements within the transaction must be done on the provided
* [PowerSyncTransaction] - attempting statements on the database
* instance will error cause a dead-lock.
* @param callback The callback to execute within the transaction.
* @return The result of the callback.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <R> writeTransaction(callback: ThrowableTransactionCallback<R>): R

/**
* Takes a read lock, without starting a transaction.
* Takes a read lock without starting a transaction.
*
* The lock only applies to a single SQLite connection, and multiple
* connections may hold read locks at the same time.
* The lock applies only to a single SQLite connection, allowing multiple connections to hold read locks simultaneously.
*
* In most cases, [readTransaction] should be used instead.
* @param callback The callback to execute while holding the lock.
* @return The result of the callback.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <R> readLock(callback: ThrowableLockCallback<R>): R

/**
* Open a read-only transaction.
* Opens a read-only transaction.
*
* Statements within the transaction must be done on the provided [PowerSyncTransaction] - executing statements on the database level will be executed on separate connections.
*
* Statements within the transaction must be done on the provided
* [PowerSyncTransaction] - executing statements on the database level
* will be executed on separate connections.
* @param callback The callback to execute within the transaction.
* @return The result of the callback.
* @throws PowerSyncException If a database error occurs.
* @throws CancellationException If the operation is cancelled.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -105,6 +106,34 @@ internal class InternalDatabaseImpl(
mapper: (SqlCursor) -> RowType,
): RowType? = readLock { connection -> connection.getOptional(sql, parameters, mapper) }

override fun onChange(
tables: Set<String>,
throttleMs: Long?,
): Flow<Set<String>> =
channelFlow {
// Match all possible internal table combinations
val watchedTables =
tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet()

updatesOnTables()
.transform { updates ->
val intersection = updates.intersect(watchedTables)
if (intersection.isNotEmpty()) {
// Transform table names using friendlyTableName
emit(intersection.map { friendlyTableName(it) }.toSet())
}
}
// Throttling here is a feature which prevents watch queries from spamming updates.
// Throttling by design discards and delays events within the throttle window. Discarded events
// still trigger a trailing edge update.
// Backpressure is avoided on the throttling and consumer level by buffering the last upstream value.
.throttle(throttleMs?.milliseconds ?: DEFAULT_WATCH_THROTTLE)
.collect {
// Emit the transformed tables which have changed
send(it)
}
}

override fun <RowType : Any> watch(
sql: String,
parameters: List<Any?>?,
Expand Down Expand Up @@ -272,6 +301,18 @@ internal class InternalDatabaseImpl(
)
}

/**
* Converts internal table names (e.g., prefixed with "ps_data__" or "ps_data_local__")
* to their original friendly names by removing the prefixes. If no prefix matches,
* the original table name is returned.
*/
private fun friendlyTableName(table: String): String {
val re = Regex("^ps_data__(.+)$")
val re2 = Regex("^ps_data_local__(.+)$")
val match = re.matchEntire(table) ?: re2.matchEntire(table)
return match?.groupValues?.get(1) ?: table
}

internal fun getBindersFromParams(parameters: List<Any?>?): (SqlPreparedStatement.() -> Unit)? {
if (parameters.isNullOrEmpty()) {
return null
Expand Down