Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 1.5.0 (unreleased)

* Add `PowerSyncDatabase.getCrudTransactions()`, returning a flow of transactions. This is useful
to upload multiple transactions in a batch.

## 1.4.0

* Added the ability to log PowerSync service HTTP request information via specifying a
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.powersync

import app.cash.turbine.test
import app.cash.turbine.turbineScope
import co.touchlab.kermit.ExperimentalKermitApi
import com.powersync.db.ActiveDatabaseGroup
import com.powersync.db.crud.CrudEntry
import com.powersync.db.crud.CrudTransaction
import com.powersync.db.schema.Schema
import com.powersync.testutils.UserRow
import com.powersync.testutils.databaseTest
Expand All @@ -17,6 +20,7 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.test.Test
Expand Down Expand Up @@ -427,6 +431,38 @@ class DatabaseTest {
database.getNextCrudTransaction() shouldBe null
}

@Test
fun testCrudTransactions() =
databaseTest {
suspend fun insertInTransaction(size: Int) {
database.writeTransaction { tx ->
repeat(size) {
tx.execute("INSERT INTO users (id, name, email) VALUES (uuid(), null, null)")
}
}
}

// Before inserting data, the flow should be empty
database.getCrudTransactions().test { awaitComplete() }

insertInTransaction(5)
insertInTransaction(10)
insertInTransaction(15)

val batch = mutableListOf<CrudEntry>()
var lastTx: CrudTransaction? = null
database.getCrudTransactions().takeWhile { batch.size < 10 }.collect {
batch.addAll(it.crud)
lastTx = it
}

batch shouldHaveSize 15
lastTx!!.complete(null)

val finalTx = database.getNextCrudTransaction()
finalTx!!.crud shouldHaveSize 15
}

