From dc70bb46253632dd1931192a32e18fa19d7d59fd Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 14:38:43 -0600 Subject: [PATCH 1/3] Rust client: Schedule crud uploads after connecting --- CHANGELOG.md | 1 + .../com/powersync/sync/SyncIntegrationTest.kt | 47 +++++++++++++++++++ .../kotlin/com/powersync/sync/SyncStream.kt | 37 ++++++++++++--- 3 files changed, 78 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index db4f36da..e95827c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [Supabase Connector] Fixed issue where only `400` HTTP status code errors where reported as connection errors. The connector now reports errors for codes `>=400`. * Update PowerSync core extension to `0.4.1`, fixing an issue with the new Rust client. +* Rust sync client: Fix writes made while offline not being uploaded reliably. ## 1.2.0 diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index b6b214eb..ebca4797 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -563,6 +563,53 @@ abstract class BaseSyncIntegrationTest( database.expectUserCount(2) } + @Test + fun `handles write made while offline`() = databaseTest { + connector = TestConnector() + val uploadCompleted = CompletableDeferred() + checkpointResponse = { + uploadCompleted.complete(Unit) + WriteCheckpointResponse(WriteCheckpointData("1")) + } + + database.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("local write")) + database.connect(connector, options = options) + + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(scope) + turbine.waitFor { it.connected } + + val query = database.watch("SELECT name FROM users") { it.getString(0)!! }.testIn(scope) + query.awaitItem() shouldBe listOf("local write") + + syncLines.send(SyncLine.FullCheckpoint( + Checkpoint( + writeCheckpoint = "1", + lastOpId = "1", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ), + )) + syncLines.send(SyncLine.SyncDataBucket(bucket = "a", data = listOf( + OplogEntry( + checksum = 0, + opId = "1", + op = OpType.PUT, + rowId = "1", + rowType = "users", + data = """{"id": "test1", "name": "from server"}""", + ), + ), after = null, nextAfter = null)) + + uploadCompleted.await() + syncLines.send(SyncLine.CheckpointComplete("1")) + + query.awaitItem() shouldBe listOf("from server") + + turbine.cancelAndIgnoreRemainingEvents() + query.cancelAndIgnoreRemainingEvents() + } + } + @Test fun testTokenExpired() = databaseTest { diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 8d0dbc43..9559f6fb 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -48,7 +48,6 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import kotlinx.datetime.Clock import kotlinx.serialization.Serializable -import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.encodeToJsonElement @@ -133,6 +132,7 @@ internal class SyncStream( uploadAllCrud() } finally { if (holdingUploadLock) { + logger.v { "crud upload: notify completion" } completedCrudUploads.send(Unit) isUploadingCrud.set(null) } @@ -297,6 +297,7 @@ internal class SyncStream( */ private inner class ActiveIteration( val scope: CoroutineScope, + var hadSyncLine: Boolean = false, var fetchLinesJob: Job? = null, var credentialsInvalidation: Job? = null, ) { @@ -342,6 +343,8 @@ internal class SyncStream( fetchLinesJob = scope.launch { launch { + logger.v { "listening for completed uploads" } + for (completion in completedCrudUploads) { control("completed_upload") } @@ -406,16 +409,36 @@ internal class SyncStream( } } + /** + * Triggers a crud upload when called for the first time. + * + * We could have pending local writes made while disconnected, so in addition to listening + * on updates to `ps_crud`, we also need to trigger a CRUD upload in some other cases. We + * do this on the first sync line because the client is likely to be online in that case. + */ + private fun triggerCrudUploadIfFirstLine() { + if (!hadSyncLine) { + triggerCrudUploadAsync() + hadSyncLine = true + } + } + + private suspend fun line(text: String) { + triggerCrudUploadIfFirstLine() + control("line_text", text) + } + + private suspend fun line(blob: ByteArray) { + triggerCrudUploadIfFirstLine() + control("line_binary", blob) + } + private suspend fun connect(start: Instruction.EstablishSyncStream) { when (val method = options.method) { ConnectionMethod.Http -> - connectViaHttp(start.request).collect { rawLine -> - control("line_text", rawLine) - } + connectViaHttp(start.request).collect(this::line) is ConnectionMethod.WebSocket -> - connectViaWebSocket(start.request, method).collect { binaryLine -> - control("line_binary", binaryLine) - } + connectViaWebSocket(start.request, method).collect(this::line) } } } From 6930f1fe1c231f3c4d52c8620e424139feabdfa5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 15:02:38 -0600 Subject: [PATCH 2/3] Reformat --- .../com/powersync/sync/SyncIntegrationTest.kt | 85 +++++++++++-------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index ebca4797..6a97ad1d 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -564,51 +564,62 @@ abstract class BaseSyncIntegrationTest( } @Test - fun `handles write made while offline`() = databaseTest { - connector = TestConnector() - val uploadCompleted = CompletableDeferred() - checkpointResponse = { - uploadCompleted.complete(Unit) - WriteCheckpointResponse(WriteCheckpointData("1")) - } + fun `handles write made while offline`() = + databaseTest { + connector = TestConnector() + val uploadCompleted = CompletableDeferred() + checkpointResponse = { + uploadCompleted.complete(Unit) + WriteCheckpointResponse(WriteCheckpointData("1")) + } - database.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("local write")) - database.connect(connector, options = options) + database.execute("INSERT INTO users (id, name) VALUES (uuid(), ?)", listOf("local write")) + database.connect(connector, options = options) - turbineScope(timeout = 10.0.seconds) { - val turbine = database.currentStatus.asFlow().testIn(scope) - turbine.waitFor { it.connected } + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(scope) + turbine.waitFor { it.connected } - val query = database.watch("SELECT name FROM users") { it.getString(0)!! }.testIn(scope) - query.awaitItem() shouldBe listOf("local write") + val query = database.watch("SELECT name FROM users") { it.getString(0)!! }.testIn(scope) + query.awaitItem() shouldBe listOf("local write") - syncLines.send(SyncLine.FullCheckpoint( - Checkpoint( - writeCheckpoint = "1", - lastOpId = "1", - checksums = listOf(BucketChecksum("a", checksum = 0)), - ), - )) - syncLines.send(SyncLine.SyncDataBucket(bucket = "a", data = listOf( - OplogEntry( - checksum = 0, - opId = "1", - op = OpType.PUT, - rowId = "1", - rowType = "users", - data = """{"id": "test1", "name": "from server"}""", - ), - ), after = null, nextAfter = null)) + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + writeCheckpoint = "1", + lastOpId = "1", + checksums = listOf(BucketChecksum("a", checksum = 0)), + ), + ), + ) + syncLines.send( + SyncLine.SyncDataBucket( + bucket = "a", + data = + listOf( + OplogEntry( + checksum = 0, + opId = "1", + op = OpType.PUT, + rowId = "1", + rowType = "users", + data = """{"id": "test1", "name": "from server"}""", + ), + ), + after = null, + nextAfter = null, + ), + ) - uploadCompleted.await() - syncLines.send(SyncLine.CheckpointComplete("1")) + uploadCompleted.await() + syncLines.send(SyncLine.CheckpointComplete("1")) - query.awaitItem() shouldBe listOf("from server") + query.awaitItem() shouldBe listOf("from server") - turbine.cancelAndIgnoreRemainingEvents() - query.cancelAndIgnoreRemainingEvents() + turbine.cancelAndIgnoreRemainingEvents() + query.cancelAndIgnoreRemainingEvents() + } } - } @Test fun testTokenExpired() = From 7d3f555ecf5e846c70d5a8dcac21010712cbad78 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 15:14:02 -0600 Subject: [PATCH 3/3] Send keepalive for legacy sync --- .../kotlin/com/powersync/sync/SyncIntegrationTest.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 6a97ad1d..f780a0d7 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -583,6 +583,7 @@ abstract class BaseSyncIntegrationTest( val query = database.watch("SELECT name FROM users") { it.getString(0)!! }.testIn(scope) query.awaitItem() shouldBe listOf("local write") + syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 1234)) syncLines.send( SyncLine.FullCheckpoint( Checkpoint(