Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ kotlin {
implementation(libs.test.coroutines)
implementation(libs.test.turbine)
implementation(libs.kermit.test)
implementation(libs.ktor.client.mock)
implementation(libs.test.turbine)
}

// We're putting the native libraries into our JAR, so integration tests for the JVM can run as part of the unit
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package com.powersync

import app.cash.turbine.turbineScope
import com.powersync.db.SqlCursor
import com.powersync.db.getString
import com.powersync.db.schema.Column
import com.powersync.db.schema.Schema
import com.powersync.db.schema.Table
import com.powersync.testutils.UserRow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlin.test.AfterTest
Expand All @@ -21,10 +18,7 @@ class DatabaseTest {
database =
PowerSyncDatabase(
factory = com.powersync.testutils.factory,
schema =
Schema(
Table(name = "users", columns = listOf(Column.text("name"), Column.text("email"))),
),
schema = Schema(UserRow.table),
dbFilename = "testdb",
)

Expand All @@ -49,7 +43,7 @@ class DatabaseTest {
fun testTableUpdates() =
runTest {
turbineScope {
val query = database.watch("SELECT * FROM users") { User.from(it) }.testIn(this)
val query = database.watch("SELECT * FROM users") { UserRow.from(it) }.testIn(this)

// Wait for initial query
assertEquals(0, query.awaitItem().size)
Expand Down Expand Up @@ -92,19 +86,4 @@ class DatabaseTest {
query.cancel()
}
}

private data class User(
val id: String,
val name: String,
val email: String,
) {
companion object {
fun from(cursor: SqlCursor): User =
User(
id = cursor.getString("id"),
name = cursor.getString("name"),
email = cursor.getString("email"),
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package com.powersync

import app.cash.turbine.turbineScope
import co.touchlab.kermit.Logger
import co.touchlab.kermit.Severity
import co.touchlab.kermit.TestConfig
import com.powersync.bucket.BucketChecksum
import com.powersync.bucket.BucketPriority
import com.powersync.bucket.Checkpoint
import com.powersync.bucket.OpType
import com.powersync.bucket.OplogEntry
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.db.PowerSyncDatabaseImpl
import com.powersync.db.schema.Schema
import com.powersync.sync.SyncLine
import com.powersync.sync.SyncStream
import com.powersync.testutils.MockSyncService
import com.powersync.testutils.UserRow
import com.powersync.testutils.cleanup
import com.powersync.testutils.waitFor
import com.powersync.utils.JsonUtil
import dev.mokkery.answering.returns
import dev.mokkery.everySuspend
import dev.mokkery.mock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.JsonObject
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds

@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class)
class SyncIntegrationTest {
private val logger =
Logger(
TestConfig(
minSeverity = Severity.Debug,
logWriterList = listOf(),
),
)
private lateinit var database: PowerSyncDatabaseImpl
private lateinit var connector: PowerSyncBackendConnector
private lateinit var syncLines: Channel<SyncLine>

@BeforeTest
fun setup() {
cleanup("testdb")
database = openDb()
connector =
mock<PowerSyncBackendConnector> {
everySuspend { getCredentialsCached() } returns
PowerSyncCredentials(
token = "test-token",
userId = "test-user",
endpoint = "https://test.com",
)

everySuspend { invalidateCredentials() } returns Unit
}
syncLines = Channel()

runBlocking {
database.disconnectAndClear(true)
}
}

@AfterTest
fun teardown() {
cleanup("testdb")
}

private fun openDb() =
PowerSyncDatabase(
factory = com.powersync.testutils.factory,
schema = Schema(UserRow.table),
dbFilename = "testdb",
) as PowerSyncDatabaseImpl

private fun CoroutineScope.syncStream(): SyncStream {
val client = MockSyncService.client(this, syncLines.receiveAsFlow())
return SyncStream(
bucketStorage = database.bucketStorage,
connector = connector,
httpEngine = client,
uploadCrud = { },
retryDelayMs = 10,
logger = logger,
params = JsonObject(emptyMap()),
)
}

private suspend fun expectUserCount(amount: Int) {
val users = database.getAll("SELECT * FROM users;") { UserRow.from(it) }
assertEquals(amount, users.size, "Expected $amount users, got $users")
}

@Test
fun testPartialSync() =
runTest {
val syncStream = syncStream()
database.connect(syncStream, 1000L)

val checksums =
buildList {
for (prio in 0..3) {
add(
BucketChecksum(
bucket = "bucket$prio",
priority = BucketPriority(prio),
checksum = 10 + prio,
),
)
}
}
var operationId = 1

suspend fun pushData(priority: Int) {
val id = operationId++

syncLines.send(
SyncLine.SyncDataBucket(
bucket = "bucket$priority",
data =
listOf(
OplogEntry(
checksum = (priority + 10).toLong(),
data =
JsonUtil.json.encodeToString(
mapOf(
"name" to "user $priority",
"email" to "[email protected]",
),
),
op = OpType.PUT,
opId = id.toString(),
rowId = "prio$priority",
rowType = "users",
),
),
after = null,
nextAfter = null,
),
)
}

turbineScope(timeout = 10.0.seconds) {
val turbine = syncStream.status.asFlow().testIn(this)
turbine.waitFor { it.connected }
expectUserCount(0)

syncLines.send(
SyncLine.FullCheckpoint(
Checkpoint(
lastOpId = "4",
checksums = checksums,
),
),
)

// Emit a partial sync complete for each priority but the last.
for (priorityNo in 0..<3) {
val priority = BucketPriority(priorityNo)
pushData(priorityNo)
syncLines.send(
SyncLine.CheckpointPartiallyComplete(
lastOpId = operationId.toString(),
priority = priority,
),
)

turbine.waitFor { it.priorityStatusFor(priority).hasSynced == true }
expectUserCount(priorityNo + 1)
}

// Then complete the sync
pushData(3)
syncLines.send(
SyncLine.CheckpointComplete(
lastOpId = operationId.toString(),
),
)
turbine.waitFor { it.hasSynced == true }
expectUserCount(4)

turbine.cancel()
}

syncLines.close()
}

@Test
fun testRemembersLastPartialSync() =
runTest {
val syncStream = syncStream()
database.connect(syncStream, 1000L)

syncLines.send(
SyncLine.FullCheckpoint(
Checkpoint(
lastOpId = "4",
checksums = listOf(BucketChecksum(bucket = "bkt", priority = BucketPriority(1), checksum = 0)),
),
),
)
syncLines.send(
SyncLine.CheckpointPartiallyComplete(
lastOpId = "0",
priority = BucketPriority(1),
),
)

database.waitForFirstSync(BucketPriority(1))
database.close()

// Connect to the same database again
database = openDb()
assertFalse { database.currentStatus.hasSynced == true }
assertTrue { database.currentStatus.priorityStatusFor(BucketPriority(1)).hasSynced == true }
database.close()
syncLines.close()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.powersync.testutils

import com.powersync.db.SqlCursor
import com.powersync.db.getString
import com.powersync.db.schema.Column
import com.powersync.db.schema.Table

data class UserRow(
val id: String,
val name: String,
val email: String,
) {
companion object {
fun from(cursor: SqlCursor): UserRow =
UserRow(
id = cursor.getString("id"),
name = cursor.getString("name"),
email = cursor.getString("email"),
)

val table = Table(name = "users", columns = listOf(Column.text("name"), Column.text("email")))
}
}
8 changes: 8 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.powersync

import com.powersync.bucket.BucketPriority
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.db.Queries
import com.powersync.db.crud.CrudBatch
Expand Down Expand Up @@ -29,6 +30,13 @@ public interface PowerSyncDatabase : Queries {
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun waitForFirstSync()

/**
* Suspend function that resolves when the first sync covering at least all buckets with the
* given [priority] (or a higher one, since those would be synchronized first) has completed.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun waitForFirstSync(priority: BucketPriority)

/**
* Connect to the PowerSync service, and keep the databases in sync.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import kotlinx.serialization.Serializable
@Serializable
internal data class BucketChecksum(
val bucket: String,
val priority: BucketPriority = BucketPriority.DEFAULT_PRIORITY,
val checksum: Int,
val count: Int? = null,
@SerialName("last_op_id") val lastOpId: String? = null,
Expand Down
26 changes: 26 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.powersync.bucket

import kotlinx.serialization.Serializable
import kotlin.jvm.JvmInline

@JvmInline
@Serializable
public value class BucketPriority(
private val priorityCode: Int,
) : Comparable<BucketPriority> {
init {
require(priorityCode >= 0)
}

override fun compareTo(other: BucketPriority): Int = other.priorityCode.compareTo(priorityCode)

public companion object {
internal val FULL_SYNC_PRIORITY: BucketPriority = BucketPriority(Int.MAX_VALUE)

/**
* The assumed priority for buckets when talking to older sync service instances that don't
* support bucket priorities.
*/
internal val DEFAULT_PRIORITY: BucketPriority = BucketPriority(3)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ internal interface BucketStorage {

suspend fun hasCompletedSync(): Boolean

suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult
suspend fun syncLocalDatabase(
targetCheckpoint: Checkpoint,
partialPriority: BucketPriority? = null,
): SyncLocalDatabaseResult

fun setTargetCheckpoint(checkpoint: Checkpoint)
}
Loading
Loading