@Test
fun testCrudBatch() =
databaseTest {
Expand Down
40 changes: 38 additions & 2 deletions core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import com.powersync.db.schema.Schema
import com.powersync.sync.SyncOptions
import com.powersync.sync.SyncStatus
import com.powersync.utils.JsonParam
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.firstOrNull
import kotlin.coroutines.cancellation.CancellationException

/**
Expand Down Expand Up @@ -123,7 +125,7 @@ public interface PowerSyncDatabase : Queries {
*
* Returns null if there is no data to upload.
*
* Use this from the [PowerSyncBackendConnector.uploadData]` callback.
* Use this from the [PowerSyncBackendConnector.uploadData] callback.
*
* Once the data have been successfully uploaded, call [CrudTransaction.complete] before
* requesting the next transaction.
Expand All @@ -132,7 +134,41 @@ public interface PowerSyncDatabase : Queries {
* All data for the transaction is loaded into memory.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun getNextCrudTransaction(): CrudTransaction?
public suspend fun getNextCrudTransaction(): CrudTransaction? = getCrudTransactions().firstOrNull()

/**
* Obtains a flow emitting completed transactions with local writes against the database.

* This is typically used from the [PowerSyncBackendConnector.uploadData] callback.
* Each entry emitted by the returned flow is a full transaction containing all local writes
* made while that transaction was active.
*
* Unlike [getNextCrudTransaction], which always returns the oldest transaction that hasn't
* been [CrudTransaction.complete]d yet, this flow can be used to collect multiple transactions.
* Calling [CrudTransaction.complete] will mark that and all prior transactions emitted by the
* flow as completed.
*
* This can be used to upload multiple transactions in a single batch, e.g with:
*
* ```Kotlin
* val batch = mutableListOf<CrudEntry>()
* var lastTx: CrudTransaction? = null
*
* database.getCrudTransactions().takeWhile { batch.size < 100 }.collect {
* batch.addAll(it.crud)
* lastTx = it
* }
*
* if (batch.isNotEmpty()) {
* uploadChanges(batch)
* lastTx!!.complete(null)
* }
* ````
*
* If there is no local data to upload, returns an empty flow.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun getCrudTransactions(): Flow<CrudTransaction>

/**
* Convenience method to get the current version of PowerSync.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.powersync.bucket

import com.powersync.db.SqlCursor
import com.powersync.db.crud.CrudEntry
import com.powersync.db.internal.PowerSyncTransaction
import com.powersync.db.schema.SerializableSchema
Expand All @@ -19,15 +20,12 @@ internal interface BucketStorage {

fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry?

fun getCrudItemsByTransactionId(
transactionId: Int,
transaction: PowerSyncTransaction,
): List<CrudEntry>

suspend fun hasCrud(): Boolean

fun hasCrud(transaction: PowerSyncTransaction): Boolean

fun mapCrudEntry(row: SqlCursor): CrudEntry

suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean

suspend fun hasCompletedSync(): Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,21 @@ internal class BucketStorageImpl(
return id ?: throw IllegalStateException("Client ID not found")
}

override suspend fun nextCrudItem(): CrudEntry? = db.getOptional(sql = nextCrudQuery, mapper = crudEntryMapper)
override suspend fun nextCrudItem(): CrudEntry? = db.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry)

override fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry? =
transaction.getOptional(sql = nextCrudQuery, mapper = crudEntryMapper)

override fun getCrudItemsByTransactionId(
transactionId: Int,
transaction: PowerSyncTransaction,
): List<CrudEntry> =
transaction.getAll(
sql = transactionCrudQuery,
parameters = listOf(transactionId),
mapper = crudEntryMapper,
)
transaction.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry)

private val nextCrudQuery = "SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1"
private val transactionCrudQuery = "SELECT id, tx_id, data FROM ${InternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC"
private val crudEntryMapper: (SqlCursor) -> CrudEntry = { cursor ->

override fun mapCrudEntry(row: SqlCursor): CrudEntry =
CrudEntry.fromRow(
CrudRow(
id = cursor.getString(0)!!,
txId = cursor.getString(1)?.toInt(),
data = cursor.getString(2)!!,
id = row.getString(0)!!,
txId = row.getString(1)?.toInt(),
data = row.getString(2)!!,
),
)
}

override suspend fun hasCrud(): Boolean {
val res = db.getOptional(sql = hasCrudQuery, mapper = hasCrudMapper)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,34 +276,51 @@ internal class PowerSyncDatabaseImpl(
})
}

override suspend fun getNextCrudTransaction(): CrudTransaction? {
waitReady()
return internalDb.readTransaction { transaction ->
val entry =
bucketStorage.nextCrudItem(transaction)
?: return@readTransaction null

val txId = entry.transactionId
val entries: List<CrudEntry> =
if (txId == null) {
listOf(entry)
} else {
bucketStorage.getCrudItemsByTransactionId(
transactionId = txId,
transaction = transaction,
)
override suspend fun getCrudTransactions(): Flow<CrudTransaction> =
flow {
waitReady()
var lastItemId = -1

// Note: We try to avoid filtering on tx_id here because there's no index on that column.
// Starting at the first entry we want and then joining by rowid is more efficient. This is
// sound because there can't be concurrent write transactions, so transaction ids are
// increasing when we iterate over rowids.
val query =
"""
WITH RECURSIVE crud_entries AS (
SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?)
UNION ALL
SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud
INNER JOIN crud_entries ON crud_entries.id + 1 = rowid
WHERE crud_entries.tx_id = ps_crud.tx_id
)
SELECT * FROM crud_entries;
""".trimIndent()

while (true) {
val items = getAll(query, listOf(lastItemId), bucketStorage::mapCrudEntry)
if (items.isEmpty()) {
break
}

return@readTransaction CrudTransaction(
crud = entries,
transactionId = txId,
complete = { writeCheckpoint ->
logger.i { "[CrudTransaction::complete] Completing transaction with checkpoint $writeCheckpoint" }
handleWriteCheckpoint(entries.last().clientId, writeCheckpoint)
},
)
val txId = items[0].transactionId
val lastId = items.last().clientId

lastItemId = lastId
emit(
CrudTransaction(
crud = items,
transactionId = items[0].transactionId,
complete = { writeCheckpoint ->
logger.i {
"[CrudTransaction::complete] Completing transaction $txId (client ids until <=$lastId) with checkpoint $writeCheckpoint"
}
handleWriteCheckpoint(lastId, writeCheckpoint)
},
),
)
}
}
}

override suspend fun getPowerSyncVersion(): String {
// The initialization sets powerSyncVersion.
Expand Down
Loading