From 58ce096144e3f68044aefd5fe6f4a1ee20f1455d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 8 Sep 2025 17:44:49 +0200 Subject: [PATCH 1/7] Support sync streams # Conflicts: # core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt # core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt --- .../com/powersync/sync/SyncIntegrationTest.kt | 14 +- .../com/powersync/sync/SyncProgressTest.kt | 31 +-- .../com/powersync/sync/SyncStreamTest.kt | 229 ++++++++++++++++++ .../com/powersync/testutils/TestUtils.kt | 18 ++ .../kotlin/com/powersync/PowerSyncDatabase.kt | 15 +- .../com/powersync/bucket/BucketChecksum.kt | 4 +- .../com/powersync/bucket/BucketStorage.kt | 14 +- .../com/powersync/bucket/BucketStorageImpl.kt | 8 +- .../kotlin/com/powersync/bucket/Checkpoint.kt | 2 + .../{BucketPriority.kt => StreamPriority.kt} | 16 +- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 82 +++---- .../kotlin/com/powersync/db/StreamImpl.kt | 169 +++++++++++++ .../com/powersync/db/crud/SerializedRow.kt | 28 ++- .../kotlin/com/powersync/sync/Instruction.kt | 39 ++- .../kotlin/com/powersync/sync/Progress.kt | 13 +- .../kotlin/com/powersync/sync/Stream.kt | 102 ++++++++ .../sync/{SyncStream.kt => StreamingSync.kt} | 64 +++-- .../kotlin/com/powersync/sync/SyncLine.kt | 4 +- .../kotlin/com/powersync/sync/SyncOptions.kt | 5 + .../kotlin/com/powersync/sync/SyncStatus.kt | 100 ++++++-- .../kotlin/com/powersync/sync/SyncLineTest.kt | 8 +- .../com/powersync/sync/SyncStreamTest.kt | 36 +-- .../androidexample/screens/HomeScreen.kt | 4 +- .../powersync/demos/components/GuardBySync.kt | 4 +- .../com/powersync/demos/screens/HomeScreen.kt | 8 +- 25 files changed, 849 insertions(+), 168 deletions(-) create mode 100644 core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt rename core/src/commonMain/kotlin/com/powersync/bucket/{BucketPriority.kt => StreamPriority.kt} (52%) create mode 100644 core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt create mode 100644 core/src/commonMain/kotlin/com/powersync/sync/Stream.kt rename core/src/commonMain/kotlin/com/powersync/sync/{SyncStream.kt => StreamingSync.kt} (93%) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 306db1f9..a1eb15d2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -7,10 +7,10 @@ import com.powersync.PowerSyncDatabase import com.powersync.PowerSyncException import com.powersync.TestConnector import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.StreamPriority import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector @@ -165,7 +165,7 @@ abstract class BaseSyncIntegrationTest( add( BucketChecksum( bucket = "bucket$prio", - priority = BucketPriority(prio), + priority = StreamPriority(prio), checksum = 10 + prio, ), ) @@ -218,7 +218,7 @@ abstract class BaseSyncIntegrationTest( // Emit a partial sync complete for each priority but the last. for (priorityNo in 0..<3) { - val priority = BucketPriority(priorityNo) + val priority = StreamPriority(priorityNo) pushData(priorityNo) syncLines.send( SyncLine.CheckpointPartiallyComplete( @@ -258,7 +258,7 @@ abstract class BaseSyncIntegrationTest( listOf( BucketChecksum( bucket = "bkt", - priority = BucketPriority(1), + priority = StreamPriority(1), checksum = 0, ), ), @@ -268,17 +268,17 @@ abstract class BaseSyncIntegrationTest( syncLines.send( SyncLine.CheckpointPartiallyComplete( lastOpId = "0", - priority = BucketPriority(1), + priority = StreamPriority(1), ), ) - database.waitForFirstSync(BucketPriority(1)) + database.waitForFirstSync(StreamPriority(1)) database.close() // Connect to the same database again database = openDatabaseAndInitialize() database.currentStatus.hasSynced shouldBe false - database.currentStatus.statusForPriority(BucketPriority(1)).hasSynced shouldBe true + database.currentStatus.statusForPriority(StreamPriority(1)).hasSynced shouldBe true } @Test diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index c2b3cb9e..a4ee9b34 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -3,11 +3,12 @@ package com.powersync.sync import app.cash.turbine.ReceiveTurbine import app.cash.turbine.turbineScope import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.StreamPriority import com.powersync.testutils.ActiveDatabaseTest +import com.powersync.testutils.bucket import com.powersync.testutils.databaseTest import com.powersync.testutils.waitFor import io.kotest.assertions.withClue @@ -32,18 +33,6 @@ abstract class BaseSyncProgressTest( lastOpId = 0 } - private fun bucket( - name: String, - count: Int, - priority: BucketPriority = BucketPriority(3), - ): BucketChecksum = - BucketChecksum( - bucket = name, - priority = priority, - checksum = 0, - count = count, - ) - private suspend fun ActiveDatabaseTest.addDataLine( bucket: String, amount: Int, @@ -68,7 +57,7 @@ abstract class BaseSyncProgressTest( ) } - private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) { + private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: StreamPriority? = null) { if (priority != null) { syncLines.send( SyncLine.CheckpointPartiallyComplete( @@ -93,7 +82,7 @@ abstract class BaseSyncProgressTest( private suspend fun ReceiveTurbine.expectProgress( total: Pair, - priorities: Map> = emptyMap(), + priorities: Map> = emptyMap(), ) { val item = awaitItem() val progress = item.downloadProgress ?: error("Expected download progress on $item") @@ -357,7 +346,7 @@ abstract class BaseSyncProgressTest( ) { turbine.expectProgress( prio2, - mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2), + mapOf(StreamPriority(0) to prio0, StreamPriority(2) to prio2), ) } @@ -367,8 +356,8 @@ abstract class BaseSyncProgressTest( lastOpId = "10", checksums = listOf( - bucket("a", 5, BucketPriority(0)), - bucket("b", 5, BucketPriority(2)), + bucket("a", 5, StreamPriority(0)), + bucket("b", 5, StreamPriority(2)), ), ), ), @@ -378,7 +367,7 @@ abstract class BaseSyncProgressTest( addDataLine("a", 5) expectProgress(5 to 5, 5 to 10) - addCheckpointComplete(BucketPriority(0)) + addCheckpointComplete(StreamPriority(0)) expectProgress(5 to 5, 5 to 10) addDataLine("b", 2) @@ -390,8 +379,8 @@ abstract class BaseSyncProgressTest( lastOpId = "14", updatedBuckets = listOf( - bucket("a", 8, BucketPriority(0)), - bucket("b", 6, BucketPriority(2)), + bucket("a", 8, StreamPriority(0)), + bucket("b", 6, StreamPriority(2)), ), removedBuckets = emptyList(), ), diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt new file mode 100644 index 00000000..5302d1f0 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -0,0 +1,229 @@ +package com.powersync.sync + +import app.cash.turbine.turbineScope +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.bucket.Checkpoint +import com.powersync.bucket.StreamPriority +import com.powersync.testutils.bucket +import com.powersync.testutils.databaseTest +import com.powersync.testutils.waitFor +import com.powersync.utils.JsonParam +import com.powersync.utils.JsonUtil +import io.kotest.matchers.collections.shouldHaveSingleElement +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.serialization.json.JsonArray +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.buildJsonArray +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.jsonArray +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import kotlinx.serialization.json.put +import kotlin.test.Test + +@OptIn(ExperimentalPowerSyncAPI::class, LegacySyncImplementation::class) +class SyncStreamTest : AbstractSyncTest(true) { + @Test + fun `can disable default streams`() = + databaseTest { + database.connect( + connector, + options = + SyncOptions( + newClientImplementation = true, + includeDefaultStreams = false, + clientConfiguration = SyncClientConfiguration.ExistingClient(createSyncClient()), + ), + ) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + streams["include_defaults"]!!.jsonPrimitive.content shouldBe "false" + + true + } + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `subscribes with streams`() = + databaseTest { + val a = database.syncStream("stream", mapOf("foo" to JsonParam.String("a"))).subscribe() + val b = database.syncStream("stream", mapOf("foo" to JsonParam.String("b"))).subscribe(priority = StreamPriority(1)) + + database.connect(connector, options = getOptions()) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Should request subscriptions + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + + subscriptions shouldHaveSize 2 + JsonUtil.json.encodeToString(subscriptions[0]) shouldBe + """{"stream":"stream","parameters":{"foo":"a"},"override_priority":null}""" + JsonUtil.json.encodeToString(subscriptions[1]) shouldBe + """{"stream":"stream","parameters":{"foo":"b"},"override_priority":1}""" + true + } + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "1", + checksums = + listOf( + bucket( + "a", + 0, + subscriptions = + buildJsonArray { + add(defaultSubscription(0)) + }, + ), + bucket( + "b", + 0, + priority = StreamPriority(1), + subscriptions = + buildJsonArray { + add(defaultSubscription(1)) + }, + ), + ), + streams = listOf(stream("stream", false)), + ), + ), + ) + + // Subscriptions should be active now, but not marked as synced. + var status = turbine.awaitItem() + for (subscription in listOf(a, b)) { + val subscriptionStatus = status.forStream(subscription)!! + subscriptionStatus.subscription.active shouldBe true + subscriptionStatus.subscription.lastSyncedAt shouldBe null + subscriptionStatus.subscription.hasExplicitSubscription shouldBe true + } + + syncLines.send( + SyncLine.CheckpointPartiallyComplete( + lastOpId = "0", + priority = StreamPriority(1), + ), + ) + status = turbine.awaitItem() + status.forStream(a)!!.subscription.lastSyncedAt shouldBe null + status.forStream(b)!!.subscription.lastSyncedAt shouldNotBeNull {} + b.waitForFirstSync() + + syncLines.send(SyncLine.CheckpointComplete(lastOpId = "0")) + a.waitForFirstSync() + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `reports default streams`() = + databaseTest { + database.connect(connector, options = getOptions()) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "1", + checksums = listOf(), + streams = listOf(stream("default_stream", true)), + ), + ), + ) + + val status = turbine.awaitItem() + status.syncStreams!! shouldHaveSingleElement { + it.subscription.name shouldBe "default_stream" + it.subscription.parameters shouldBe null + it.subscription.isDefault shouldBe true + it.subscription.hasExplicitSubscription shouldBe false + true + } + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @OptIn(DelicateCoroutinesApi::class) + @Test + fun `changes subscriptions dynamically`() = + databaseTest { + database.connect(connector, options = getOptions()) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + requestedSyncStreams.clear() + + val subscription = database.syncStream("a").subscribe() + + // Adding the subscription should reconnect + turbine.waitFor { it.connected && !it.downloading } + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + + subscriptions shouldHaveSize 1 + JsonUtil.json.encodeToString(subscriptions[0]) shouldBe """{"stream":"a","parameters":null,"override_priority":null}""" + true + } + + // Given that the subscription has a default TTL, unsubscribing should not re-subscribe. + subscription.unsubscribe() + delay(100) + turbine.expectNoEvents() + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `subscriptions update while offline`() = + databaseTest { + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.awaitItem() // Ignore initial + + // Subscribing while offline should add the stream to the subscriptions reported in the + // status. + val subscription = database.syncStream("foo").subscribe() + val status = turbine.awaitItem() + status.forStream(subscription) shouldNotBeNull {} + + turbine.cancelAndIgnoreRemainingEvents() + } + } +} + +private fun stream( + name: String, + isDefault: Boolean, +): JsonObject = + buildJsonObject { + put("name", name) + put("is_default", isDefault) + put("errors", JsonArray(emptyList())) + } + +private fun defaultSubscription(index: Int): JsonObject = buildJsonObject { put("sub", index) } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 0b533cfd..207de1a4 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -11,6 +11,8 @@ import com.powersync.DatabaseDriverFactory import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncTestLogWriter import com.powersync.TestConnector +import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.StreamPriority import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.createPowerSyncDatabaseImpl @@ -26,6 +28,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.io.files.Path +import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonElement expect val factory: DatabaseDriverFactory @@ -153,3 +156,18 @@ internal class ActiveDatabaseTest( cleanup(path) } } + +internal fun bucket( + name: String, + count: Int, + priority: StreamPriority = StreamPriority(3), + checksum: Int = 0, + subscriptions: JsonArray? = null, +): BucketChecksum = + BucketChecksum( + bucket = name, + priority = priority, + checksum = checksum, + count = count, + subscriptions = subscriptions, + ) diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index de2b1399..9a70cc5c 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -1,7 +1,7 @@ package com.powersync import co.touchlab.kermit.Logger -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.ActiveDatabaseGroup import com.powersync.db.ActiveDatabaseResource @@ -13,6 +13,7 @@ import com.powersync.db.driver.SQLiteConnectionPool import com.powersync.db.schema.Schema import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatus +import com.powersync.sync.SyncStream import com.powersync.utils.JsonParam import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow @@ -66,7 +67,7 @@ public interface PowerSyncDatabase : Queries { * given [priority] (or a higher one, since those would be synchronized first) has completed. */ @Throws(PowerSyncException::class, CancellationException::class) - public suspend fun waitForFirstSync(priority: BucketPriority) + public suspend fun waitForFirstSync(priority: StreamPriority) /** * Connect to the PowerSync service, and keep the databases in sync. @@ -181,6 +182,16 @@ public interface PowerSyncDatabase : Queries { @Throws(PowerSyncException::class, CancellationException::class) public suspend fun getPowerSyncVersion(): String + /** + * Create a [SyncStream] instance for the given [name] and [parameters]. + * + * Use [SyncStream.subscribe] on the returned instance to subscribe to the stream. + */ + public fun syncStream( + name: String, + parameters: Map? = null, + ): SyncStream + /** * Close the sync connection. * diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt index 2fe4c042..11bba3ec 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt @@ -3,13 +3,15 @@ package com.powersync.bucket import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonArray @LegacySyncImplementation @Serializable internal data class BucketChecksum( val bucket: String, - val priority: BucketPriority = BucketPriority.DEFAULT_PRIORITY, + val priority: StreamPriority = StreamPriority.DEFAULT_PRIORITY, val checksum: Int, val count: Int? = null, @SerialName("last_op_id") val lastOpId: String? = null, + val subscriptions: JsonArray? = null, ) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index bf4fa151..8ea60a38 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -1,6 +1,7 @@ package com.powersync.bucket import com.powersync.db.SqlCursor +import com.powersync.db.StreamKey import com.powersync.db.crud.CrudEntry import com.powersync.db.internal.PowerSyncTransaction import com.powersync.db.schema.SerializableSchema @@ -9,6 +10,7 @@ import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult import com.powersync.utils.JsonUtil +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonObject @@ -49,7 +51,7 @@ internal interface BucketStorage { @LegacySyncImplementation suspend fun syncLocalDatabase( targetCheckpoint: Checkpoint, - partialPriority: BucketPriority? = null, + partialPriority: StreamPriority? = null, ): SyncLocalDatabaseResult suspend fun control(args: PowerSyncControlArguments): List @@ -65,6 +67,10 @@ internal sealed interface PowerSyncControlArguments { class Start( val parameters: JsonObject, val schema: SerializableSchema, + @SerialName("include_defaults") + val includeDefaults: Boolean, + @SerialName("active_streams") + val activeStreams: List, ) : PowerSyncControlArguments { override val sqlArguments: Pair get() = "start" to JsonUtil.json.encodeToString(this) @@ -99,6 +105,12 @@ internal sealed interface PowerSyncControlArguments { data object ResponseStreamEnd : PowerSyncControlArguments { override val sqlArguments: Pair = "connection" to "end" } + + class UpdateSubscriptions( + activeStreams: List, + ) : PowerSyncControlArguments { + override val sqlArguments: Pair = "update_subscriptions" to JsonUtil.json.encodeToString(activeStreams) + } } @Serializable diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index b9c9e58a..15b126b2 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -196,7 +196,7 @@ internal class BucketStorageImpl( @LegacySyncImplementation override suspend fun syncLocalDatabase( targetCheckpoint: Checkpoint, - partialPriority: BucketPriority?, + partialPriority: StreamPriority?, ): SyncLocalDatabaseResult { val result = validateChecksums(targetCheckpoint, partialPriority) @@ -250,7 +250,7 @@ internal class BucketStorageImpl( @LegacySyncImplementation private suspend fun validateChecksums( checkpoint: Checkpoint, - priority: BucketPriority? = null, + priority: StreamPriority? = null, ): SyncLocalDatabaseResult { val serializedCheckpoint = JsonUtil.json.encodeToString( @@ -286,11 +286,11 @@ internal class BucketStorageImpl( @LegacySyncImplementation private suspend fun updateObjectsFromBuckets( checkpoint: Checkpoint, - priority: BucketPriority? = null, + priority: StreamPriority? = null, ): Boolean { @Serializable data class SyncLocalArgs( - val priority: BucketPriority, + val priority: StreamPriority, val buckets: List, ) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt index 5dc4823b..9e3814a7 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt @@ -3,6 +3,7 @@ package com.powersync.bucket import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement @LegacySyncImplementation @Serializable @@ -10,6 +11,7 @@ internal data class Checkpoint( @SerialName("last_op_id") val lastOpId: String, @SerialName("buckets") val checksums: List, @SerialName("write_checkpoint") val writeCheckpoint: String? = null, + val streams: List? = null, ) { fun clone(): Checkpoint = Checkpoint(lastOpId, checksums, writeCheckpoint) } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt b/core/src/commonMain/kotlin/com/powersync/bucket/StreamPriority.kt similarity index 52% rename from core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt rename to core/src/commonMain/kotlin/com/powersync/bucket/StreamPriority.kt index 60073707..333e4274 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/StreamPriority.kt @@ -2,25 +2,29 @@ package com.powersync.bucket import kotlinx.serialization.Serializable import kotlin.jvm.JvmInline +import kotlin.jvm.JvmName + +@Deprecated("Use StreamPriority instead") +public typealias BucketPriority = StreamPriority @JvmInline @Serializable -public value class BucketPriority( +public value class StreamPriority( private val priorityCode: Int, -) : Comparable { +) : Comparable { init { require(priorityCode >= 0) } - override fun compareTo(other: BucketPriority): Int = other.priorityCode.compareTo(priorityCode) + override fun compareTo(other: StreamPriority): Int = other.priorityCode.compareTo(priorityCode) - public companion object { - internal val FULL_SYNC_PRIORITY: BucketPriority = BucketPriority(Int.MAX_VALUE) + public companion object Companion { + internal val FULL_SYNC_PRIORITY: StreamPriority = StreamPriority(Int.MAX_VALUE) /** * The assumed priority for buckets when talking to older sync service instances that don't * support bucket priorities. */ - internal val DEFAULT_PRIORITY: BucketPriority = BucketPriority(3) + internal val DEFAULT_PRIORITY: StreamPriority = StreamPriority(3) } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 2a02d86e..ddb8abe5 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -4,9 +4,9 @@ import co.touchlab.kermit.Logger import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase import com.powersync.PowerSyncException -import com.powersync.bucket.BucketPriority import com.powersync.bucket.BucketStorage import com.powersync.bucket.BucketStorageImpl +import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudBatch import com.powersync.db.crud.CrudEntry @@ -19,7 +19,8 @@ import com.powersync.db.internal.InternalTable import com.powersync.db.internal.PowerSyncVersion import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable -import com.powersync.sync.PriorityStatusEntry +import com.powersync.sync.CoreSyncStatus +import com.powersync.sync.StreamingSyncClient import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatus import com.powersync.sync.SyncStatusData @@ -42,11 +43,7 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -import kotlinx.datetime.LocalDateTime -import kotlinx.datetime.TimeZone -import kotlinx.datetime.toInstant import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.Instant /** * A PowerSync managed database. @@ -80,6 +77,7 @@ internal class PowerSyncDatabaseImpl( get() = activeDatabaseGroup.first.group.identifier private val resource = activeDatabaseGroup.first + private val streams = StreamTracker(this) private val internalDb = InternalDatabaseImpl(pool, logger) @@ -111,7 +109,7 @@ internal class PowerSyncDatabaseImpl( } updateSchemaInternal(schema) - updateHasSynced() + resolveOfflineSyncStatus() } private suspend fun waitReady() { @@ -150,7 +148,7 @@ internal class PowerSyncDatabaseImpl( disconnectInternal() connectInternal(crudThrottleMs) { scope -> - SyncStream( + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = suspend { connector.uploadData(this) }, @@ -160,6 +158,7 @@ internal class PowerSyncDatabaseImpl( uploadScope = scope, options = options, schema = schema, + activeSubscriptions = streams.currentlyReferencedStreams, ) } } @@ -167,12 +166,12 @@ internal class PowerSyncDatabaseImpl( private fun connectInternal( crudThrottleMs: Long, - createStream: (CoroutineScope) -> SyncStream, + createStream: (CoroutineScope) -> StreamingSyncClient, ) { val db = this val job = SupervisorJob(scope.coroutineContext[Job]) syncSupervisorJob = job - var activeStream: SyncStream? = null + var activeStream: StreamingSyncClient? = null scope.launch(job) { // Create the stream in this scope so that everything launched by the stream is bound to @@ -316,6 +315,11 @@ internal class PowerSyncDatabaseImpl( } } + override fun syncStream( + name: String, + parameters: Map?, + ): SyncStream = PendingStream(streams, name, parameters) + override suspend fun getPowerSyncVersion(): String { // The initialization sets powerSyncVersion. waitReady() @@ -466,57 +470,31 @@ internal class PowerSyncDatabaseImpl( currentStatus.update { copy(lastSyncedAt = null, hasSynced = false) } } - private suspend fun updateHasSynced() { - data class SyncedAt( - val priority: BucketPriority, - val syncedAt: Instant?, - ) - - // Query the database to see if any data has been synced - val syncedAtRows = - internalDb.getAll("SELECT * FROM ps_sync_state ORDER BY priority") { - val rawTime = it.getString(1)!! - - SyncedAt( - priority = BucketPriority(it.getLong(0)!!.toInt()), - syncedAt = - LocalDateTime - .parse(rawTime.replace(" ", "T")) - .toInstant(TimeZone.UTC), - ) + internal suspend fun resolveOfflineSyncStatusIfNotConnected() { + mutex.withLock { + if (syncSupervisorJob == null) { + // Not connected or connecting + resolveOfflineSyncStatus() } + } + } - val priorityStatus = mutableListOf() - var lastSyncedAt: Instant? = null - - for (row in syncedAtRows) { - if (row.priority == BucketPriority.FULL_SYNC_PRIORITY) { - lastSyncedAt = row.syncedAt - } else { - priorityStatus.add( - PriorityStatusEntry( - priority = row.priority, - lastSyncedAt = row.syncedAt, - hasSynced = true, - ), - ) + private suspend fun resolveOfflineSyncStatus() { + val offlineSyncStatus = + internalDb.get("SELECT powersync_offline_sync_status()") { + JsonUtil.json.decodeFromString(it.getString(0)!!) } - } currentStatus.update { - copy( - hasSynced = lastSyncedAt != null, - lastSyncedAt = lastSyncedAt, - priorityStatusEntries = priorityStatus, - ) + applyCoreChanges(offlineSyncStatus) } } override suspend fun waitForFirstSync() = waitForFirstSyncImpl(null) - override suspend fun waitForFirstSync(priority: BucketPriority) = waitForFirstSyncImpl(priority) + override suspend fun waitForFirstSync(priority: StreamPriority) = waitForFirstSyncImpl(priority) - private suspend fun waitForFirstSyncImpl(priority: BucketPriority?) { + private suspend fun waitForFirstSyncImpl(priority: StreamPriority?) { val predicate: (SyncStatusData) -> Boolean = if (priority == null) { { it.hasSynced == true } @@ -524,6 +502,10 @@ internal class PowerSyncDatabaseImpl( { it.statusForPriority(priority).hasSynced == true } } + waitForStatusMatching(predicate) + } + + internal suspend fun waitForStatusMatching(predicate: (SyncStatusData) -> Boolean) { if (predicate(currentStatus)) { return } diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt new file mode 100644 index 00000000..a18c4bba --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -0,0 +1,169 @@ +package com.powersync.db + +import com.powersync.bucket.StreamPriority +import com.powersync.db.crud.TypedRow +import com.powersync.sync.SyncStream +import com.powersync.sync.SyncStreamSubscription +import com.powersync.utils.JsonParam +import com.powersync.utils.toJsonObject +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlin.time.Duration + +internal class StreamTracker( + val db: PowerSyncDatabaseImpl, +) { + val groupMutex = Mutex() + val streamGroups = mutableMapOf() + val currentlyReferencedStreams = MutableStateFlow(listOf()) + + private suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { + db.writeTransaction { tx -> + tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) + } + db.resolveOfflineSyncStatusIfNotConnected() + } + + internal suspend fun subscribe( + stream: PendingStream, + ttl: Duration?, + priority: StreamPriority?, + ): SyncStreamSubscription { + val key = stream.key + subscriptionsCommand( + RustSubscriptionChangeRequest( + subscribe = + SubscribeToStream( + stream = key, + ttl = ttl?.inWholeSeconds?.toInt(), + priority = priority, + ), + ), + ) + + return groupMutex.withLock { + var didAddNewGroup = false + val group = + streamGroups.getOrPut(key) { + didAddNewGroup = true + SubscriptionGroup(this, key) + } + + if (didAddNewGroup) { + val updatedStreams = streamGroups.values.toList() + currentlyReferencedStreams.value = updatedStreams + } + + SubscriptionImplementation(group) + } + } + + internal fun removeStreamGroup(key: StreamKey) { + streamGroups.remove(key)?.also { it.active = false } + currentlyReferencedStreams.value = streamGroups.values.toList() + } + + private companion object { + private val jsonDontEncodeDefaults = + Json { + // We don't want to encode defaults so that the RustSubscriptionChangeRequest encodes to the + // correct enum structure with only one field set. + encodeDefaults = false + } + } +} + +internal class PendingStream( + private val tracker: StreamTracker, + override val name: String, + val userParameters: Map?, +) : SyncStream { + override val parameters: Map? + get() { + val obj = userParameters?.toJsonObject() ?: return null + return TypedRow(obj) + } + + val key: StreamKey get() { + val jsonParameters = userParameters?.toJsonObject() + return StreamKey(name, jsonParameters) + } + + override suspend fun subscribe( + ttl: Duration?, + priority: StreamPriority?, + ): SyncStreamSubscription = tracker.subscribe(this, ttl, priority) + + override suspend fun unsubscribeAll() { + tracker.groupMutex.withLock { + tracker.removeStreamGroup(key) + } + } +} + +internal class SubscriptionGroup( + val tracker: StreamTracker, + val key: StreamKey, + var refcount: Int = 0, + var active: Boolean = true, +) { + suspend fun decrementRefCount() { + tracker.groupMutex.withLock { + refcount-- + if (refcount == 0 && active) { + tracker.removeStreamGroup(key) + } + } + } +} + +private class SubscriptionImplementation( + val group: SubscriptionGroup, +) : SyncStreamSubscription { + init { + group.refcount++ + } + + private var subscribed = false + + override val name: String + get() = group.key.name + + override val parameters: Map? = group.key.params?.let { TypedRow(it) } + + override suspend fun waitForFirstSync() { + group.tracker.db.waitForStatusMatching { it.forStream(this)?.subscription?.hasSynced == true } + } + + override suspend fun unsubscribe() { + if (subscribed) { + subscribed = false + group.decrementRefCount() + } + } +} + +@Serializable +internal class RustSubscriptionChangeRequest( + // this is actually an enum with associated data, but this serializes into the form we want + // when only a single field is set. + val subscribe: SubscribeToStream? = null, + val unsubscribe: StreamKey? = null, +) + +@Serializable +internal class SubscribeToStream( + val stream: StreamKey, + val ttl: Int? = null, + val priority: StreamPriority? = null, +) + +@Serializable +internal data class StreamKey( + val name: String, + val params: JsonObject?, +) diff --git a/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt b/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt index 43e7e127..78c384c7 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt @@ -1,5 +1,13 @@ package com.powersync.db.crud +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.descriptors.PrimitiveKind +import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.serialDescriptor +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonNull import kotlinx.serialization.json.JsonObject @@ -8,6 +16,7 @@ import kotlinx.serialization.json.contentOrNull import kotlinx.serialization.json.jsonPrimitive import kotlin.experimental.ExperimentalObjCRefinement import kotlin.native.HiddenFromObjC +import kotlin.time.Instant /** * A named collection of values as they appear in a SQLite row. @@ -55,14 +64,29 @@ private data class ToStringEntry( get() = inner.value.jsonPrimitive.contentOrNull } -private class TypedRow( - inner: JsonObject, +@Serializable(with = TypedRow.Serializer::class) +internal class TypedRow( + private val inner: JsonObject, ) : AbstractMap() { override val entries: Set> = inner.entries.mapTo( mutableSetOf(), ::ToTypedEntry, ) + + private object Serializer : KSerializer { + override val descriptor: SerialDescriptor + get() = serialDescriptor() + + override fun deserialize(decoder: Decoder): TypedRow = TypedRow(JsonObject.serializer().deserialize(decoder)) + + override fun serialize( + encoder: Encoder, + value: TypedRow, + ) { + encoder.encodeSerializableValue(JsonObject.serializer(), value.inner) + } + } } private data class ToTypedEntry( diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt b/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt index ced90401..fb0bbe65 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt @@ -1,6 +1,7 @@ package com.powersync.sync -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority +import com.powersync.db.crud.TypedRow import kotlinx.serialization.KSerializer import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -46,7 +47,11 @@ internal sealed interface Instruction { data object FlushSileSystem : Instruction - data object CloseSyncStream : Instruction + @Serializable + data class CloseSyncStream( + @SerialName("hide_disconnect") + val hideDisconnect: Boolean, + ) : Instruction data object DidCompleteSync : Instruction @@ -60,7 +65,7 @@ internal sealed interface Instruction { private val establishSyncStream = serializer() private val fetchCredentials = serializer() private val flushFileSystem = serializer() - private val closeSyncStream = serializer() + private val closeSyncStream = serializer() private val didCompleteSync = serializer() override val descriptor = @@ -88,7 +93,6 @@ internal sealed interface Instruction { } 5 -> { decodeSerializableElement(descriptor, 5, closeSyncStream) - CloseSyncStream } 6 -> { decodeSerializableElement(descriptor, 6, didCompleteSync) @@ -127,8 +131,31 @@ internal data class CoreSyncStatus( val downloading: CoreDownloadProgress?, @SerialName("priority_status") val priorityStatus: List, + val streams: List, ) +@Serializable +internal data class CoreActiveStreamSubscription( + override val name: String, + override val parameters: TypedRow?, + val priority: StreamPriority?, + val progress: ProgressInfo, + override val active: Boolean, + @SerialName("is_default") + override val isDefault: Boolean, + @SerialName("has_explicit_subscription") + override val hasExplicitSubscription: Boolean, + @SerialName("expires_at") + @Serializable(with = InstantTimestampSerializer::class) + override val expiresAt: Instant?, + @SerialName("last_synced_at") + @Serializable(with = InstantTimestampSerializer::class) + override val lastSyncedAt: Instant?, +) : SyncSubscriptionDescription { + override val hasSynced: Boolean + get() = lastSyncedAt != null +} + @Serializable internal data class CoreDownloadProgress( val buckets: Map, @@ -136,7 +163,7 @@ internal data class CoreDownloadProgress( @Serializable internal data class CoreBucketProgress( - val priority: BucketPriority, + val priority: StreamPriority, @SerialName("at_last") val atLast: Long, @SerialName("since_last") @@ -147,7 +174,7 @@ internal data class CoreBucketProgress( @Serializable internal data class CorePriorityStatus( - val priority: BucketPriority, + val priority: StreamPriority, @SerialName("last_synced_at") @Serializable(with = InstantTimestampSerializer::class) val lastSyncedAt: Instant?, diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt index d260624c..ae5833f2 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -1,8 +1,10 @@ package com.powersync.sync -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.LocalOperationCounters +import com.powersync.bucket.StreamPriority +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable import kotlin.math.min /** @@ -41,8 +43,11 @@ public interface ProgressWithOperations { } } +@Serializable internal data class ProgressInfo( + @SerialName("downloaded") override val downloadedOperations: Int, + @SerialName("total") override val totalOperations: Int, ) : ProgressWithOperations @@ -73,7 +78,7 @@ public data class SyncDownloadProgress internal constructor( override val totalOperations: Int init { - val (target, completed) = targetAndCompletedCounts(BucketPriority.FULL_SYNC_PRIORITY) + val (target, completed) = targetAndCompletedCounts(StreamPriority.FULL_SYNC_PRIORITY) totalOperations = target downloadedOperations = completed } @@ -127,7 +132,7 @@ public data class SyncDownloadProgress internal constructor( * The returned [ProgressWithOperations] instance tracks the target amount of operations that need to be downloaded * in total and how many of them have already been received. */ - public fun untilPriority(priority: BucketPriority): ProgressWithOperations { + public fun untilPriority(priority: StreamPriority): ProgressWithOperations { val (total, completed) = targetAndCompletedCounts(priority) return ProgressInfo(totalOperations = total, downloadedOperations = completed) } @@ -150,7 +155,7 @@ public data class SyncDownloadProgress internal constructor( }, ) - private fun targetAndCompletedCounts(priority: BucketPriority): Pair = + private fun targetAndCompletedCounts(priority: StreamPriority): Pair = buckets.values .asSequence() .filter { it.priority >= priority } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Stream.kt b/core/src/commonMain/kotlin/com/powersync/sync/Stream.kt new file mode 100644 index 00000000..c5977157 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/Stream.kt @@ -0,0 +1,102 @@ +package com.powersync.sync + +import com.powersync.bucket.StreamPriority +import kotlin.time.Duration +import kotlin.time.Instant + +public interface SyncStreamDescription { + /** + * THe name of the stream as it appears in the stream definition for the PowerSync service. + */ + public val name: String + + /** + * The parameters used to subscribe to the stream, if any. + * + * The same stream can be subscribed to multiple times with different parameters. + */ + public val parameters: Map? +} + +/** + * Information about a subscribed sync stream. + * + * This includes the [SyncStreamDescription] along with information about the current sync status. + */ +public interface SyncSubscriptionDescription : SyncStreamDescription { + /** + * Whether this stream is active, meaning that the subscription has been acknowledged by the + * sync service. + */ + public val active: Boolean + + /** + * Whether this stream subscription is included yb default, regardless of whether the stream has + * explicitly been subscribed to or not. + * + * Default streams are created by applying `auto_subscribe: true` in their definition on the + * sync service. + * + * It's possible for both [isDefault] and [hasExplicitSubscription] to be true at the same time. + * This happens when a default stream was subscribed explicitly. + */ + public val isDefault: Boolean + + /** + * Whether this stream been subscribed to explicitly. + * + * It's possible for both [isDefault] and [hasExplicitSubscription] to be true at the same time. + * This happens when a default stream was subscribed explicitly. + */ + public val hasExplicitSubscription: Boolean + + /** + * For sync streams that have a time-to-live, the current time at which the stream would expire + * if not subscribed to again. + */ + public val expiresAt: Instant? + + /** + * Whether this stream subscription has been synced at least once. + */ + public val hasSynced: Boolean + + /** + * If [hasSynced] is true, the last time data from this stream has been synced. + */ + public val lastSyncedAt: Instant? +} + +/** + * A handle to a [SyncStreamDescription] that allows subscribing to the stream. + * + * To obtain an instance of [SyncStream], call [com.powersync.PowerSyncDatabase.syncStream]. + */ +public interface SyncStream : SyncStreamDescription { + public suspend fun subscribe( + ttl: Duration? = null, + priority: StreamPriority? = null, + ): SyncStreamSubscription + + public suspend fun unsubscribeAll() +} + +/** + * A [SyncStream] that has been subscribed to. + */ +public interface SyncStreamSubscription : SyncStreamDescription { + /** + * A variant of [com.powersync.PowerSyncDatabase.waitForFirstSync] that is specific to this + * stream subscription. + */ + public suspend fun waitForFirstSync() + + /** + * Removes this subscription. + * + * Once all [SyncStreamSubscription]s for a [SyncStream] have been unsubscribed, the `ttl` for + * that stream starts running. When it expires without subscribing again, the stream will be + * evicted. + */ + public suspend fun unsubscribe() +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt similarity index 93% rename from core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt rename to core/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt index 00ead40f..10997a34 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt @@ -12,10 +12,11 @@ import com.powersync.bucket.Checkpoint import com.powersync.bucket.PowerSyncControlArguments import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.db.SubscriptionGroup import com.powersync.db.crud.CrudEntry import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable -import com.powersync.sync.SyncStream.Companion.SOCKET_TIMEOUT +import com.powersync.sync.StreamingSyncClient.Companion.SOCKET_TIMEOUT import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig @@ -52,6 +53,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map @@ -102,7 +104,7 @@ public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userA } @OptIn(ExperimentalPowerSyncAPI::class) -internal class SyncStream( +internal class StreamingSyncClient( private val bucketStorage: BucketStorage, private val connector: PowerSyncBackendConnector, private val uploadCrud: suspend () -> Unit, @@ -112,6 +114,7 @@ internal class SyncStream( private val uploadScope: CoroutineScope, private val options: SyncOptions, private val schema: Schema, + private val activeSubscriptions: StateFlow>, ) { private var isUploadingCrud = AtomicReference(null) private var completedCrudUploads = Channel(onBufferOverflow = BufferOverflow.DROP_OLDEST) @@ -148,8 +151,14 @@ internal class SyncStream( suspend fun streamingSync() { var invalidCredentials = false clientId = bucketStorage.getClientId() + var result = SyncIterationResult() + while (true) { - status.update { copy(connecting = true) } + if (!result.hideDisconnectStateAndReconnectImmediately) { + status.update { copy(connecting = true) } + } + result = SyncIterationResult() + try { if (invalidCredentials) { // This may error. In that case it will be retried again on the next @@ -157,7 +166,7 @@ internal class SyncStream( connector.invalidateCredentials() invalidCredentials = false } - streamingSyncIteration() + result = streamingSyncIteration() } catch (e: Exception) { if (e is CancellationException) { throw e @@ -166,8 +175,10 @@ internal class SyncStream( logger.e("Error in streamingSync: ${e.message}") status.update { copy(downloadError = e) } } finally { - status.update { copy(connected = false, connecting = true, downloading = false) } - delay(retryDelayMs) + if (!result.hideDisconnectStateAndReconnectImmediately) { + status.update { copy(connected = false, connecting = true, downloading = false) } + delay(retryDelayMs) + } } } } @@ -329,7 +340,7 @@ internal class SyncStream( } } - private suspend fun streamingSyncIteration() { + private suspend fun streamingSyncIteration(): SyncIterationResult = coroutineScope { if (options.newClientImplementation) { val iteration = ActiveIteration(this) @@ -345,9 +356,9 @@ internal class SyncStream( } } else { legacySyncIteration() + SyncIterationResult() } } - } @OptIn(LegacySyncImplementation::class) private suspend fun CoroutineScope.legacySyncIteration() { @@ -363,26 +374,42 @@ internal class SyncStream( */ private inner class ActiveIteration( val scope: CoroutineScope, - var fetchLinesJob: Job? = null, - var credentialsInvalidation: Job? = null, ) { + var fetchLinesJob: Job? = null + var credentialsInvalidation: Job? = null + // Using a channel for control invocations so that they're handled by a single coroutine, // avoiding races between concurrent jobs like fetching credentials. private val controlInvocations = Channel() + private var result = SyncIterationResult() private suspend fun invokeControl(args: PowerSyncControlArguments) { val instructions = bucketStorage.control(args) instructions.forEach { handleInstruction(it) } } - suspend fun start() { + suspend fun start(): SyncIterationResult { + var subscriptions = activeSubscriptions.value + invokeControl( PowerSyncControlArguments.Start( parameters = params, schema = schema.toSerializable(), + includeDefaults = options.includeDefaultStreams, + activeStreams = subscriptions.map { it.key }, ), ) + val listenForUpdatedSubscriptions = + scope.launch { + activeSubscriptions.collect { + if (subscriptions !== it) { + subscriptions = it + controlInvocations.send(PowerSyncControlArguments.UpdateSubscriptions(activeSubscriptions.value.map { it.key })) + } + } + } + var hadSyncLine = false for (line in controlInvocations) { val instructions = bucketStorage.control(line) @@ -398,6 +425,9 @@ internal class SyncStream( triggerCrudUploadAsync() } } + + listenForUpdatedSubscriptions.cancel() + return result } suspend fun stop() { @@ -429,8 +459,10 @@ internal class SyncStream( } } - Instruction.CloseSyncStream -> { - logger.v { "Closing sync stream connection" } + is Instruction.CloseSyncStream -> { + val hideDisconnect = instruction.hideDisconnect + logger.v { "Closing sync stream connection. Hide disconnect: $hideDisconnect" } + result = SyncIterationResult(hideDisconnect) fetchLinesJob!!.cancelAndJoin() fetchLinesJob = null logger.v { "Sync stream connection shut down" } @@ -771,7 +803,7 @@ internal class SyncStream( } } - internal companion object { + internal companion object Companion { // The sync service sends a token keepalive message roughly every 20 seconds. So if we don't receive a message // in twice that time, assume the connection is broken. internal const val SOCKET_TIMEOUT: Long = 40_000 @@ -854,3 +886,7 @@ internal data class SyncStreamState( private class PendingCrudUpload( val done: CompletableDeferred, ) + +private class SyncIterationResult( + val hideDisconnectStateAndReconnectImmediately: Boolean = false, +) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt index a9a9a4f3..557b8346 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt @@ -1,9 +1,9 @@ package com.powersync.sync import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OplogEntry +import com.powersync.bucket.StreamPriority import kotlinx.serialization.KSerializer import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -39,7 +39,7 @@ internal sealed interface SyncLine { @Serializable data class CheckpointPartiallyComplete( @SerialName("last_op_id") val lastOpId: String, - @SerialName("priority") val priority: BucketPriority, + @SerialName("priority") val priority: StreamPriority, ) : SyncLine @Serializable diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt index c8c89f5b..30902d3d 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt @@ -57,6 +57,11 @@ public class SyncOptions * Allows configuring the [HttpClient] used for connecting to the PowerSync service. */ public val clientConfiguration: SyncClientConfiguration? = null, + /** + * Whether streams that have been defined with `auto_subscribe: true` should be synced even + * when they don't have an explicit subscription. + */ + public val includeDefaultStreams: Boolean = true, ) { public companion object { /** diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 180e1429..8ae3d48c 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -1,6 +1,6 @@ package com.powersync.sync -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow @@ -10,32 +10,32 @@ import kotlin.time.Instant @ConsistentCopyVisibility public data class PriorityStatusEntry internal constructor( - val priority: BucketPriority, + val priority: StreamPriority, val lastSyncedAt: Instant?, val hasSynced: Boolean?, ) -public interface SyncStatusData { +public sealed class SyncStatusData { /** * true if currently connected. * * This means the PowerSync connection is ready to download, and [PowerSyncBackendConnector.uploadData] may be called for any local changes. */ - public val connected: Boolean + public abstract val connected: Boolean /** * true if the PowerSync connection is busy connecting. * * During this stage, [PowerSyncBackendConnector.uploadData] may already be called, and [uploading] may be true. */ - public val connecting: Boolean + public abstract val connecting: Boolean /** * true if actively downloading changes. * * This is only true when [connected] is also true. */ - public val downloading: Boolean + public abstract val downloading: Boolean /** * Realtime progress information about downloaded operations during an active sync. @@ -44,45 +44,45 @@ public interface SyncStatusData { * For more information on what progress is reported, see [SyncDownloadProgress]. * This value will be non-null only if [downloading] is true. */ - public val downloadProgress: SyncDownloadProgress? + public abstract val downloadProgress: SyncDownloadProgress? /** * true if uploading changes */ - public val uploading: Boolean + public abstract val uploading: Boolean /** * Time that a last sync has fully completed, if any. * * Currently this is reset to null after a restart. */ - public val lastSyncedAt: Instant? + public abstract val lastSyncedAt: Instant? /** * Indicates whether there has been at least one full sync, if any. * * Is null when unknown, for example when state is still being loaded from the database. */ - public val hasSynced: Boolean? + public abstract val hasSynced: Boolean? /** * Error during uploading. * * Cleared on the next successful upload. */ - public val uploadError: Any? + public abstract val uploadError: Any? /** * Error during downloading (including connecting). * * Cleared on the next successful data download. */ - public val downloadError: Any? + public abstract val downloadError: Any? /** * Convenience getter for either the value of downloadError or uploadError */ - public val anyError: Any? + public abstract val anyError: Any? /** * Available [PriorityStatusEntry] reporting the sync status for buckets within priorities. @@ -91,12 +91,14 @@ public interface SyncStatusData { * and [lastSyncedAt] are set to indicate that a partial (but no complete) sync has completed. * A completed [PriorityStatusEntry] at one priority level always includes all higher priorities too. */ - public val priorityStatusEntries: List + public abstract val priorityStatusEntries: List + + internal abstract val internalSubscriptions: List? /** * Status information for whether buckets in [priority] have been synchronized. */ - public fun statusForPriority(priority: BucketPriority): PriorityStatusEntry { + public fun statusForPriority(priority: StreamPriority): PriorityStatusEntry { val byDescendingPriorities = priorityStatusEntries.sortedByDescending { it.priority } for (entry in byDescendingPriorities) { @@ -110,6 +112,36 @@ public interface SyncStatusData { // A complete sync necessarily includes all priorities. return PriorityStatusEntry(priority, lastSyncedAt, hasSynced) } + + /** + * All sync streams currently being tracked in the database. + * + * This returns null when the database is currently being opened and we don't have reliable + * information about included streams yet. + */ + public val syncStreams: List? get() = internalSubscriptions?.map(this::exposeStreamStatus) + + /** + * Status information for [stream], if it's a stream that is currently tracked by the sync + * client. + */ + public fun forStream(stream: SyncStreamDescription): SyncStreamStatus? { + val raw = internalSubscriptions?.firstOrNull { it.name == stream.name && it.parameters == stream.parameters } ?: return null + return exposeStreamStatus(raw) + } + + private fun exposeStreamStatus(internal: CoreActiveStreamSubscription): SyncStreamStatus { + val progress = + if (this.downloadProgress == null) { + null + } else { + // The core extension will always give us progress numbers, but we should only expose + // them when that makes sense (i.e. we're actually downloading). + internal.progress + } + + return SyncStreamStatus(progress, internal) + } } internal data class SyncStatusDataContainer( @@ -123,12 +155,13 @@ internal data class SyncStatusDataContainer( override val uploadError: Any? = null, override val downloadError: Any? = null, override val priorityStatusEntries: List = emptyList(), -) : SyncStatusData { + override val internalSubscriptions: List = emptyList(), +) : SyncStatusData() { override val anyError get() = downloadError ?: uploadError internal fun applyCoreChanges(status: CoreSyncStatus): SyncStatusDataContainer { - val completeSync = status.priorityStatus.firstOrNull { it.priority == BucketPriority.FULL_SYNC_PRIORITY } + val completeSync = status.priorityStatus.firstOrNull { it.priority == StreamPriority.FULL_SYNC_PRIORITY } return copy( connected = status.connected, @@ -145,6 +178,7 @@ internal data class SyncStatusDataContainer( hasSynced = it.hasSynced, ) }, + internalSubscriptions = status.streams, ) } @@ -169,7 +203,7 @@ internal data class SyncStatusDataContainer( @ConsistentCopyVisibility public data class SyncStatus internal constructor( private var data: SyncStatusDataContainer = SyncStatusDataContainer(), -) : SyncStatusData { +) : SyncStatusData() { private val stateFlow: MutableStateFlow = MutableStateFlow(data) /** @@ -224,6 +258,9 @@ public data class SyncStatus internal constructor( override val priorityStatusEntries: List get() = data.priorityStatusEntries + override val internalSubscriptions: List + get() = data.internalSubscriptions + override fun toString(): String = "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, hasSynced=$hasSynced, error=$anyError)" @@ -231,3 +268,30 @@ public data class SyncStatus internal constructor( public fun empty(): SyncStatus = SyncStatus() } } + +/** + * Current information about a [SyncStreamSubscription]. + */ +@ConsistentCopyVisibility +public data class SyncStreamStatus internal constructor( + /** + * If the sync status is currently downloading, information about download progress related to + * this stream. + */ + val progress: ProgressWithOperations?, + internal val internal: CoreActiveStreamSubscription, +) { + /** + * The [SyncSubscriptionDescription] providing information about the subscription. + */ + val subscription: SyncSubscriptionDescription + get() = internal + + /** + * The priority of this stream. + * + * New data on higher-priority streams can interrupt low-priority streams. + */ + val priority: StreamPriority + get() = internal.priority ?: StreamPriority.FULL_SYNC_PRIORITY +} diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt index 5236d1e0..c5fff921 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt @@ -1,8 +1,8 @@ package com.powersync.sync import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint +import com.powersync.bucket.StreamPriority import com.powersync.utils.JsonUtil import kotlin.test.Test import kotlin.test.assertEquals @@ -35,7 +35,7 @@ class SyncLineTest { SyncLine.FullCheckpoint( Checkpoint( lastOpId = "10", - checksums = listOf(BucketChecksum(bucket = "a", priority = BucketPriority(3), checksum = 10)), + checksums = listOf(BucketChecksum(bucket = "a", priority = StreamPriority(3), checksum = 10)), ), ), """{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "checksum": 10}]}}""", @@ -48,7 +48,7 @@ class SyncLineTest { SyncLine.FullCheckpoint( Checkpoint( lastOpId = "10", - checksums = listOf(BucketChecksum(bucket = "a", priority = BucketPriority(1), checksum = 10)), + checksums = listOf(BucketChecksum(bucket = "a", priority = StreamPriority(1), checksum = 10)), ), ), """{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "priority": 1, "checksum": 10}]}}""", @@ -77,7 +77,7 @@ class SyncLineTest { checkDeserializing( SyncLine.CheckpointPartiallyComplete( lastOpId = "10", - priority = BucketPriority(1), + priority = StreamPriority(1), ), """{"partial_checkpoint_complete": {"last_op_id": "10", "priority": 1}}""", ) diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 3872da0a..5592dfa6 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -13,7 +13,7 @@ import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import com.powersync.db.schema.Schema -import com.powersync.sync.SyncStream.Companion.bsonObjects +import com.powersync.sync.StreamingSyncClient.Companion.bsonObjects import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock @@ -24,6 +24,7 @@ import io.ktor.client.engine.mock.MockEngine import io.ktor.utils.io.ByteChannel import io.ktor.utils.io.writeByteArray import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout @@ -34,10 +35,10 @@ import kotlin.test.assertContains import kotlin.test.assertEquals @OptIn(ExperimentalKermitApi::class, ExperimentalPowerSyncAPI::class) -class SyncStreamTest { +class StreamingSyncClientTest { private lateinit var bucketStorage: BucketStorage private lateinit var connector: PowerSyncBackendConnector - private lateinit var syncStream: SyncStream + private lateinit var streamingSyncClient: StreamingSyncClient private val testLogWriter = TestLogWriter( loggable = Severity.Verbose, @@ -66,8 +67,8 @@ class SyncStreamTest { @Test fun testInvalidateCredentials() = runTest { - syncStream = - SyncStream( + streamingSyncClient = + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = {}, @@ -84,10 +85,11 @@ class SyncStreamTest { ), ), schema = Schema(), + activeSubscriptions = MutableStateFlow(emptyList()), ) connector.cachedCredentials = TestConnector.testCredentials - syncStream.invalidateCredentials() + streamingSyncClient.invalidateCredentials() connector.cachedCredentials shouldBe null } @@ -109,8 +111,8 @@ class SyncStreamTest { everySuspend { nextCrudItem() } returns mockCrudEntry } - syncStream = - SyncStream( + streamingSyncClient = + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = { }, @@ -128,10 +130,11 @@ class SyncStreamTest { ), ), schema = Schema(), + activeSubscriptions = MutableStateFlow(emptyList()), ) - syncStream.status.update { copy(connected = true) } - syncStream.triggerCrudUploadAsync().join() + streamingSyncClient.status.update { copy(connected = true) } + streamingSyncClient.triggerCrudUploadAsync().join() testLogWriter.assertCount(2) @@ -160,8 +163,8 @@ class SyncStreamTest { everySuspend { getClientId() } returns "test-client-id" } - syncStream = - SyncStream( + streamingSyncClient = + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = { }, @@ -179,24 +182,25 @@ class SyncStreamTest { ), ), schema = Schema(), + activeSubscriptions = MutableStateFlow(emptyList()), ) // Launch streaming sync in a coroutine that we'll cancel after verification val job = launch { - syncStream.streamingSync() + streamingSyncClient.streamingSync() } // Wait for status to update withTimeout(1000) { - while (!syncStream.status.connecting) { + while (!streamingSyncClient.status.connecting) { delay(10) } } // Verify initial state - assertEquals(true, syncStream.status.connecting) - assertEquals(false, syncStream.status.connected) + assertEquals(true, streamingSyncClient.status.connecting) + assertEquals(false, streamingSyncClient.status.connected) // Clean up job.cancel() diff --git a/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt b/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt index 2b95c3e3..0cb1b4a1 100644 --- a/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt +++ b/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt @@ -13,7 +13,7 @@ import androidx.compose.runtime.Composable import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.demos.Screen import com.powersync.demos.components.Input import com.powersync.demos.components.ListContent @@ -66,7 +66,7 @@ internal fun HomeScreen( // When giving lists a higher priority than items, we can have a consistent snapshot of // lists without items. In the case where many items exist (that might take longer to // sync initially), this allows us to display lists earlier. - if (status.statusForPriority(BucketPriority(1)).hasSynced == true) { + if (status.statusForPriority(StreamPriority(1)).hasSynced == true) { ListContent( items = items, onItemClicked = onItemClicked, diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt index 52e5b521..9ebd9596 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt @@ -15,7 +15,7 @@ import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.unit.dp import com.powersync.PowerSyncDatabase -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.compose.composeState import com.powersync.sync.SyncStatusData import org.koin.compose.koinInject @@ -28,7 +28,7 @@ import org.koin.compose.koinInject @Composable fun GuardBySync( db: PowerSyncDatabase = koinInject(), - priority: BucketPriority? = null, + priority: StreamPriority? = null, content: @Composable () -> Unit ) { val state: SyncStatusData by db.currentStatus.composeState() diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt index 4e53852e..97eb9dce 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt @@ -1,22 +1,18 @@ package com.powersync.demos.screens -import androidx.compose.foundation.background import androidx.compose.foundation.layout.Box import androidx.compose.foundation.layout.Column import androidx.compose.foundation.layout.Spacer -import androidx.compose.foundation.layout.fillMaxSize import androidx.compose.foundation.layout.fillMaxWidth import androidx.compose.foundation.layout.padding import androidx.compose.foundation.layout.width -import androidx.compose.material.MaterialTheme import androidx.compose.material.Text import androidx.compose.material.TopAppBar import androidx.compose.runtime.Composable -import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.demos.Screen import com.powersync.demos.components.GuardBySync import com.powersync.demos.components.Input @@ -64,7 +60,7 @@ internal fun HomeScreen( // than items, we can have a consistent snapshot of lists without items. In the case where // many items exist (that might take longer to sync initially), this allows us to display // lists earlier. - GuardBySync(priority = BucketPriority(1)) { + GuardBySync(priority = StreamPriority(1)) { Input( text = inputText, onAddClicked = onAddItemClicked, From 536593022bbd9c371844e9f35f9e3fa6031fcf3c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 9 Sep 2025 12:19:18 +0200 Subject: [PATCH 2/7] New tests --- .../com/powersync/sync/SyncProgressTest.kt | 14 +- .../com/powersync/sync/SyncStreamTest.kt | 129 +++++++++++++----- .../com/powersync/testutils/TestUtils.kt | 18 --- .../com/powersync/bucket/BucketChecksum.kt | 2 - .../kotlin/com/powersync/bucket/Checkpoint.kt | 1 - .../kotlin/com/powersync/db/StreamImpl.kt | 3 +- ...reamTest.kt => StreamingSyncClientTest.kt} | 0 .../powersync/testutils/MockSyncService.kt | 5 + 8 files changed, 114 insertions(+), 58 deletions(-) rename core/src/commonTest/kotlin/com/powersync/sync/{SyncStreamTest.kt => StreamingSyncClientTest.kt} (100%) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index a4ee9b34..c53c1e5a 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -3,12 +3,12 @@ package com.powersync.sync import app.cash.turbine.ReceiveTurbine import app.cash.turbine.turbineScope import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry import com.powersync.bucket.StreamPriority import com.powersync.testutils.ActiveDatabaseTest -import com.powersync.testutils.bucket import com.powersync.testutils.databaseTest import com.powersync.testutils.waitFor import io.kotest.assertions.withClue @@ -33,6 +33,18 @@ abstract class BaseSyncProgressTest( lastOpId = 0 } + private fun bucket( + name: String, + count: Int, + priority: StreamPriority = StreamPriority(3), + ): BucketChecksum = + BucketChecksum( + bucket = name, + priority = priority, + checksum = 0, + count = count, + ) + private suspend fun ActiveDatabaseTest.addDataLine( bucket: String, amount: Int, diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 5302d1f0..1a8ed4f4 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -2,9 +2,7 @@ package com.powersync.sync import app.cash.turbine.turbineScope import com.powersync.ExperimentalPowerSyncAPI -import com.powersync.bucket.Checkpoint import com.powersync.bucket.StreamPriority -import com.powersync.testutils.bucket import com.powersync.testutils.databaseTest import com.powersync.testutils.waitFor import com.powersync.utils.JsonParam @@ -15,6 +13,7 @@ import io.kotest.matchers.nulls.shouldNotBeNull import io.kotest.matchers.shouldBe import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.buildJsonArray @@ -80,31 +79,26 @@ class SyncStreamTest : AbstractSyncTest(true) { } syncLines.send( - SyncLine.FullCheckpoint( - Checkpoint( - lastOpId = "1", - checksums = - listOf( - bucket( - "a", - 0, - subscriptions = - buildJsonArray { - add(defaultSubscription(0)) - }, - ), - bucket( - "b", - 0, - priority = StreamPriority(1), - subscriptions = - buildJsonArray { - add(defaultSubscription(1)) - }, - ), - ), - streams = listOf(stream("stream", false)), + checkpointLine( + listOf( + bucket( + "a", + 3, + subscriptions = + buildJsonArray { + add(defaultSubscription(0)) + }, + ), + bucket( + "b", + 1, + subscriptions = + buildJsonArray { + add(defaultSubscription(1)) + }, + ), ), + listOf(stream("stream", false)), ), ) @@ -143,15 +137,7 @@ class SyncStreamTest : AbstractSyncTest(true) { val turbine = database.currentStatus.asFlow().testIn(this) turbine.waitFor { it.connected && !it.downloading } - syncLines.send( - SyncLine.FullCheckpoint( - Checkpoint( - lastOpId = "1", - checksums = listOf(), - streams = listOf(stream("default_stream", true)), - ), - ), - ) + syncLines.send(checkpointLine(listOf(), listOf(stream("default_stream", true)))) val status = turbine.awaitItem() status.syncStreams!! shouldHaveSingleElement { @@ -214,8 +200,81 @@ class SyncStreamTest : AbstractSyncTest(true) { turbine.cancelAndIgnoreRemainingEvents() } } + + @Test + fun `unsubscribing multiple times has no effect`() = + databaseTest { + val a = database.syncStream("a").subscribe() + val aAgain = database.syncStream("a").subscribe() + a.unsubscribe() + a.unsubscribe() + + // Pretend the streams are expired - they should still be requested because the core + // extension extends the lifetime of streams currently referenced before connecting. + database.execute("UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000") + + database.connect(connector, options = getOptions()) + database.waitForStatusMatching { it.connected } + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + subscriptions shouldHaveSize 1 + true + } + aAgain.unsubscribe() + } + + @Test + fun unsubscribeAll() = + databaseTest { + val a = database.syncStream("a").subscribe() + database.syncStream("a").unsubscribeAll() + + // Despite a being active, it should not be requested. + database.connect(connector, options = getOptions()) + database.waitForStatusMatching { it.connected } + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + subscriptions shouldHaveSize 0 + true + } + a.unsubscribe() + } } +@OptIn(ExperimentalSerializationApi::class) +private fun checkpointLine( + buckets: List, + streams: List, +): JsonObject = + buildJsonObject { + put("checkpoint", checkpoint(buckets, streams)) + } + +@OptIn(ExperimentalSerializationApi::class) +private fun checkpoint( + buckets: List, + streams: List, +): JsonObject = + buildJsonObject { + put("last_op_id", "0") + put("buckets", buildJsonArray { addAll(buckets) }) + put("streams", buildJsonArray { addAll(streams) }) + } + +private fun bucket( + name: String, + priority: Int, + subscriptions: JsonArray? = null, +): JsonObject = + buildJsonObject { + put("bucket", name) + put("priority", priority) + put("checksum", 0) + subscriptions?.let { put("subscriptions", it) } + } + private fun stream( name: String, isDefault: Boolean, diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 207de1a4..0b533cfd 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -11,8 +11,6 @@ import com.powersync.DatabaseDriverFactory import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncTestLogWriter import com.powersync.TestConnector -import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.StreamPriority import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.createPowerSyncDatabaseImpl @@ -28,7 +26,6 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.io.files.Path -import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonElement expect val factory: DatabaseDriverFactory @@ -156,18 +153,3 @@ internal class ActiveDatabaseTest( cleanup(path) } } - -internal fun bucket( - name: String, - count: Int, - priority: StreamPriority = StreamPriority(3), - checksum: Int = 0, - subscriptions: JsonArray? = null, -): BucketChecksum = - BucketChecksum( - bucket = name, - priority = priority, - checksum = checksum, - count = count, - subscriptions = subscriptions, - ) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt index 11bba3ec..30269d69 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt @@ -3,7 +3,6 @@ package com.powersync.bucket import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable -import kotlinx.serialization.json.JsonArray @LegacySyncImplementation @Serializable @@ -13,5 +12,4 @@ internal data class BucketChecksum( val checksum: Int, val count: Int? = null, @SerialName("last_op_id") val lastOpId: String? = null, - val subscriptions: JsonArray? = null, ) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt index 9e3814a7..4cf92bdf 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt @@ -11,7 +11,6 @@ internal data class Checkpoint( @SerialName("last_op_id") val lastOpId: String, @SerialName("buckets") val checksums: List, @SerialName("write_checkpoint") val writeCheckpoint: String? = null, - val streams: List? = null, ) { fun clone(): Checkpoint = Checkpoint(lastOpId, checksums, writeCheckpoint) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index a18c4bba..7d3251c2 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -21,7 +21,7 @@ internal class StreamTracker( val streamGroups = mutableMapOf() val currentlyReferencedStreams = MutableStateFlow(listOf()) - private suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { + suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { db.writeTransaction { tx -> tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) } @@ -101,6 +101,7 @@ internal class PendingStream( override suspend fun unsubscribeAll() { tracker.groupMutex.withLock { tracker.removeStreamGroup(key) + tracker.subscriptionsCommand(RustSubscriptionChangeRequest(unsubscribe = key)) } } } diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncClientTest.kt similarity index 100% rename from core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt rename to core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncClientTest.kt diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index 48cf0dfd..ea3cbf77 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -27,6 +27,7 @@ import io.ktor.utils.io.writer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.consume +import kotlinx.serialization.json.JsonElement /** * A mock HTTP engine providing sync lines read from a coroutines [ReceiveChannel]. @@ -69,6 +70,10 @@ internal class MockSyncService( val serializedLine = JsonUtil.json.encodeToString(line) channel.writeStringUtf8("$serializedLine\n") } + is JsonElement -> { + val serializedLine = JsonUtil.json.encodeToString(line) + channel.writeStringUtf8("$serializedLine\n") + } is ByteArray -> { channel.writeByteArray(line) } From 35c57c9dac45c308a87f8a517fef5cb5b17ae04e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 9 Sep 2025 12:23:54 +0200 Subject: [PATCH 3/7] Remove deprecated import --- .../kotlin/com/powersync/sync/SyncProgressTest.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index c53c1e5a..a3f1e0c2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -3,7 +3,6 @@ package com.powersync.sync import app.cash.turbine.ReceiveTurbine import app.cash.turbine.turbineScope import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry From 78658d7704bab78f9623afcbc441505027c84e52 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 9 Sep 2025 12:28:07 +0200 Subject: [PATCH 4/7] Fix SQLiter crash --- core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index 7d3251c2..bc3db857 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -23,7 +23,7 @@ internal class StreamTracker( suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { db.writeTransaction { tx -> - tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) + tx.get("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) {} } db.resolveOfflineSyncStatusIfNotConnected() } From e7a41831bf0d732c1c0286c4a7c76ae502925394 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 23 Sep 2025 11:34:08 +0200 Subject: [PATCH 5/7] Mark as experimental --- core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt | 3 ++- core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt | 2 ++ core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index 9a70cc5c..c56ac048 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -183,10 +183,11 @@ public interface PowerSyncDatabase : Queries { public suspend fun getPowerSyncVersion(): String /** - * Create a [SyncStream] instance for the given [name] and [parameters]. + * Create a [SyncStream] instance for the given [name] and [parameters]. * * Use [SyncStream.subscribe] on the returned instance to subscribe to the stream. */ + @ExperimentalPowerSyncAPI public fun syncStream( name: String, parameters: Map? = null, diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index bc3db857..270e542b 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -1,5 +1,6 @@ package com.powersync.db +import com.powersync.ExperimentalPowerSyncAPI import com.powersync.bucket.StreamPriority import com.powersync.db.crud.TypedRow import com.powersync.sync.SyncStream @@ -136,6 +137,7 @@ private class SubscriptionImplementation( override val parameters: Map? = group.key.params?.let { TypedRow(it) } + @OptIn(ExperimentalPowerSyncAPI::class) override suspend fun waitForFirstSync() { group.tracker.db.waitForStatusMatching { it.forStream(this)?.subscription?.hasSynced == true } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 8ae3d48c..9bdc9354 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -1,5 +1,6 @@ package com.powersync.sync +import com.powersync.ExperimentalPowerSyncAPI import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow @@ -119,12 +120,14 @@ public sealed class SyncStatusData { * This returns null when the database is currently being opened and we don't have reliable * information about included streams yet. */ + @ExperimentalPowerSyncAPI public val syncStreams: List? get() = internalSubscriptions?.map(this::exposeStreamStatus) /** * Status information for [stream], if it's a stream that is currently tracked by the sync * client. */ + @ExperimentalPowerSyncAPI public fun forStream(stream: SyncStreamDescription): SyncStreamStatus? { val raw = internalSubscriptions?.firstOrNull { it.name == stream.name && it.parameters == stream.parameters } ?: return null return exposeStreamStatus(raw) From 245d0ce6e0a4033f943103954c7d675446e85a5f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 23 Sep 2025 11:37:55 +0200 Subject: [PATCH 6/7] Revert SQLiter workaround --- core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index 270e542b..d138d337 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -24,7 +24,7 @@ internal class StreamTracker( suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { db.writeTransaction { tx -> - tx.get("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) {} + tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) } db.resolveOfflineSyncStatusIfNotConnected() } From 2ee4fe2f4292e21f2c5aba51e98d433af507be97 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 23 Sep 2025 11:50:50 +0200 Subject: [PATCH 7/7] Fix unsubscribe doing nothing --- core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index d138d337..200a410b 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -130,7 +130,7 @@ private class SubscriptionImplementation( group.refcount++ } - private var subscribed = false + private var subscribed = true override val name: String get() = group.key.name