Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 1.4.0 (unreleased)

* Add `getNextCrudTransactionBatch` method to `PowerSyncDatabase` which allows for fetching a batch of CRUD operations to upload.
This is useful for uploading multiple transactions in a single batch.

## 1.3.0

* Support tables created outside of PowerSync with the `RawTable` API.
Expand Down Expand Up @@ -304,4 +309,4 @@ params = params
* Replaced default Logger with [Kermit Logger](https://kermit.touchlab.co/) which allows users to
more easily use and/or change Logger settings
* Add `retryDelay` and `crudThrottle` options when setting up database connection
* Changed `_viewNameOverride` to `viewNameOverride`
* Changed `_viewNameOverride` to `viewNameOverride`
227 changes: 227 additions & 0 deletions core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -459,4 +459,231 @@ class DatabaseTest {

database.getCrudBatch() shouldBe null
}

@Test
fun testCrudTransactionBatch() =
databaseTest {
// Create a single insert (transaction 1)
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("a", "[email protected]"),
)

// Create a transaction with 2 inserts (transaction 2)
database.writeTransaction {
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("b", "[email protected]"),
)
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("c", "[email protected]"),
)
}

// Create another single insert (transaction 3)
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("d", "[email protected]"),
)

// Create another transaction with 3 inserts (transaction 4)
database.writeTransaction {
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("e", "[email protected]"),
)
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("f", "[email protected]"),
)
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("g", "[email protected]"),
)
}

// Test with limit of 2 transactions
var batch = database.getNextCrudTransactionBatch(2) ?: error("Batch should not be null")
batch.hasMore shouldBe true
batch.crud shouldHaveSize 3 // 1 entry from transaction 1 + 2 entries from transaction 2
batch.complete(null)

// Test with limit of 1 transaction
batch = database.getNextCrudTransactionBatch(1) ?: error("Batch should not be null")
batch.hasMore shouldBe true
batch.crud shouldHaveSize 1 // 1 entry from transaction 3
batch.complete(null)

// Test with large limit that covers remaining transactions
batch = database.getNextCrudTransactionBatch(10) ?: error("Batch should not be null")
batch.hasMore shouldBe false
batch.crud shouldHaveSize 3 // 3 entries from transaction 4
batch.complete(null)

// Should be no more transactions
database.getNextCrudTransactionBatch() shouldBe null
}

@Test
fun testCrudTransactionBatchWithNullTxId() =
databaseTest {
// Create operations without transactions (NULL tx_id)
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("a", "[email protected]"),
)
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("b", "[email protected]"),
)
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("c", "[email protected]"),
)

// Each NULL tx_id operation should be treated as its own transaction
var batch = database.getNextCrudTransactionBatch(2) ?: error("Batch should not be null")
batch.hasMore shouldBe true
batch.crud shouldHaveSize 2 // 2 individual transactions
batch.complete(null)

// Get the remaining transaction
batch = database.getNextCrudTransactionBatch(10) ?: error("Batch should not be null")
batch.hasMore shouldBe false
batch.crud shouldHaveSize 1 // 1 remaining transaction
batch.complete(null)

database.getNextCrudTransactionBatch() shouldBe null
}

@Test
fun testCrudTransactionBatchLargeTransaction() =
databaseTest {
// Create a large transaction with many operations
database.writeTransaction {
repeat(10) { i ->
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("user$i", "[email protected]"),
)
}
}

// Add a single operation
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("single", "[email protected]"),
)

// Should get the entire large transaction (10 operations) in one batch
var batch = database.getNextCrudTransactionBatch(1) ?: error("Batch should not be null")
batch.hasMore shouldBe true
batch.crud shouldHaveSize 10
batch.complete(null)

// Should get the single operation
batch = database.getNextCrudTransactionBatch(1) ?: error("Batch should not be null")
batch.hasMore shouldBe false
batch.crud shouldHaveSize 1
batch.complete(null)

database.getNextCrudTransactionBatch() shouldBe null
}

@Test
fun testCrudTransactionBatchOrdering() =
databaseTest {
// Create operations in a specific order to test ordering
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("first", "[email protected]"),
)

database.writeTransaction {
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("second_a", "[email protected]"),
)
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("second_b", "[email protected]"),
)
}

database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("third", "[email protected]"),
)

// Operations should be processed in order
val batch = database.getNextCrudTransactionBatch(10) ?: error("Batch should not be null")
batch.hasMore shouldBe false
batch.crud shouldHaveSize 4

// Verify order by checking operation data
val operations = batch.crud
operations[0].opData!!["name"] shouldBe "first"
operations[1].opData!!["name"] shouldBe "second_a"
operations[2].opData!!["name"] shouldBe "second_b"
operations[3].opData!!["name"] shouldBe "third"

