diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 4e6fe394..413e3749 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -235,6 +235,8 @@ kotlin { implementation(libs.test.coroutines) implementation(libs.test.turbine) implementation(libs.kermit.test) + implementation(libs.ktor.client.mock) + implementation(libs.test.turbine) } // We're putting the native libraries into our JAR, so integration tests for the JVM can run as part of the unit diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 33435263..3472c880 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -1,11 +1,8 @@ package com.powersync import app.cash.turbine.turbineScope -import com.powersync.db.SqlCursor -import com.powersync.db.getString -import com.powersync.db.schema.Column import com.powersync.db.schema.Schema -import com.powersync.db.schema.Table +import com.powersync.testutils.UserRow import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlin.test.AfterTest @@ -21,10 +18,7 @@ class DatabaseTest { database = PowerSyncDatabase( factory = com.powersync.testutils.factory, - schema = - Schema( - Table(name = "users", columns = listOf(Column.text("name"), Column.text("email"))), - ), + schema = Schema(UserRow.table), dbFilename = "testdb", ) @@ -49,7 +43,7 @@ class DatabaseTest { fun testTableUpdates() = runTest { turbineScope { - val query = database.watch("SELECT * FROM users") { User.from(it) }.testIn(this) + val query = database.watch("SELECT * FROM users") { UserRow.from(it) }.testIn(this) // Wait for initial query assertEquals(0, query.awaitItem().size) @@ -92,19 +86,4 @@ class DatabaseTest { query.cancel() } } - - private data class User( - val id: String, - val name: String, - val email: String, - ) { - companion object { - fun from(cursor: SqlCursor): User = - User( - id = cursor.getString("id"), - name = cursor.getString("name"), - email = cursor.getString("email"), - ) - } - } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt new file mode 100644 index 00000000..02ebe334 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -0,0 +1,231 @@ +package com.powersync + +import app.cash.turbine.turbineScope +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.TestConfig +import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.BucketPriority +import com.powersync.bucket.Checkpoint +import com.powersync.bucket.OpType +import com.powersync.bucket.OplogEntry +import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.connectors.PowerSyncCredentials +import com.powersync.db.PowerSyncDatabaseImpl +import com.powersync.db.schema.Schema +import com.powersync.sync.SyncLine +import com.powersync.sync.SyncStream +import com.powersync.testutils.MockSyncService +import com.powersync.testutils.UserRow +import com.powersync.testutils.cleanup +import com.powersync.testutils.waitFor +import com.powersync.utils.JsonUtil +import dev.mokkery.answering.returns +import dev.mokkery.everySuspend +import dev.mokkery.mock +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.receiveAsFlow +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.JsonObject +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds + +@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class) +class SyncIntegrationTest { + private val logger = + Logger( + TestConfig( + minSeverity = Severity.Debug, + logWriterList = listOf(), + ), + ) + private lateinit var database: PowerSyncDatabaseImpl + private lateinit var connector: PowerSyncBackendConnector + private lateinit var syncLines: Channel + + @BeforeTest + fun setup() { + cleanup("testdb") + database = openDb() + connector = + mock { + everySuspend { getCredentialsCached() } returns + PowerSyncCredentials( + token = "test-token", + userId = "test-user", + endpoint = "https://test.com", + ) + + everySuspend { invalidateCredentials() } returns Unit + } + syncLines = Channel() + + runBlocking { + database.disconnectAndClear(true) + } + } + + @AfterTest + fun teardown() { + cleanup("testdb") + } + + private fun openDb() = + PowerSyncDatabase( + factory = com.powersync.testutils.factory, + schema = Schema(UserRow.table), + dbFilename = "testdb", + ) as PowerSyncDatabaseImpl + + private fun CoroutineScope.syncStream(): SyncStream { + val client = MockSyncService.client(this, syncLines.receiveAsFlow()) + return SyncStream( + bucketStorage = database.bucketStorage, + connector = connector, + httpEngine = client, + uploadCrud = { }, + retryDelayMs = 10, + logger = logger, + params = JsonObject(emptyMap()), + ) + } + + private suspend fun expectUserCount(amount: Int) { + val users = database.getAll("SELECT * FROM users;") { UserRow.from(it) } + assertEquals(amount, users.size, "Expected $amount users, got $users") + } + + @Test + fun testPartialSync() = + runTest { + val syncStream = syncStream() + database.connect(syncStream, 1000L) + + val checksums = + buildList { + for (prio in 0..3) { + add( + BucketChecksum( + bucket = "bucket$prio", + priority = BucketPriority(prio), + checksum = 10 + prio, + ), + ) + } + } + var operationId = 1 + + suspend fun pushData(priority: Int) { + val id = operationId++ + + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "bucket$priority", + data = + listOf( + OplogEntry( + checksum = (priority + 10).toLong(), + data = + JsonUtil.json.encodeToString( + mapOf( + "name" to "user $priority", + "email" to "$priority@example.org", + ), + ), + op = OpType.PUT, + opId = id.toString(), + rowId = "prio$priority", + rowType = "users", + ), + ), + after = null, + nextAfter = null, + ), + ) + } + + turbineScope(timeout = 10.0.seconds) { + val turbine = syncStream.status.asFlow().testIn(this) + turbine.waitFor { it.connected } + expectUserCount(0) + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "4", + checksums = checksums, + ), + ), + ) + + // Emit a partial sync complete for each priority but the last. + for (priorityNo in 0..<3) { + val priority = BucketPriority(priorityNo) + pushData(priorityNo) + syncLines.send( + SyncLine.CheckpointPartiallyComplete( + lastOpId = operationId.toString(), + priority = priority, + ), + ) + + turbine.waitFor { it.priorityStatusFor(priority).hasSynced == true } + expectUserCount(priorityNo + 1) + } + + // Then complete the sync + pushData(3) + syncLines.send( + SyncLine.CheckpointComplete( + lastOpId = operationId.toString(), + ), + ) + turbine.waitFor { it.hasSynced == true } + expectUserCount(4) + + turbine.cancel() + } + + syncLines.close() + } + + @Test + fun testRemembersLastPartialSync() = + runTest { + val syncStream = syncStream() + database.connect(syncStream, 1000L) + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "4", + checksums = listOf(BucketChecksum(bucket = "bkt", priority = BucketPriority(1), checksum = 0)), + ), + ), + ) + syncLines.send( + SyncLine.CheckpointPartiallyComplete( + lastOpId = "0", + priority = BucketPriority(1), + ), + ) + + database.waitForFirstSync(BucketPriority(1)) + database.close() + + // Connect to the same database again + database = openDb() + assertFalse { database.currentStatus.hasSynced == true } + assertTrue { database.currentStatus.priorityStatusFor(BucketPriority(1)).hasSynced == true } + database.close() + syncLines.close() + } +} diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/UserRow.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/UserRow.kt new file mode 100644 index 00000000..caf65765 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/UserRow.kt @@ -0,0 +1,23 @@ +package com.powersync.testutils + +import com.powersync.db.SqlCursor +import com.powersync.db.getString +import com.powersync.db.schema.Column +import com.powersync.db.schema.Table + +data class UserRow( + val id: String, + val name: String, + val email: String, +) { + companion object { + fun from(cursor: SqlCursor): UserRow = + UserRow( + id = cursor.getString("id"), + name = cursor.getString("name"), + email = cursor.getString("email"), + ) + + val table = Table(name = "users", columns = listOf(Column.text("name"), Column.text("email"))) + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index 60fa5575..9e0a9564 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -1,5 +1,6 @@ package com.powersync +import com.powersync.bucket.BucketPriority import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.Queries import com.powersync.db.crud.CrudBatch @@ -29,6 +30,13 @@ public interface PowerSyncDatabase : Queries { @Throws(PowerSyncException::class, CancellationException::class) public suspend fun waitForFirstSync() + /** + * Suspend function that resolves when the first sync covering at least all buckets with the + * given [priority] (or a higher one, since those would be synchronized first) has completed. + */ + @Throws(PowerSyncException::class, CancellationException::class) + public suspend fun waitForFirstSync(priority: BucketPriority) + /** * Connect to the PowerSync service, and keep the databases in sync. * diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt index 8c5544c4..335b4429 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt @@ -6,6 +6,7 @@ import kotlinx.serialization.Serializable @Serializable internal data class BucketChecksum( val bucket: String, + val priority: BucketPriority = BucketPriority.DEFAULT_PRIORITY, val checksum: Int, val count: Int? = null, @SerialName("last_op_id") val lastOpId: String? = null, diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt new file mode 100644 index 00000000..60073707 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt @@ -0,0 +1,26 @@ +package com.powersync.bucket + +import kotlinx.serialization.Serializable +import kotlin.jvm.JvmInline + +@JvmInline +@Serializable +public value class BucketPriority( + private val priorityCode: Int, +) : Comparable { + init { + require(priorityCode >= 0) + } + + override fun compareTo(other: BucketPriority): Int = other.priorityCode.compareTo(priorityCode) + + public companion object { + internal val FULL_SYNC_PRIORITY: BucketPriority = BucketPriority(Int.MAX_VALUE) + + /** + * The assumed priority for buckets when talking to older sync service instances that don't + * support bucket priorities. + */ + internal val DEFAULT_PRIORITY: BucketPriority = BucketPriority(3) + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 5478d0da..94ca52df 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -28,7 +28,10 @@ internal interface BucketStorage { suspend fun hasCompletedSync(): Boolean - suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult + suspend fun syncLocalDatabase( + targetCheckpoint: Checkpoint, + partialPriority: BucketPriority? = null, + ): SyncLocalDatabaseResult fun setTargetCheckpoint(checkpoint: Checkpoint) } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 11b21524..ec61a209 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -11,6 +11,7 @@ import com.powersync.db.internal.PowerSyncTransaction import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult import com.powersync.utils.JsonUtil +import kotlinx.serialization.Serializable import kotlinx.serialization.encodeToString internal class BucketStorageImpl( @@ -193,8 +194,11 @@ internal class BucketStorageImpl( } } - override suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult { - val result = validateChecksums(targetCheckpoint) + override suspend fun syncLocalDatabase( + targetCheckpoint: Checkpoint, + partialPriority: BucketPriority?, + ): SyncLocalDatabaseResult { + val result = validateChecksums(targetCheckpoint, partialPriority) if (!result.checkpointValid) { logger.w { "[SyncLocalDatabase] Checksums failed for ${result.checkpointFailures}" } @@ -205,7 +209,15 @@ internal class BucketStorageImpl( return result } - val bucketNames = targetCheckpoint.checksums.map { it.bucket } + val bucketNames = + targetCheckpoint.checksums + .let { + if (partialPriority == null) { + it + } else { + it.filter { cs -> cs.priority >= partialPriority } + } + }.map { it.bucket } db.writeTransaction { tx -> tx.execute( @@ -213,7 +225,7 @@ internal class BucketStorageImpl( listOf(targetCheckpoint.lastOpId, JsonUtil.json.encodeToString(bucketNames)), ) - if (targetCheckpoint.writeCheckpoint != null) { + if (partialPriority == null && targetCheckpoint.writeCheckpoint != null) { tx.execute( "UPDATE ps_buckets SET last_op = ? WHERE name = '\$local'", listOf(targetCheckpoint.writeCheckpoint), @@ -221,7 +233,7 @@ internal class BucketStorageImpl( } } - val valid = updateObjectsFromBuckets() + val valid = updateObjectsFromBuckets(targetCheckpoint, partialPriority) if (!valid) { return SyncLocalDatabaseResult( @@ -237,11 +249,23 @@ internal class BucketStorageImpl( ) } - private suspend fun validateChecksums(checkpoint: Checkpoint): SyncLocalDatabaseResult { + private suspend fun validateChecksums( + checkpoint: Checkpoint, + priority: BucketPriority? = null, + ): SyncLocalDatabaseResult { + val serializedCheckpoint = + JsonUtil.json.encodeToString( + when (priority) { + null -> checkpoint + // Only validate buckets with a priority included in this partial sync. + else -> checkpoint.copy(checksums = checkpoint.checksums.filter { it.priority >= priority }) + }, + ) + val res = db.getOptional( "SELECT powersync_validate_checkpoint(?) AS result", - parameters = listOf(JsonUtil.json.encodeToString(checkpoint)), + parameters = listOf(serializedCheckpoint), mapper = { cursor -> cursor.getString(0)!! }, @@ -260,12 +284,33 @@ internal class BucketStorageImpl( * * This includes creating new tables, dropping old tables, and copying data over from the oplog. */ - private suspend fun updateObjectsFromBuckets(): Boolean { + private suspend fun updateObjectsFromBuckets( + checkpoint: Checkpoint, + priority: BucketPriority? = null, + ): Boolean { + @Serializable + data class SyncLocalArgs( + val priority: BucketPriority, + val buckets: List, + ) + + val args = + if (priority != null) { + JsonUtil.json.encodeToString( + SyncLocalArgs( + priority = priority, + buckets = checkpoint.checksums.filter { it.priority >= priority }.map { it.bucket }, + ), + ) + } else { + "" + } + return db.writeTransaction { tx -> tx.execute( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)", - listOf("sync_local", ""), + listOf("sync_local", args), ) val res = diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index b7a7d5dc..492e6b17 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -4,6 +4,7 @@ import co.touchlab.kermit.Logger import com.powersync.DatabaseDriverFactory import com.powersync.PowerSyncDatabase import com.powersync.PsSqlDriver +import com.powersync.bucket.BucketPriority import com.powersync.bucket.BucketStorage import com.powersync.bucket.BucketStorageImpl import com.powersync.connectors.PowerSyncBackendConnector @@ -15,7 +16,9 @@ import com.powersync.db.internal.InternalDatabaseImpl import com.powersync.db.internal.InternalTable import com.powersync.db.internal.ThrowableTransactionCallback import com.powersync.db.schema.Schema +import com.powersync.sync.PriorityStatusEntry import com.powersync.sync.SyncStatus +import com.powersync.sync.SyncStatusData import com.powersync.sync.SyncStream import com.powersync.utils.JsonParam import com.powersync.utils.JsonUtil @@ -30,6 +33,8 @@ import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.datetime.Instant +import kotlinx.datetime.TimeZone +import kotlinx.datetime.toInstant import kotlinx.datetime.toLocalDateTime import kotlinx.serialization.encodeToString @@ -51,7 +56,7 @@ internal class PowerSyncDatabaseImpl( driver: PsSqlDriver = factory.createDriver(scope, dbFilename), ) : PowerSyncDatabase { private val internalDb = InternalDatabaseImpl(driver, scope) - private val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) + internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) /** * The current sync status. @@ -84,7 +89,6 @@ internal class PowerSyncDatabaseImpl( } } - @OptIn(FlowPreview::class) override suspend fun connect( connector: PowerSyncBackendConnector, crudThrottleMs: Long, @@ -94,7 +98,7 @@ internal class PowerSyncDatabaseImpl( // close connection if one is open disconnect() - this.syncStream = + connect( SyncStream( bucketStorage = bucketStorage, connector = connector, @@ -102,8 +106,17 @@ internal class PowerSyncDatabaseImpl( retryDelayMs = retryDelayMs, logger = logger, params = params.toJsonObject(), - ) + ), + crudThrottleMs, + ) + } + @OptIn(FlowPreview::class) + internal fun connect( + stream: SyncStream, + crudThrottleMs: Long, + ) { + this.syncStream = stream syncJob = scope.launch { syncStream!!.streamingSync() @@ -121,6 +134,7 @@ internal class PowerSyncDatabaseImpl( downloadError = it.downloadError, clearDownloadError = it.downloadError == null, clearUploadError = it.uploadError == null, + priorityStatusEntries = it.priorityStatusEntries, ) } } @@ -280,34 +294,62 @@ internal class PowerSyncDatabaseImpl( private suspend fun updateHasSynced() { data class SyncedAt( - val syncedAt: String?, + val priority: BucketPriority, + val syncedAt: Instant?, ) + // Query the database to see if any data has been synced - val timestamp = - internalDb - .getOptional("SELECT powersync_last_synced_at() as synced_at", null) { cursor -> - SyncedAt(syncedAt = cursor.getStringOptional("synced_at")) - }?.syncedAt - if (timestamp != null) { - val hasSynced = true - if (currentStatus.hasSynced != null && hasSynced != currentStatus.hasSynced) { - val formattedDateTime = "${timestamp.replace(" ", "T").toLocalDateTime()}Z" - val lastSyncedAt = Instant.parse(formattedDateTime) - currentStatus.update(hasSynced = hasSynced, lastSyncedAt = lastSyncedAt) + val syncedAtRows = + internalDb.getAll("SELECT * FROM ps_sync_state ORDER BY priority") { + val rawTime = it.getString(1)!! + + SyncedAt( + priority = BucketPriority(it.getLong(0)!!.toInt()), + syncedAt = rawTime.replace(" ", "T").toLocalDateTime().toInstant(TimeZone.UTC), + ) + } + + val priorityStatus = mutableListOf() + var lastSyncedAt: Instant? = null + + for (row in syncedAtRows) { + if (row.priority == BucketPriority.FULL_SYNC_PRIORITY) { + lastSyncedAt = row.syncedAt + } else { + priorityStatus.add( + PriorityStatusEntry( + priority = row.priority, + lastSyncedAt = row.syncedAt, + hasSynced = true, + ), + ) } - } else { - currentStatus.update(hasSynced = false) } + + currentStatus.update( + hasSynced = lastSyncedAt != null, + lastSyncedAt = lastSyncedAt, + priorityStatusEntries = priorityStatus, + ) } - override suspend fun waitForFirstSync() { - if (currentStatus.hasSynced == true) { + override suspend fun waitForFirstSync() = waitForFirstSyncImpl(null) + + override suspend fun waitForFirstSync(priority: BucketPriority) = waitForFirstSyncImpl(priority) + + private suspend fun waitForFirstSyncImpl(priority: BucketPriority?) { + val predicate: (SyncStatusData) -> Boolean = + if (priority == null) { + { it.hasSynced == true } + } else { + { it.priorityStatusFor(priority).hasSynced == true } + } + + if (predicate(currentStatus)) { return } - currentStatus.asFlow().first { status -> - status.hasSynced == true - } + currentStatus.asFlow().first(predicate) } override suspend fun close() { diff --git a/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncCheckpointDiff.kt b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncCheckpointDiff.kt deleted file mode 100644 index 94d26fba..00000000 --- a/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncCheckpointDiff.kt +++ /dev/null @@ -1,13 +0,0 @@ -package com.powersync.sync - -import com.powersync.bucket.BucketChecksum -import kotlinx.serialization.SerialName -import kotlinx.serialization.Serializable - -@Serializable -internal data class StreamingSyncCheckpointDiff( - @SerialName("last_op_id") val lastOpId: String, - @SerialName("updated_buckets") val updatedBuckets: List, - @SerialName("removed_buckets") val removedBuckets: List, - @SerialName("write_checkpoint") val writeCheckpoint: String? = null, -) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBatch.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBatch.kt index 5aa87dfc..65efdb43 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBatch.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBatch.kt @@ -4,5 +4,5 @@ import kotlinx.serialization.Serializable @Serializable internal data class SyncDataBatch( - val buckets: List, + val buckets: List, ) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBucket.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBucket.kt deleted file mode 100644 index b5fd63e6..00000000 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncDataBucket.kt +++ /dev/null @@ -1,14 +0,0 @@ -package com.powersync.sync - -import com.powersync.bucket.OplogEntry -import kotlinx.serialization.SerialName -import kotlinx.serialization.Serializable - -@Serializable -internal data class SyncDataBucket( - val bucket: String, - val data: List, - @SerialName("has_more") val hasMore: Boolean = false, - val after: String?, - @SerialName("next_after")val nextAfter: String?, -) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt new file mode 100644 index 00000000..242542cd --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt @@ -0,0 +1,115 @@ +package com.powersync.sync + +import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.BucketPriority +import com.powersync.bucket.Checkpoint +import com.powersync.bucket.OplogEntry +import kotlinx.serialization.KSerializer +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.descriptors.element +import kotlinx.serialization.encoding.CompositeDecoder +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.encoding.decodeStructure +import kotlinx.serialization.encoding.encodeStructure +import kotlinx.serialization.serializer + +@Serializable(with = SyncLineSerializer::class) +internal sealed interface SyncLine { + data class FullCheckpoint( + val checkpoint: Checkpoint, + ) : SyncLine + + @Serializable + data class CheckpointDiff( + @SerialName("last_op_id") val lastOpId: String, + @SerialName("updated_buckets") val updatedBuckets: List, + @SerialName("removed_buckets") val removedBuckets: List, + @SerialName("write_checkpoint") val writeCheckpoint: String? = null, + ) : SyncLine + + @Serializable + data class CheckpointComplete( + @SerialName("last_op_id") val lastOpId: String, + ) : SyncLine + + @Serializable + data class CheckpointPartiallyComplete( + @SerialName("last_op_id") val lastOpId: String, + @SerialName("priority") val priority: BucketPriority, + ) : SyncLine + + @Serializable + data class SyncDataBucket( + val bucket: String, + val data: List, + @SerialName("has_more") val hasMore: Boolean = false, + val after: String?, + @SerialName("next_after")val nextAfter: String?, + ) : SyncLine + + data class KeepAlive( + val tokenExpiresIn: Int, + ) : SyncLine + + data object UnknownSyncLine : SyncLine +} + +private class SyncLineSerializer : KSerializer { + private val checkpoint = serializer() + private val checkpointDiff = serializer() + private val checkpointComplete = serializer() + private val checkpointPartiallyComplete = serializer() + private val data = serializer() + + override val descriptor = + buildClassSerialDescriptor(SyncLine::class.qualifiedName!!) { + element("checkpoint", checkpoint.descriptor, isOptional = true) + element("checkpoint_diff", checkpointDiff.descriptor, isOptional = true) + element("checkpoint_complete", checkpointComplete.descriptor, isOptional = true) + element("partial_checkpoint_complete", checkpointPartiallyComplete.descriptor, isOptional = true) + element("data", data.descriptor, isOptional = true) + element("token_expires_in", isOptional = true) + } + + override fun deserialize(decoder: Decoder): SyncLine = + decoder.decodeStructure(descriptor) { + val value = + when (val index = decodeElementIndex(descriptor)) { + 0 -> SyncLine.FullCheckpoint(decodeSerializableElement(descriptor, 0, checkpoint)) + 1 -> decodeSerializableElement(descriptor, 1, checkpointDiff) + 2 -> decodeSerializableElement(descriptor, 2, checkpointComplete) + 3 -> decodeSerializableElement(descriptor, 3, checkpointPartiallyComplete) + 4 -> decodeSerializableElement(descriptor, 4, data) + 5 -> SyncLine.KeepAlive(decodeIntElement(descriptor, 5)) + CompositeDecoder.UNKNOWN_NAME, CompositeDecoder.DECODE_DONE -> SyncLine.UnknownSyncLine + else -> error("Unexpected index: $index") + } + + if (decodeElementIndex(descriptor) != CompositeDecoder.DECODE_DONE) { + // Sync lines are single-key objects, make sure there isn't another one. + SyncLine.UnknownSyncLine + } else { + value + } + } + + override fun serialize( + encoder: Encoder, + value: SyncLine, + ) { + encoder.encodeStructure(descriptor) { + when (value) { + is SyncLine.FullCheckpoint -> encodeSerializableElement(descriptor, 0, checkpoint, value.checkpoint) + is SyncLine.CheckpointDiff -> encodeSerializableElement(descriptor, 1, checkpointDiff, value) + is SyncLine.CheckpointComplete -> encodeSerializableElement(descriptor, 2, checkpointComplete, value) + is SyncLine.CheckpointPartiallyComplete -> encodeSerializableElement(descriptor, 3, checkpointPartiallyComplete, value) + is SyncLine.SyncDataBucket -> encodeSerializableElement(descriptor, 4, data, value) + is SyncLine.KeepAlive -> encodeIntElement(descriptor, 5, value.tokenExpiresIn) + is SyncLine.UnknownSyncLine -> throw UnsupportedOperationException("Can't serialize unknown sync line") + } + } + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 698bbbab..2e4b84e6 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -1,11 +1,18 @@ package com.powersync.sync +import com.powersync.bucket.BucketPriority import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow import kotlinx.datetime.Instant +public data class PriorityStatusEntry internal constructor( + val priority: BucketPriority, + val lastSyncedAt: Instant?, + val hasSynced: Boolean?, +) + public interface SyncStatusData { /** * true if currently connected. @@ -65,6 +72,33 @@ public interface SyncStatusData { * Convenience getter for either the value of downloadError or uploadError */ public val anyError: Any? + + /** + * Available [PriorityStatusEntry] reporting the sync status for buckets within priorities. + * + * When buckets with different priorities are defined, this may contain entries before [hasSynced] + * and [lastSyncedAt] are set to indicate that a partial (but no complete) sync has completed. + * A completed [PriorityStatusEntry] at one priority level always includes all higher priorities too. + */ + public val priorityStatusEntries: List + + /** + * Status information for whether buckets in [priority] have been synchronized. + */ + public fun priorityStatusFor(priority: BucketPriority): PriorityStatusEntry { + val byDescendingPriorities = priorityStatusEntries.sortedByDescending { it.priority } + + for (entry in byDescendingPriorities) { + // Lower-priority buckets are synchronized after higher-priority buckets, so we look for the first + // entry that doesn't have a higher priority. + if (entry.priority <= priority) { + return entry + } + } + + // A complete sync necessarily includes all priorities. + return PriorityStatusEntry(priority, lastSyncedAt, hasSynced) + } } internal data class SyncStatusDataContainer( @@ -76,6 +110,7 @@ internal data class SyncStatusDataContainer( override val hasSynced: Boolean? = null, override val uploadError: Any? = null, override val downloadError: Any? = null, + override val priorityStatusEntries: List = emptyList(), ) : SyncStatusData { override val anyError get() = downloadError ?: uploadError @@ -106,6 +141,7 @@ public data class SyncStatus internal constructor( downloadError: Any? = null, clearUploadError: Boolean = false, clearDownloadError: Boolean = false, + priorityStatusEntries: List? = null, ) { data = data.copy( @@ -115,6 +151,7 @@ public data class SyncStatus internal constructor( uploading = uploading ?: data.uploading, lastSyncedAt = lastSyncedAt ?: data.lastSyncedAt, hasSynced = hasSynced ?: data.hasSynced, + priorityStatusEntries = priorityStatusEntries ?: data.priorityStatusEntries, uploadError = if (clearUploadError) null else uploadError, downloadError = if (clearDownloadError) null else downloadError, ) @@ -148,6 +185,9 @@ public data class SyncStatus internal constructor( override val downloadError: Any? get() = data.downloadError + override val priorityStatusEntries: List + get() = data.priorityStatusEntries + override fun toString(): String = "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, hasSynced=$hasSynced, error=$anyError)" diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index e1bf50a1..5f2413d5 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -11,7 +11,9 @@ import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient +import io.ktor.client.HttpClientConfig import io.ktor.client.call.body +import io.ktor.client.engine.HttpClientEngine import io.ktor.client.plugins.HttpTimeout import io.ktor.client.plugins.contentnegotiation.ContentNegotiation import io.ktor.client.plugins.timeout @@ -32,11 +34,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.datetime.Clock import kotlinx.serialization.encodeToString -import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.JsonPrimitive -import kotlinx.serialization.json.decodeFromJsonElement -import kotlinx.serialization.json.jsonObject internal class SyncStream( private val bucketStorage: BucketStorage, @@ -45,6 +43,7 @@ internal class SyncStream( private val retryDelayMs: Long = 5000L, private val logger: Logger, private val params: JsonObject, + httpEngine: HttpClientEngine? = null, ) { private var isUploadingCrud = AtomicBoolean(false) @@ -55,22 +54,24 @@ internal class SyncStream( private var clientId: String? = null - private val httpClient: HttpClient = - HttpClient { + private val httpClient: HttpClient + + init { + fun HttpClientConfig<*>.configureClient() { install(HttpTimeout) install(ContentNegotiation) } - companion object { - fun isStreamingSyncData(obj: JsonObject): Boolean = obj.containsKey("data") - - fun isStreamingKeepAlive(obj: JsonObject): Boolean = obj.containsKey("token_expires_in") - - fun isStreamingSyncCheckpoint(obj: JsonObject): Boolean = obj.containsKey("checkpoint") - - fun isStreamingSyncCheckpointComplete(obj: JsonObject): Boolean = obj.containsKey("checkpoint_complete") - - fun isStreamingSyncCheckpointDiff(obj: JsonObject): Boolean = obj.containsKey("checkpoint_diff") + httpClient = + if (httpEngine == null) { + HttpClient { + configureClient() + } + } else { + HttpClient(httpEngine) { + configureClient() + } + } } fun invalidateCredentials() { @@ -185,7 +186,7 @@ internal class SyncStream( return body.data.writeCheckpoint } - private suspend fun streamingSyncRequest(req: StreamingSyncRequest): Flow = + private fun streamingSyncRequest(req: StreamingSyncRequest): Flow = flow { val credentials = connector.getCredentialsCached() require(credentials != null) { "Not logged in" } @@ -230,7 +231,7 @@ internal class SyncStream( val bucketEntries = bucketStorage.getBucketStates() val initialBuckets = mutableMapOf() - val state = + var state = SyncStreamState( targetCheckpoint = null, validatedCheckpoint = null, @@ -251,44 +252,36 @@ internal class SyncStream( ) streamingSyncRequest(req).collect { value -> - handleInstruction(value, state) + val line = JsonUtil.json.decodeFromString(value) + state = handleInstruction(line, value, state) } return state } private suspend fun handleInstruction( + line: SyncLine, jsonString: String, state: SyncStreamState, - ): SyncStreamState { - val obj = JsonUtil.json.parseToJsonElement(jsonString).jsonObject - // TODO: Clean up - when { - isStreamingSyncCheckpoint(obj) -> return handleStreamingSyncCheckpoint(obj, state) - isStreamingSyncCheckpointComplete(obj) -> return handleStreamingSyncCheckpointComplete( - state, - ) - - isStreamingSyncCheckpointDiff(obj) -> return handleStreamingSyncCheckpointDiff( - obj, - state, - ) - - isStreamingSyncData(obj) -> return handleStreamingSyncData(obj, state) - isStreamingKeepAlive(obj) -> return handleStreamingKeepAlive(obj, state) - else -> { + ): SyncStreamState = + when (line) { + is SyncLine.FullCheckpoint -> handleStreamingSyncCheckpoint(line, state) + is SyncLine.CheckpointDiff -> handleStreamingSyncCheckpointDiff(line, state) + is SyncLine.CheckpointComplete -> handleStreamingSyncCheckpointComplete(state) + is SyncLine.CheckpointPartiallyComplete -> handleStreamingSyncCheckpointPartiallyComplete(line, state) + is SyncLine.KeepAlive -> handleStreamingKeepAlive(line, state) + is SyncLine.SyncDataBucket -> handleStreamingSyncData(line, state) + SyncLine.UnknownSyncLine -> { logger.w { "Unhandled instruction $jsonString" } - return state + state } } - } private suspend fun handleStreamingSyncCheckpoint( - jsonObj: JsonObject, + line: SyncLine.FullCheckpoint, state: SyncStreamState, ): SyncStreamState { - val checkpoint = - JsonUtil.json.decodeFromJsonElement(jsonObj["checkpoint"] as JsonElement) + val (checkpoint) = line state.targetCheckpoint = checkpoint val bucketsToDelete = state.bucketSet!!.toMutableList() val newBuckets = mutableSetOf() @@ -335,16 +328,51 @@ internal class SyncStream( return state } + private suspend fun handleStreamingSyncCheckpointPartiallyComplete( + line: SyncLine.CheckpointPartiallyComplete, + state: SyncStreamState, + ): SyncStreamState { + val priority = line.priority + val result = bucketStorage.syncLocalDatabase(state.targetCheckpoint!!, priority) + if (!result.checkpointValid) { + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + delay(50) + state.retry = true + // TODO handle retries + return state + } else if (!result.ready) { + // Checksums valid, but need more data for a consistent checkpoint. + // Continue waiting. + } else { + logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" } + } + + status.update( + priorityStatusEntries = + buildList { + // All states with a higher priority can be deleted since this partial sync includes them. + addAll(status.priorityStatusEntries.filter { it.priority >= line.priority }) + add( + PriorityStatusEntry( + priority = priority, + lastSyncedAt = Clock.System.now(), + hasSynced = true, + ), + ) + }, + ) + return state + } + private suspend fun handleStreamingSyncCheckpointDiff( - jsonObj: JsonObject, + checkpointDiff: SyncLine.CheckpointDiff, state: SyncStreamState, ): SyncStreamState { // TODO: It may be faster to just keep track of the diff, instead of the entire checkpoint if (state.targetCheckpoint == null) { throw Exception("Checkpoint diff without previous checkpoint") } - val checkpointDiff = - JsonUtil.json.decodeFromJsonElement(jsonObj["checkpoint_diff"]!!) val newBuckets = mutableMapOf() @@ -379,22 +407,18 @@ internal class SyncStream( } private suspend fun handleStreamingSyncData( - jsonObj: JsonObject, + data: SyncLine.SyncDataBucket, state: SyncStreamState, ): SyncStreamState { - val syncBuckets = - listOf(JsonUtil.json.decodeFromJsonElement(jsonObj["data"] as JsonElement)) - - bucketStorage.saveSyncData(SyncDataBatch(syncBuckets)) - + bucketStorage.saveSyncData(SyncDataBatch(listOf(data))) return state } private suspend fun handleStreamingKeepAlive( - jsonObj: JsonObject, + keepAlive: SyncLine.KeepAlive, state: SyncStreamState, ): SyncStreamState { - val tokenExpiresIn = (jsonObj["token_expires_in"] as JsonPrimitive).content.toInt() + val (tokenExpiresIn) = keepAlive if (tokenExpiresIn <= 0) { // Connection would be closed automatically right after this diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt new file mode 100644 index 00000000..50a88cd9 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt @@ -0,0 +1,107 @@ +package com.powersync.sync + +import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.BucketPriority +import com.powersync.bucket.Checkpoint +import com.powersync.utils.JsonUtil +import kotlin.test.Test +import kotlin.test.assertEquals + +class SyncLineTest { + private fun checkDeserializing( + expected: SyncLine, + json: String, + ) { + assertEquals(expected, JsonUtil.json.decodeFromString(json)) + } + + @Test + fun testDeserializeCheckpoint() { + checkDeserializing( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(), + ), + ), + """{"checkpoint": {"last_op_id": "10", "buckets": []}}""", + ) + } + + @Test + fun testDeserializeCheckpointNoPriority() { + checkDeserializing( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(BucketChecksum(bucket = "a", priority = BucketPriority(3), checksum = 10)), + ), + ), + """{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "checksum": 10}]}}""", + ) + } + + @Test + fun testDeserializeCheckpointWithPriority() { + checkDeserializing( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(BucketChecksum(bucket = "a", priority = BucketPriority(1), checksum = 10)), + ), + ), + """{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "priority": 1, "checksum": 10}]}}""", + ) + } + + @Test + fun testDeserializeCheckpointDiff() { + checkDeserializing( + SyncLine.CheckpointDiff( + lastOpId = "10", + updatedBuckets = listOf(), + removedBuckets = listOf(), + ), + """{"checkpoint_diff": {"last_op_id": "10", "buckets": [], "updated_buckets": [], "removed_buckets": []}}""", + ) + } + + @Test + fun testDeserializeCheckpointComplete() { + checkDeserializing(SyncLine.CheckpointComplete(lastOpId = "10"), """{"checkpoint_complete": {"last_op_id": "10"}}""") + } + + @Test + fun testDeserializePartialCheckpointComplete() { + checkDeserializing( + SyncLine.CheckpointPartiallyComplete( + lastOpId = "10", + priority = BucketPriority(1), + ), + """{"partial_checkpoint_complete": {"last_op_id": "10", "priority": 1}}""", + ) + } + + @Test + fun testDeserializeData() { + checkDeserializing( + SyncLine.SyncDataBucket( + bucket = "bkt", + data = emptyList(), + after = null, + nextAfter = null, + ), + """{"data": {"bucket": "bkt", "data": [], "after": null, "next_after": null}}""", + ) + } + + @Test + fun testKeepAlive() { + checkDeserializing(SyncLine.KeepAlive(100), """{"token_expires_in": 100}""") + } + + @Test + fun testDeserializeUnknown() { + checkDeserializing(SyncLine.UnknownSyncLine, """{"unknown_key": true}""") + } +} diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 26cf99b7..3d9ebfaf 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -1,27 +1,46 @@ package com.powersync.sync +import app.cash.turbine.turbineScope import co.touchlab.kermit.Logger import co.touchlab.kermit.Severity import co.touchlab.kermit.TestConfig import co.touchlab.kermit.TestLogWriter +import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.BucketPriority import com.powersync.bucket.BucketStorage +import com.powersync.bucket.Checkpoint +import com.powersync.bucket.OpType +import com.powersync.bucket.OplogEntry import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType +import com.powersync.testutils.MockSyncService +import com.powersync.testutils.waitFor +import com.powersync.utils.JsonUtil import dev.mokkery.answering.returns import dev.mokkery.everySuspend +import dev.mokkery.matcher.any import dev.mokkery.mock +import dev.mokkery.resetCalls import dev.mokkery.verify +import dev.mokkery.verify.VerifyMode.Companion.order +import dev.mokkery.verifyNoMoreCalls +import dev.mokkery.verifySuspend +import io.ktor.client.engine.mock.MockEngine +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout +import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonObject import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertContains import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.seconds @OptIn(co.touchlab.kermit.ExperimentalKermitApi::class) class SyncStreamTest { @@ -39,11 +58,36 @@ class SyncStreamTest { logWriterList = listOf(testLogWriter), ), ) + private val assertNoHttpEngine = + MockEngine { request -> + error("Unexpected HTTP request: $request") + } @BeforeTest fun setup() { - bucketStorage = mock() - connector = mock() + bucketStorage = + mock { + everySuspend { getClientId() } returns "test-client-id" + everySuspend { getBucketStates() } returns emptyList() + everySuspend { removeBuckets(any()) } returns Unit + everySuspend { setTargetCheckpoint(any()) } returns Unit + everySuspend { saveSyncData(any()) } returns Unit + everySuspend { syncLocalDatabase(any(), any()) } returns + SyncLocalDatabaseResult( + ready = true, + checkpointValid = true, + checkpointFailures = emptyList(), + ) + } + connector = + mock { + everySuspend { getCredentialsCached() } returns + PowerSyncCredentials( + token = "test-token", + userId = "test-user", + endpoint = "https://test.com", + ) + } } @Test @@ -58,6 +102,7 @@ class SyncStreamTest { SyncStream( bucketStorage = bucketStorage, connector = connector, + httpEngine = assertNoHttpEngine, uploadCrud = {}, logger = logger, params = JsonObject(emptyMap()), @@ -92,6 +137,7 @@ class SyncStreamTest { SyncStream( bucketStorage = bucketStorage, connector = connector, + httpEngine = assertNoHttpEngine, uploadCrud = { }, retryDelayMs = 10, logger = logger, @@ -126,20 +172,11 @@ class SyncStreamTest { everySuspend { getBucketStates() } returns emptyList() } - connector = - mock { - everySuspend { getCredentialsCached() } returns - PowerSyncCredentials( - token = "test-token", - userId = "test-user", - endpoint = "https://test.com", - ) - } - syncStream = SyncStream( bucketStorage = bucketStorage, connector = connector, + httpEngine = assertNoHttpEngine, uploadCrud = { }, retryDelayMs = 10, logger = logger, @@ -166,4 +203,121 @@ class SyncStreamTest { // Clean up job.cancel() } + + @Test + fun testPartialSync() = + runTest { + // TODO: It would be neat if we could use in-memory sqlite instances instead of mocking everything + // Revisit https://github.com/powersync-ja/powersync-kotlin/pull/117/files at some point + val syncLines = Channel() + val client = MockSyncService.client(this, syncLines.receiveAsFlow()) + + syncStream = + SyncStream( + bucketStorage = bucketStorage, + connector = connector, + httpEngine = client, + uploadCrud = { }, + retryDelayMs = 10, + logger = logger, + params = JsonObject(emptyMap()), + ) + + val job = launch { syncStream.streamingSync() } + var operationId = 1 + + suspend fun pushData(priority: Int) { + val id = operationId++ + + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "prio$priority", + data = + listOf( + OplogEntry( + checksum = (priority + 10).toLong(), + data = JsonUtil.json.encodeToString(mapOf("foo" to "bar")), + op = OpType.PUT, + opId = id.toString(), + rowId = "prio$priority", + rowType = "customers", + ), + ), + after = null, + nextAfter = null, + ), + ) + } + + turbineScope(timeout = 10.0.seconds) { + val turbine = syncStream.status.asFlow().testIn(this) + turbine.waitFor { it.connected } + resetCalls(bucketStorage) + + // Start a sync flow + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "4", + checksums = + buildList { + for (priority in 0..3) { + add( + BucketChecksum( + bucket = "prio$priority", + priority = BucketPriority(priority), + checksum = 10 + priority, + ), + ) + } + }, + ), + ), + ) + + // Emit a partial sync complete for each priority but the last. + for (priorityNo in 0..<3) { + val priority = BucketPriority(priorityNo) + pushData(priorityNo) + syncLines.send( + SyncLine.CheckpointPartiallyComplete( + lastOpId = operationId.toString(), + priority = priority, + ), + ) + + turbine.waitFor { it.priorityStatusFor(priority).hasSynced == true } + + verifySuspend(order) { + if (priorityNo == 0) { + bucketStorage.removeBuckets(any()) + bucketStorage.setTargetCheckpoint(any()) + } + + bucketStorage.saveSyncData(any()) + bucketStorage.syncLocalDatabase(any(), priority) + } + } + + // Then complete the sync + pushData(3) + syncLines.send( + SyncLine.CheckpointComplete( + lastOpId = operationId.toString(), + ), + ) + + turbine.waitFor { it.hasSynced == true } + verifySuspend { + bucketStorage.saveSyncData(any()) + bucketStorage.syncLocalDatabase(any(), null) + } + + turbine.cancel() + } + + verifyNoMoreCalls(bucketStorage) + job.cancel() + syncLines.close() + } } diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt new file mode 100644 index 00000000..eb57a232 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -0,0 +1,67 @@ +package com.powersync.testutils + +import app.cash.turbine.ReceiveTurbine +import com.powersync.sync.SyncLine +import com.powersync.sync.SyncStatusData +import com.powersync.utils.JsonUtil +import io.ktor.client.engine.HttpClientEngine +import io.ktor.client.engine.mock.MockEngine +import io.ktor.client.engine.mock.MockRequestHandleScope +import io.ktor.client.engine.mock.respond +import io.ktor.client.engine.mock.respondBadRequest +import io.ktor.client.request.HttpRequestData +import io.ktor.client.request.HttpResponseData +import io.ktor.utils.io.ByteChannel +import io.ktor.utils.io.writeStringUtf8 +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.launch +import kotlinx.serialization.encodeToString + +internal class MockSyncService private constructor( + private val scope: CoroutineScope, + private val lines: Flow, +) { + private fun handleRequest( + scope: MockRequestHandleScope, + request: HttpRequestData, + ): HttpResponseData = + if (request.url.encodedPath == "/sync/stream") { + val channel = ByteChannel(autoFlush = true) + this.scope.launch { + lines.collect { + val serializedLine = JsonUtil.json.encodeToString(it) + channel.writeStringUtf8("$serializedLine\n") + } + } + + scope.respond(channel) + } else { + scope.respondBadRequest() + } + + companion object { + fun client( + scope: CoroutineScope, + lines: Flow, + ): HttpClientEngine { + val service = MockSyncService(scope, lines) + return MockEngine { request -> + service.handleRequest(this, request) + } + } + } +} + +suspend inline fun ReceiveTurbine.waitFor(matcher: (SyncStatusData) -> Boolean) { + while (true) { + val item = awaitItem() + if (matcher(item)) { + break + } + + item.anyError?.let { + error("Unexpected error in $item") + } + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d95c5432..73d20659 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -15,7 +15,7 @@ kotlinx-datetime = "0.5.0" kotlinx-io = "0.5.4" ktor = "3.0.1" uuid = "0.8.2" -powersync-core = "0.3.8" +powersync-core = "0.3.11" sqlite-android = "3.45.0" sqlite-jdbc = "3.45.2.0" turbine = "1.2.0" @@ -72,6 +72,7 @@ ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" } ktor-client-ios = { module = "io.ktor:ktor-client-darwin", version.ref = "ktor" } ktor-client-okhttp = { module = "io.ktor:ktor-client-okhttp", version.ref = "ktor" } ktor-client-contentnegotiation = { module = "io.ktor:ktor-client-content-negotiation", version.ref = "ktor" } +ktor-client-mock = { module = "io.ktor:ktor-client-mock", version.ref = "ktor" } ktor-serialization-json = { module = "io.ktor:ktor-serialization-kotlinx-json", version.ref = "ktor" } kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" }