diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f8c327d..68edfbfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.5.0 (unreleased) + +* Add `PowerSyncDatabase.getCrudTransactions()`, returning a flow of transactions. This is useful + to upload multiple transactions in a batch. + ## 1.4.0 * Added the ability to log PowerSync service HTTP request information via specifying a diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 3e5f5f19..96e7ff73 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -1,8 +1,11 @@ package com.powersync +import app.cash.turbine.test import app.cash.turbine.turbineScope import co.touchlab.kermit.ExperimentalKermitApi import com.powersync.db.ActiveDatabaseGroup +import com.powersync.db.crud.CrudEntry +import com.powersync.db.crud.CrudTransaction import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow import com.powersync.testutils.databaseTest @@ -17,6 +20,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import kotlin.test.Test @@ -427,6 +431,38 @@ class DatabaseTest { database.getNextCrudTransaction() shouldBe null } + @Test + fun testCrudTransactions() = + databaseTest { + suspend fun insertInTransaction(size: Int) { + database.writeTransaction { tx -> + repeat(size) { + tx.execute("INSERT INTO users (id, name, email) VALUES (uuid(), null, null)") + } + } + } + + // Before inserting data, the flow should be empty + database.getCrudTransactions().test { awaitComplete() } + + insertInTransaction(5) + insertInTransaction(10) + insertInTransaction(15) + + val batch = mutableListOf() + var lastTx: CrudTransaction? = null + database.getCrudTransactions().takeWhile { batch.size < 10 }.collect { + batch.addAll(it.crud) + lastTx = it + } + + batch shouldHaveSize 15 + lastTx!!.complete(null) + + val finalTx = database.getNextCrudTransaction() + finalTx!!.crud shouldHaveSize 15 + } + @Test fun testCrudBatch() = databaseTest { diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index d587932d..daa60697 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -9,6 +9,8 @@ import com.powersync.db.schema.Schema import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatus import com.powersync.utils.JsonParam +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.firstOrNull import kotlin.coroutines.cancellation.CancellationException /** @@ -123,7 +125,7 @@ public interface PowerSyncDatabase : Queries { * * Returns null if there is no data to upload. * - * Use this from the [PowerSyncBackendConnector.uploadData]` callback. + * Use this from the [PowerSyncBackendConnector.uploadData] callback. * * Once the data have been successfully uploaded, call [CrudTransaction.complete] before * requesting the next transaction. @@ -132,7 +134,41 @@ public interface PowerSyncDatabase : Queries { * All data for the transaction is loaded into memory. */ @Throws(PowerSyncException::class, CancellationException::class) - public suspend fun getNextCrudTransaction(): CrudTransaction? + public suspend fun getNextCrudTransaction(): CrudTransaction? = getCrudTransactions().firstOrNull() + + /** + * Obtains a flow emitting completed transactions with local writes against the database. + + * This is typically used from the [PowerSyncBackendConnector.uploadData] callback. + * Each entry emitted by the returned flow is a full transaction containing all local writes + * made while that transaction was active. + * + * Unlike [getNextCrudTransaction], which always returns the oldest transaction that hasn't + * been [CrudTransaction.complete]d yet, this flow can be used to collect multiple transactions. + * Calling [CrudTransaction.complete] will mark that and all prior transactions emitted by the + * flow as completed. + * + * This can be used to upload multiple transactions in a single batch, e.g with: + * + * ```Kotlin + * val batch = mutableListOf() + * var lastTx: CrudTransaction? = null + * + * database.getCrudTransactions().takeWhile { batch.size < 100 }.collect { + * batch.addAll(it.crud) + * lastTx = it + * } + * + * if (batch.isNotEmpty()) { + * uploadChanges(batch) + * lastTx!!.complete(null) + * } + * ```` + * + * If there is no local data to upload, returns an empty flow. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun getCrudTransactions(): Flow /** * Convenience method to get the current version of PowerSync. diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 6d804cd2..0937cdac 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -1,5 +1,6 @@ package com.powersync.bucket +import com.powersync.db.SqlCursor import com.powersync.db.crud.CrudEntry import com.powersync.db.internal.PowerSyncTransaction import com.powersync.db.schema.SerializableSchema @@ -19,15 +20,12 @@ internal interface BucketStorage { fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry? - fun getCrudItemsByTransactionId( - transactionId: Int, - transaction: PowerSyncTransaction, - ): List - suspend fun hasCrud(): Boolean fun hasCrud(transaction: PowerSyncTransaction): Boolean + fun mapCrudEntry(row: SqlCursor): CrudEntry + suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean suspend fun hasCompletedSync(): Boolean diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 9be5c84c..00331c37 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -35,32 +35,21 @@ internal class BucketStorageImpl( return id ?: throw IllegalStateException("Client ID not found") } - override suspend fun nextCrudItem(): CrudEntry? = db.getOptional(sql = nextCrudQuery, mapper = crudEntryMapper) + override suspend fun nextCrudItem(): CrudEntry? = db.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry) override fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry? = - transaction.getOptional(sql = nextCrudQuery, mapper = crudEntryMapper) - - override fun getCrudItemsByTransactionId( - transactionId: Int, - transaction: PowerSyncTransaction, - ): List = - transaction.getAll( - sql = transactionCrudQuery, - parameters = listOf(transactionId), - mapper = crudEntryMapper, - ) + transaction.getOptional(sql = nextCrudQuery, mapper = ::mapCrudEntry) private val nextCrudQuery = "SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1" - private val transactionCrudQuery = "SELECT id, tx_id, data FROM ${InternalTable.CRUD} WHERE tx_id = ? ORDER BY id ASC" - private val crudEntryMapper: (SqlCursor) -> CrudEntry = { cursor -> + + override fun mapCrudEntry(row: SqlCursor): CrudEntry = CrudEntry.fromRow( CrudRow( - id = cursor.getString(0)!!, - txId = cursor.getString(1)?.toInt(), - data = cursor.getString(2)!!, + id = row.getString(0)!!, + txId = row.getString(1)?.toInt(), + data = row.getString(2)!!, ), ) - } override suspend fun hasCrud(): Boolean { val res = db.getOptional(sql = hasCrudQuery, mapper = hasCrudMapper) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 7e44ada0..2a6dfbab 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -276,34 +276,51 @@ internal class PowerSyncDatabaseImpl( }) } - override suspend fun getNextCrudTransaction(): CrudTransaction? { - waitReady() - return internalDb.readTransaction { transaction -> - val entry = - bucketStorage.nextCrudItem(transaction) - ?: return@readTransaction null - - val txId = entry.transactionId - val entries: List = - if (txId == null) { - listOf(entry) - } else { - bucketStorage.getCrudItemsByTransactionId( - transactionId = txId, - transaction = transaction, - ) + override suspend fun getCrudTransactions(): Flow = + flow { + waitReady() + var lastItemId = -1 + + // Note: We try to avoid filtering on tx_id here because there's no index on that column. + // Starting at the first entry we want and then joining by rowid is more efficient. This is + // sound because there can't be concurrent write transactions, so transaction ids are + // increasing when we iterate over rowids. + val query = + """ + WITH RECURSIVE crud_entries AS ( + SELECT id, tx_id, data FROM ps_crud WHERE id = (SELECT min(id) FROM ps_crud WHERE id > ?) + UNION ALL + SELECT ps_crud.id, ps_crud.tx_id, ps_crud.data FROM ps_crud + INNER JOIN crud_entries ON crud_entries.id + 1 = rowid + WHERE crud_entries.tx_id = ps_crud.tx_id + ) + SELECT * FROM crud_entries; + """.trimIndent() + + while (true) { + val items = getAll(query, listOf(lastItemId), bucketStorage::mapCrudEntry) + if (items.isEmpty()) { + break } - return@readTransaction CrudTransaction( - crud = entries, - transactionId = txId, - complete = { writeCheckpoint -> - logger.i { "[CrudTransaction::complete] Completing transaction with checkpoint $writeCheckpoint" } - handleWriteCheckpoint(entries.last().clientId, writeCheckpoint) - }, - ) + val txId = items[0].transactionId + val lastId = items.last().clientId + + lastItemId = lastId + emit( + CrudTransaction( + crud = items, + transactionId = items[0].transactionId, + complete = { writeCheckpoint -> + logger.i { + "[CrudTransaction::complete] Completing transaction $txId (client ids until <=$lastId) with checkpoint $writeCheckpoint" + } + handleWriteCheckpoint(lastId, writeCheckpoint) + }, + ), + ) + } } - } override suspend fun getPowerSyncVersion(): String { // The initialization sets powerSyncVersion.