batch.complete(null)
database.getNextCrudTransactionBatch() shouldBe null
}

@Test
fun testCrudTransactionBatchEmptyDatabase() =
databaseTest {
val batch = database.getNextCrudTransactionBatch()
batch shouldBe null
}

@Test
fun testCrudTransactionBatchZeroLimit() =
databaseTest {
// Create some operations
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("a", "[email protected]"),
)

// Zero limit should return null even if operations exist
val batch = database.getNextCrudTransactionBatch(0)
batch shouldBe null
}

@Test
fun testCrudTransactionBatchShouldCountTransactionsNotOperations() =
databaseTest {
// Create a transaction with 3 operations
database.writeTransaction {
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("tx1_op1", "[email protected]"),
)
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("tx1_op2", "[email protected]"),
)
it.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("tx1_op3", "[email protected]"),
)
}

// Create a single operation (NULL tx_id)
database.execute(
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
listOf("single", "[email protected]"),
)

// Request limit of 2 transactions - should get all 4 operations (3 from tx + 1 single)
val batch = database.getNextCrudTransactionBatch(2) ?: error("Batch should not be null")
batch.hasMore shouldBe false
batch.crud shouldHaveSize 4
batch.complete(null)

database.getNextCrudTransactionBatch() shouldBe null
}
}
19 changes: 19 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,25 @@ public interface PowerSyncDatabase : Queries {
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun getNextCrudTransaction(): CrudTransaction?

/**
* Get a batch of crud data from multiple transactions to upload.
*
* Returns null if there is no data to upload.
*
* Use this from the [PowerSyncBackendConnector.uploadData]` callback.
*
* Once the data have been successfully uploaded, call [CrudBatch.complete] before
* requesting the next batch.
*
* Unlike [getCrudBatch], this groups data by transaction, allowing developers to
* upload multiple complete transactions in a single batch operation.
*
* @param transactionLimit The maximum number of transactions to include in the batch.
* Default is 5.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun getNextCrudTransactionBatch(transactionLimit: Int = 10): CrudBatch?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing we would likely want here is to have an optional limit on the amount of crud items - here it can essentially be unbounded if you have multiple huge transactions.

I'm not sure yet if we'll need both limits or if a limit on items is enough. Basically the behavior would be to return the maximum amount of transactions so that the total items in those transactions doesn't exceed the limit, but also return at least one transaction.


/**
* Convenience method to get the current version of PowerSync.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,62 @@ internal class PowerSyncDatabaseImpl(
}
}

override suspend fun getNextCrudTransactionBatch(transactionLimit: Int): CrudBatch? {
waitReady()
return internalDb.readTransaction { transaction ->
// Since tx_id can be null, we can't use a WHERE tx_id < ? with transactionLimit + first crud entry tx_id
// So we get all operations and group them by transaction or fall back to an individual transaction if tx_id is null
val allOperations =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would likely need an optimized query here that does the counting in SQL - fetching all items first could be too inefficient.

I know this isn't easy to get right in SQL. Some windowing magic might help, if you need suggestions we're also happy to help.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll look into it and get back to you - do we have a number of items and a measure of "good" performance ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have specific times in mind. We're just concerned about the case where we're requesting say batches with a limit of 100 total entries in a state where we have a crud backlog of 100,000 items, that still shouldn't take long.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a benchmark test

transaction.getAll(
"SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC",
) { cursor ->
CrudEntry.fromRow(
CrudRow(
id = cursor.getString("id"),
data = cursor.getString("data"),
txId = cursor.getLongOptional("tx_id")?.toInt(),
),
)
}

val result = mutableListOf<CrudEntry>()
val processedTransactions = mutableSetOf<Int>()
var transactionCount = 0

for (operation in allOperations) {
if (transactionCount >= transactionLimit) break

val txId = operation.transactionId
if (txId == null) {
// NULL tx_id operations are individual transactions
result.add(operation)
transactionCount++
} else if (txId !in processedTransactions) {
val transactionOperations = bucketStorage.getCrudItemsByTransactionId(txId, transaction)
result.addAll(transactionOperations)
processedTransactions.add(txId)
transactionCount++
}
}

if (result.isEmpty()) {
return@readTransaction null
}

val hasMore = result.size < allOperations.size
val last = result.last()

return@readTransaction CrudBatch(
crud = result,
hasMore = hasMore,
complete = { writeCheckpoint ->
logger.i { "[CrudTransactionBatch::complete] Completing batch with checkpoint $writeCheckpoint" }
handleWriteCheckpoint(last.clientId, writeCheckpoint)
},
)
}
}

override suspend fun getPowerSyncVersion(): String {
// The initialization sets powerSyncVersion.
waitReady()
Expand Down