From 58ce096144e3f68044aefd5fe6f4a1ee20f1455d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 8 Sep 2025 17:44:49 +0200 Subject: [PATCH 1/9] Support sync streams # Conflicts: # core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt # core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt --- .../com/powersync/sync/SyncIntegrationTest.kt | 14 +- .../com/powersync/sync/SyncProgressTest.kt | 31 +-- .../com/powersync/sync/SyncStreamTest.kt | 229 ++++++++++++++++++ .../com/powersync/testutils/TestUtils.kt | 18 ++ .../kotlin/com/powersync/PowerSyncDatabase.kt | 15 +- .../com/powersync/bucket/BucketChecksum.kt | 4 +- .../com/powersync/bucket/BucketStorage.kt | 14 +- .../com/powersync/bucket/BucketStorageImpl.kt | 8 +- .../kotlin/com/powersync/bucket/Checkpoint.kt | 2 + .../{BucketPriority.kt => StreamPriority.kt} | 16 +- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 82 +++---- .../kotlin/com/powersync/db/StreamImpl.kt | 169 +++++++++++++ .../com/powersync/db/crud/SerializedRow.kt | 28 ++- .../kotlin/com/powersync/sync/Instruction.kt | 39 ++- .../kotlin/com/powersync/sync/Progress.kt | 13 +- .../kotlin/com/powersync/sync/Stream.kt | 102 ++++++++ .../sync/{SyncStream.kt => StreamingSync.kt} | 64 +++-- .../kotlin/com/powersync/sync/SyncLine.kt | 4 +- .../kotlin/com/powersync/sync/SyncOptions.kt | 5 + .../kotlin/com/powersync/sync/SyncStatus.kt | 100 ++++++-- .../kotlin/com/powersync/sync/SyncLineTest.kt | 8 +- .../com/powersync/sync/SyncStreamTest.kt | 36 +-- .../androidexample/screens/HomeScreen.kt | 4 +- .../powersync/demos/components/GuardBySync.kt | 4 +- .../com/powersync/demos/screens/HomeScreen.kt | 8 +- 25 files changed, 849 insertions(+), 168 deletions(-) create mode 100644 core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt rename core/src/commonMain/kotlin/com/powersync/bucket/{BucketPriority.kt => StreamPriority.kt} (52%) create mode 100644 core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt create mode 100644 core/src/commonMain/kotlin/com/powersync/sync/Stream.kt rename core/src/commonMain/kotlin/com/powersync/sync/{SyncStream.kt => StreamingSync.kt} (93%) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 306db1f9..a1eb15d2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -7,10 +7,10 @@ import com.powersync.PowerSyncDatabase import com.powersync.PowerSyncException import com.powersync.TestConnector import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.StreamPriority import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector @@ -165,7 +165,7 @@ abstract class BaseSyncIntegrationTest( add( BucketChecksum( bucket = "bucket$prio", - priority = BucketPriority(prio), + priority = StreamPriority(prio), checksum = 10 + prio, ), ) @@ -218,7 +218,7 @@ abstract class BaseSyncIntegrationTest( // Emit a partial sync complete for each priority but the last. for (priorityNo in 0..<3) { - val priority = BucketPriority(priorityNo) + val priority = StreamPriority(priorityNo) pushData(priorityNo) syncLines.send( SyncLine.CheckpointPartiallyComplete( @@ -258,7 +258,7 @@ abstract class BaseSyncIntegrationTest( listOf( BucketChecksum( bucket = "bkt", - priority = BucketPriority(1), + priority = StreamPriority(1), checksum = 0, ), ), @@ -268,17 +268,17 @@ abstract class BaseSyncIntegrationTest( syncLines.send( SyncLine.CheckpointPartiallyComplete( lastOpId = "0", - priority = BucketPriority(1), + priority = StreamPriority(1), ), ) - database.waitForFirstSync(BucketPriority(1)) + database.waitForFirstSync(StreamPriority(1)) database.close() // Connect to the same database again database = openDatabaseAndInitialize() database.currentStatus.hasSynced shouldBe false - database.currentStatus.statusForPriority(BucketPriority(1)).hasSynced shouldBe true + database.currentStatus.statusForPriority(StreamPriority(1)).hasSynced shouldBe true } @Test diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index c2b3cb9e..a4ee9b34 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -3,11 +3,12 @@ package com.powersync.sync import app.cash.turbine.ReceiveTurbine import app.cash.turbine.turbineScope import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry +import com.powersync.bucket.StreamPriority import com.powersync.testutils.ActiveDatabaseTest +import com.powersync.testutils.bucket import com.powersync.testutils.databaseTest import com.powersync.testutils.waitFor import io.kotest.assertions.withClue @@ -32,18 +33,6 @@ abstract class BaseSyncProgressTest( lastOpId = 0 } - private fun bucket( - name: String, - count: Int, - priority: BucketPriority = BucketPriority(3), - ): BucketChecksum = - BucketChecksum( - bucket = name, - priority = priority, - checksum = 0, - count = count, - ) - private suspend fun ActiveDatabaseTest.addDataLine( bucket: String, amount: Int, @@ -68,7 +57,7 @@ abstract class BaseSyncProgressTest( ) } - private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) { + private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: StreamPriority? = null) { if (priority != null) { syncLines.send( SyncLine.CheckpointPartiallyComplete( @@ -93,7 +82,7 @@ abstract class BaseSyncProgressTest( private suspend fun ReceiveTurbine.expectProgress( total: Pair, - priorities: Map> = emptyMap(), + priorities: Map> = emptyMap(), ) { val item = awaitItem() val progress = item.downloadProgress ?: error("Expected download progress on $item") @@ -357,7 +346,7 @@ abstract class BaseSyncProgressTest( ) { turbine.expectProgress( prio2, - mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2), + mapOf(StreamPriority(0) to prio0, StreamPriority(2) to prio2), ) } @@ -367,8 +356,8 @@ abstract class BaseSyncProgressTest( lastOpId = "10", checksums = listOf( - bucket("a", 5, BucketPriority(0)), - bucket("b", 5, BucketPriority(2)), + bucket("a", 5, StreamPriority(0)), + bucket("b", 5, StreamPriority(2)), ), ), ), @@ -378,7 +367,7 @@ abstract class BaseSyncProgressTest( addDataLine("a", 5) expectProgress(5 to 5, 5 to 10) - addCheckpointComplete(BucketPriority(0)) + addCheckpointComplete(StreamPriority(0)) expectProgress(5 to 5, 5 to 10) addDataLine("b", 2) @@ -390,8 +379,8 @@ abstract class BaseSyncProgressTest( lastOpId = "14", updatedBuckets = listOf( - bucket("a", 8, BucketPriority(0)), - bucket("b", 6, BucketPriority(2)), + bucket("a", 8, StreamPriority(0)), + bucket("b", 6, StreamPriority(2)), ), removedBuckets = emptyList(), ), diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt new file mode 100644 index 00000000..5302d1f0 --- /dev/null +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -0,0 +1,229 @@ +package com.powersync.sync + +import app.cash.turbine.turbineScope +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.bucket.Checkpoint +import com.powersync.bucket.StreamPriority +import com.powersync.testutils.bucket +import com.powersync.testutils.databaseTest +import com.powersync.testutils.waitFor +import com.powersync.utils.JsonParam +import com.powersync.utils.JsonUtil +import io.kotest.matchers.collections.shouldHaveSingleElement +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.serialization.json.JsonArray +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.buildJsonArray +import kotlinx.serialization.json.buildJsonObject +import kotlinx.serialization.json.jsonArray +import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive +import kotlinx.serialization.json.put +import kotlin.test.Test + +@OptIn(ExperimentalPowerSyncAPI::class, LegacySyncImplementation::class) +class SyncStreamTest : AbstractSyncTest(true) { + @Test + fun `can disable default streams`() = + databaseTest { + database.connect( + connector, + options = + SyncOptions( + newClientImplementation = true, + includeDefaultStreams = false, + clientConfiguration = SyncClientConfiguration.ExistingClient(createSyncClient()), + ), + ) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + streams["include_defaults"]!!.jsonPrimitive.content shouldBe "false" + + true + } + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `subscribes with streams`() = + databaseTest { + val a = database.syncStream("stream", mapOf("foo" to JsonParam.String("a"))).subscribe() + val b = database.syncStream("stream", mapOf("foo" to JsonParam.String("b"))).subscribe(priority = StreamPriority(1)) + + database.connect(connector, options = getOptions()) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // Should request subscriptions + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + + subscriptions shouldHaveSize 2 + JsonUtil.json.encodeToString(subscriptions[0]) shouldBe + """{"stream":"stream","parameters":{"foo":"a"},"override_priority":null}""" + JsonUtil.json.encodeToString(subscriptions[1]) shouldBe + """{"stream":"stream","parameters":{"foo":"b"},"override_priority":1}""" + true + } + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "1", + checksums = + listOf( + bucket( + "a", + 0, + subscriptions = + buildJsonArray { + add(defaultSubscription(0)) + }, + ), + bucket( + "b", + 0, + priority = StreamPriority(1), + subscriptions = + buildJsonArray { + add(defaultSubscription(1)) + }, + ), + ), + streams = listOf(stream("stream", false)), + ), + ), + ) + + // Subscriptions should be active now, but not marked as synced. + var status = turbine.awaitItem() + for (subscription in listOf(a, b)) { + val subscriptionStatus = status.forStream(subscription)!! + subscriptionStatus.subscription.active shouldBe true + subscriptionStatus.subscription.lastSyncedAt shouldBe null + subscriptionStatus.subscription.hasExplicitSubscription shouldBe true + } + + syncLines.send( + SyncLine.CheckpointPartiallyComplete( + lastOpId = "0", + priority = StreamPriority(1), + ), + ) + status = turbine.awaitItem() + status.forStream(a)!!.subscription.lastSyncedAt shouldBe null + status.forStream(b)!!.subscription.lastSyncedAt shouldNotBeNull {} + b.waitForFirstSync() + + syncLines.send(SyncLine.CheckpointComplete(lastOpId = "0")) + a.waitForFirstSync() + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `reports default streams`() = + databaseTest { + database.connect(connector, options = getOptions()) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "1", + checksums = listOf(), + streams = listOf(stream("default_stream", true)), + ), + ), + ) + + val status = turbine.awaitItem() + status.syncStreams!! shouldHaveSingleElement { + it.subscription.name shouldBe "default_stream" + it.subscription.parameters shouldBe null + it.subscription.isDefault shouldBe true + it.subscription.hasExplicitSubscription shouldBe false + true + } + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @OptIn(DelicateCoroutinesApi::class) + @Test + fun `changes subscriptions dynamically`() = + databaseTest { + database.connect(connector, options = getOptions()) + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + requestedSyncStreams.clear() + + val subscription = database.syncStream("a").subscribe() + + // Adding the subscription should reconnect + turbine.waitFor { it.connected && !it.downloading } + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + + subscriptions shouldHaveSize 1 + JsonUtil.json.encodeToString(subscriptions[0]) shouldBe """{"stream":"a","parameters":null,"override_priority":null}""" + true + } + + // Given that the subscription has a default TTL, unsubscribing should not re-subscribe. + subscription.unsubscribe() + delay(100) + turbine.expectNoEvents() + + turbine.cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `subscriptions update while offline`() = + databaseTest { + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.awaitItem() // Ignore initial + + // Subscribing while offline should add the stream to the subscriptions reported in the + // status. + val subscription = database.syncStream("foo").subscribe() + val status = turbine.awaitItem() + status.forStream(subscription) shouldNotBeNull {} + + turbine.cancelAndIgnoreRemainingEvents() + } + } +} + +private fun stream( + name: String, + isDefault: Boolean, +): JsonObject = + buildJsonObject { + put("name", name) + put("is_default", isDefault) + put("errors", JsonArray(emptyList())) + } + +private fun defaultSubscription(index: Int): JsonObject = buildJsonObject { put("sub", index) } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 0b533cfd..207de1a4 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -11,6 +11,8 @@ import com.powersync.DatabaseDriverFactory import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncTestLogWriter import com.powersync.TestConnector +import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.StreamPriority import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.createPowerSyncDatabaseImpl @@ -26,6 +28,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.io.files.Path +import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonElement expect val factory: DatabaseDriverFactory @@ -153,3 +156,18 @@ internal class ActiveDatabaseTest( cleanup(path) } } + +internal fun bucket( + name: String, + count: Int, + priority: StreamPriority = StreamPriority(3), + checksum: Int = 0, + subscriptions: JsonArray? = null, +): BucketChecksum = + BucketChecksum( + bucket = name, + priority = priority, + checksum = checksum, + count = count, + subscriptions = subscriptions, + ) diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index de2b1399..9a70cc5c 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -1,7 +1,7 @@ package com.powersync import co.touchlab.kermit.Logger -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.ActiveDatabaseGroup import com.powersync.db.ActiveDatabaseResource @@ -13,6 +13,7 @@ import com.powersync.db.driver.SQLiteConnectionPool import com.powersync.db.schema.Schema import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatus +import com.powersync.sync.SyncStream import com.powersync.utils.JsonParam import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow @@ -66,7 +67,7 @@ public interface PowerSyncDatabase : Queries { * given [priority] (or a higher one, since those would be synchronized first) has completed. */ @Throws(PowerSyncException::class, CancellationException::class) - public suspend fun waitForFirstSync(priority: BucketPriority) + public suspend fun waitForFirstSync(priority: StreamPriority) /** * Connect to the PowerSync service, and keep the databases in sync. @@ -181,6 +182,16 @@ public interface PowerSyncDatabase : Queries { @Throws(PowerSyncException::class, CancellationException::class) public suspend fun getPowerSyncVersion(): String + /** + * Create a [SyncStream] instance for the given [name] and [parameters]. + * + * Use [SyncStream.subscribe] on the returned instance to subscribe to the stream. + */ + public fun syncStream( + name: String, + parameters: Map? = null, + ): SyncStream + /** * Close the sync connection. * diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt index 2fe4c042..11bba3ec 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt @@ -3,13 +3,15 @@ package com.powersync.bucket import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonArray @LegacySyncImplementation @Serializable internal data class BucketChecksum( val bucket: String, - val priority: BucketPriority = BucketPriority.DEFAULT_PRIORITY, + val priority: StreamPriority = StreamPriority.DEFAULT_PRIORITY, val checksum: Int, val count: Int? = null, @SerialName("last_op_id") val lastOpId: String? = null, + val subscriptions: JsonArray? = null, ) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index bf4fa151..8ea60a38 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -1,6 +1,7 @@ package com.powersync.bucket import com.powersync.db.SqlCursor +import com.powersync.db.StreamKey import com.powersync.db.crud.CrudEntry import com.powersync.db.internal.PowerSyncTransaction import com.powersync.db.schema.SerializableSchema @@ -9,6 +10,7 @@ import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult import com.powersync.utils.JsonUtil +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonObject @@ -49,7 +51,7 @@ internal interface BucketStorage { @LegacySyncImplementation suspend fun syncLocalDatabase( targetCheckpoint: Checkpoint, - partialPriority: BucketPriority? = null, + partialPriority: StreamPriority? = null, ): SyncLocalDatabaseResult suspend fun control(args: PowerSyncControlArguments): List @@ -65,6 +67,10 @@ internal sealed interface PowerSyncControlArguments { class Start( val parameters: JsonObject, val schema: SerializableSchema, + @SerialName("include_defaults") + val includeDefaults: Boolean, + @SerialName("active_streams") + val activeStreams: List, ) : PowerSyncControlArguments { override val sqlArguments: Pair get() = "start" to JsonUtil.json.encodeToString(this) @@ -99,6 +105,12 @@ internal sealed interface PowerSyncControlArguments { data object ResponseStreamEnd : PowerSyncControlArguments { override val sqlArguments: Pair = "connection" to "end" } + + class UpdateSubscriptions( + activeStreams: List, + ) : PowerSyncControlArguments { + override val sqlArguments: Pair = "update_subscriptions" to JsonUtil.json.encodeToString(activeStreams) + } } @Serializable diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index b9c9e58a..15b126b2 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -196,7 +196,7 @@ internal class BucketStorageImpl( @LegacySyncImplementation override suspend fun syncLocalDatabase( targetCheckpoint: Checkpoint, - partialPriority: BucketPriority?, + partialPriority: StreamPriority?, ): SyncLocalDatabaseResult { val result = validateChecksums(targetCheckpoint, partialPriority) @@ -250,7 +250,7 @@ internal class BucketStorageImpl( @LegacySyncImplementation private suspend fun validateChecksums( checkpoint: Checkpoint, - priority: BucketPriority? = null, + priority: StreamPriority? = null, ): SyncLocalDatabaseResult { val serializedCheckpoint = JsonUtil.json.encodeToString( @@ -286,11 +286,11 @@ internal class BucketStorageImpl( @LegacySyncImplementation private suspend fun updateObjectsFromBuckets( checkpoint: Checkpoint, - priority: BucketPriority? = null, + priority: StreamPriority? = null, ): Boolean { @Serializable data class SyncLocalArgs( - val priority: BucketPriority, + val priority: StreamPriority, val buckets: List, ) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt index 5dc4823b..9e3814a7 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt @@ -3,6 +3,7 @@ package com.powersync.bucket import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement @LegacySyncImplementation @Serializable @@ -10,6 +11,7 @@ internal data class Checkpoint( @SerialName("last_op_id") val lastOpId: String, @SerialName("buckets") val checksums: List, @SerialName("write_checkpoint") val writeCheckpoint: String? = null, + val streams: List? = null, ) { fun clone(): Checkpoint = Checkpoint(lastOpId, checksums, writeCheckpoint) } diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt b/core/src/commonMain/kotlin/com/powersync/bucket/StreamPriority.kt similarity index 52% rename from core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt rename to core/src/commonMain/kotlin/com/powersync/bucket/StreamPriority.kt index 60073707..333e4274 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketPriority.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/StreamPriority.kt @@ -2,25 +2,29 @@ package com.powersync.bucket import kotlinx.serialization.Serializable import kotlin.jvm.JvmInline +import kotlin.jvm.JvmName + +@Deprecated("Use StreamPriority instead") +public typealias BucketPriority = StreamPriority @JvmInline @Serializable -public value class BucketPriority( +public value class StreamPriority( private val priorityCode: Int, -) : Comparable { +) : Comparable { init { require(priorityCode >= 0) } - override fun compareTo(other: BucketPriority): Int = other.priorityCode.compareTo(priorityCode) + override fun compareTo(other: StreamPriority): Int = other.priorityCode.compareTo(priorityCode) - public companion object { - internal val FULL_SYNC_PRIORITY: BucketPriority = BucketPriority(Int.MAX_VALUE) + public companion object Companion { + internal val FULL_SYNC_PRIORITY: StreamPriority = StreamPriority(Int.MAX_VALUE) /** * The assumed priority for buckets when talking to older sync service instances that don't * support bucket priorities. */ - internal val DEFAULT_PRIORITY: BucketPriority = BucketPriority(3) + internal val DEFAULT_PRIORITY: StreamPriority = StreamPriority(3) } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 2a02d86e..ddb8abe5 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -4,9 +4,9 @@ import co.touchlab.kermit.Logger import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase import com.powersync.PowerSyncException -import com.powersync.bucket.BucketPriority import com.powersync.bucket.BucketStorage import com.powersync.bucket.BucketStorageImpl +import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudBatch import com.powersync.db.crud.CrudEntry @@ -19,7 +19,8 @@ import com.powersync.db.internal.InternalTable import com.powersync.db.internal.PowerSyncVersion import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable -import com.powersync.sync.PriorityStatusEntry +import com.powersync.sync.CoreSyncStatus +import com.powersync.sync.StreamingSyncClient import com.powersync.sync.SyncOptions import com.powersync.sync.SyncStatus import com.powersync.sync.SyncStatusData @@ -42,11 +43,7 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -import kotlinx.datetime.LocalDateTime -import kotlinx.datetime.TimeZone -import kotlinx.datetime.toInstant import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.Instant /** * A PowerSync managed database. @@ -80,6 +77,7 @@ internal class PowerSyncDatabaseImpl( get() = activeDatabaseGroup.first.group.identifier private val resource = activeDatabaseGroup.first + private val streams = StreamTracker(this) private val internalDb = InternalDatabaseImpl(pool, logger) @@ -111,7 +109,7 @@ internal class PowerSyncDatabaseImpl( } updateSchemaInternal(schema) - updateHasSynced() + resolveOfflineSyncStatus() } private suspend fun waitReady() { @@ -150,7 +148,7 @@ internal class PowerSyncDatabaseImpl( disconnectInternal() connectInternal(crudThrottleMs) { scope -> - SyncStream( + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = suspend { connector.uploadData(this) }, @@ -160,6 +158,7 @@ internal class PowerSyncDatabaseImpl( uploadScope = scope, options = options, schema = schema, + activeSubscriptions = streams.currentlyReferencedStreams, ) } } @@ -167,12 +166,12 @@ internal class PowerSyncDatabaseImpl( private fun connectInternal( crudThrottleMs: Long, - createStream: (CoroutineScope) -> SyncStream, + createStream: (CoroutineScope) -> StreamingSyncClient, ) { val db = this val job = SupervisorJob(scope.coroutineContext[Job]) syncSupervisorJob = job - var activeStream: SyncStream? = null + var activeStream: StreamingSyncClient? = null scope.launch(job) { // Create the stream in this scope so that everything launched by the stream is bound to @@ -316,6 +315,11 @@ internal class PowerSyncDatabaseImpl( } } + override fun syncStream( + name: String, + parameters: Map?, + ): SyncStream = PendingStream(streams, name, parameters) + override suspend fun getPowerSyncVersion(): String { // The initialization sets powerSyncVersion. waitReady() @@ -466,57 +470,31 @@ internal class PowerSyncDatabaseImpl( currentStatus.update { copy(lastSyncedAt = null, hasSynced = false) } } - private suspend fun updateHasSynced() { - data class SyncedAt( - val priority: BucketPriority, - val syncedAt: Instant?, - ) - - // Query the database to see if any data has been synced - val syncedAtRows = - internalDb.getAll("SELECT * FROM ps_sync_state ORDER BY priority") { - val rawTime = it.getString(1)!! - - SyncedAt( - priority = BucketPriority(it.getLong(0)!!.toInt()), - syncedAt = - LocalDateTime - .parse(rawTime.replace(" ", "T")) - .toInstant(TimeZone.UTC), - ) + internal suspend fun resolveOfflineSyncStatusIfNotConnected() { + mutex.withLock { + if (syncSupervisorJob == null) { + // Not connected or connecting + resolveOfflineSyncStatus() } + } + } - val priorityStatus = mutableListOf() - var lastSyncedAt: Instant? = null - - for (row in syncedAtRows) { - if (row.priority == BucketPriority.FULL_SYNC_PRIORITY) { - lastSyncedAt = row.syncedAt - } else { - priorityStatus.add( - PriorityStatusEntry( - priority = row.priority, - lastSyncedAt = row.syncedAt, - hasSynced = true, - ), - ) + private suspend fun resolveOfflineSyncStatus() { + val offlineSyncStatus = + internalDb.get("SELECT powersync_offline_sync_status()") { + JsonUtil.json.decodeFromString(it.getString(0)!!) } - } currentStatus.update { - copy( - hasSynced = lastSyncedAt != null, - lastSyncedAt = lastSyncedAt, - priorityStatusEntries = priorityStatus, - ) + applyCoreChanges(offlineSyncStatus) } } override suspend fun waitForFirstSync() = waitForFirstSyncImpl(null) - override suspend fun waitForFirstSync(priority: BucketPriority) = waitForFirstSyncImpl(priority) + override suspend fun waitForFirstSync(priority: StreamPriority) = waitForFirstSyncImpl(priority) - private suspend fun waitForFirstSyncImpl(priority: BucketPriority?) { + private suspend fun waitForFirstSyncImpl(priority: StreamPriority?) { val predicate: (SyncStatusData) -> Boolean = if (priority == null) { { it.hasSynced == true } @@ -524,6 +502,10 @@ internal class PowerSyncDatabaseImpl( { it.statusForPriority(priority).hasSynced == true } } + waitForStatusMatching(predicate) + } + + internal suspend fun waitForStatusMatching(predicate: (SyncStatusData) -> Boolean) { if (predicate(currentStatus)) { return } diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt new file mode 100644 index 00000000..a18c4bba --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -0,0 +1,169 @@ +package com.powersync.db + +import com.powersync.bucket.StreamPriority +import com.powersync.db.crud.TypedRow +import com.powersync.sync.SyncStream +import com.powersync.sync.SyncStreamSubscription +import com.powersync.utils.JsonParam +import com.powersync.utils.toJsonObject +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlin.time.Duration + +internal class StreamTracker( + val db: PowerSyncDatabaseImpl, +) { + val groupMutex = Mutex() + val streamGroups = mutableMapOf() + val currentlyReferencedStreams = MutableStateFlow(listOf()) + + private suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { + db.writeTransaction { tx -> + tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) + } + db.resolveOfflineSyncStatusIfNotConnected() + } + + internal suspend fun subscribe( + stream: PendingStream, + ttl: Duration?, + priority: StreamPriority?, + ): SyncStreamSubscription { + val key = stream.key + subscriptionsCommand( + RustSubscriptionChangeRequest( + subscribe = + SubscribeToStream( + stream = key, + ttl = ttl?.inWholeSeconds?.toInt(), + priority = priority, + ), + ), + ) + + return groupMutex.withLock { + var didAddNewGroup = false + val group = + streamGroups.getOrPut(key) { + didAddNewGroup = true + SubscriptionGroup(this, key) + } + + if (didAddNewGroup) { + val updatedStreams = streamGroups.values.toList() + currentlyReferencedStreams.value = updatedStreams + } + + SubscriptionImplementation(group) + } + } + + internal fun removeStreamGroup(key: StreamKey) { + streamGroups.remove(key)?.also { it.active = false } + currentlyReferencedStreams.value = streamGroups.values.toList() + } + + private companion object { + private val jsonDontEncodeDefaults = + Json { + // We don't want to encode defaults so that the RustSubscriptionChangeRequest encodes to the + // correct enum structure with only one field set. + encodeDefaults = false + } + } +} + +internal class PendingStream( + private val tracker: StreamTracker, + override val name: String, + val userParameters: Map?, +) : SyncStream { + override val parameters: Map? + get() { + val obj = userParameters?.toJsonObject() ?: return null + return TypedRow(obj) + } + + val key: StreamKey get() { + val jsonParameters = userParameters?.toJsonObject() + return StreamKey(name, jsonParameters) + } + + override suspend fun subscribe( + ttl: Duration?, + priority: StreamPriority?, + ): SyncStreamSubscription = tracker.subscribe(this, ttl, priority) + + override suspend fun unsubscribeAll() { + tracker.groupMutex.withLock { + tracker.removeStreamGroup(key) + } + } +} + +internal class SubscriptionGroup( + val tracker: StreamTracker, + val key: StreamKey, + var refcount: Int = 0, + var active: Boolean = true, +) { + suspend fun decrementRefCount() { + tracker.groupMutex.withLock { + refcount-- + if (refcount == 0 && active) { + tracker.removeStreamGroup(key) + } + } + } +} + +private class SubscriptionImplementation( + val group: SubscriptionGroup, +) : SyncStreamSubscription { + init { + group.refcount++ + } + + private var subscribed = false + + override val name: String + get() = group.key.name + + override val parameters: Map? = group.key.params?.let { TypedRow(it) } + + override suspend fun waitForFirstSync() { + group.tracker.db.waitForStatusMatching { it.forStream(this)?.subscription?.hasSynced == true } + } + + override suspend fun unsubscribe() { + if (subscribed) { + subscribed = false + group.decrementRefCount() + } + } +} + +@Serializable +internal class RustSubscriptionChangeRequest( + // this is actually an enum with associated data, but this serializes into the form we want + // when only a single field is set. + val subscribe: SubscribeToStream? = null, + val unsubscribe: StreamKey? = null, +) + +@Serializable +internal class SubscribeToStream( + val stream: StreamKey, + val ttl: Int? = null, + val priority: StreamPriority? = null, +) + +@Serializable +internal data class StreamKey( + val name: String, + val params: JsonObject?, +) diff --git a/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt b/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt index 43e7e127..78c384c7 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/crud/SerializedRow.kt @@ -1,5 +1,13 @@ package com.powersync.db.crud +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.descriptors.PrimitiveKind +import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.serialDescriptor +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonNull import kotlinx.serialization.json.JsonObject @@ -8,6 +16,7 @@ import kotlinx.serialization.json.contentOrNull import kotlinx.serialization.json.jsonPrimitive import kotlin.experimental.ExperimentalObjCRefinement import kotlin.native.HiddenFromObjC +import kotlin.time.Instant /** * A named collection of values as they appear in a SQLite row. @@ -55,14 +64,29 @@ private data class ToStringEntry( get() = inner.value.jsonPrimitive.contentOrNull } -private class TypedRow( - inner: JsonObject, +@Serializable(with = TypedRow.Serializer::class) +internal class TypedRow( + private val inner: JsonObject, ) : AbstractMap() { override val entries: Set> = inner.entries.mapTo( mutableSetOf(), ::ToTypedEntry, ) + + private object Serializer : KSerializer { + override val descriptor: SerialDescriptor + get() = serialDescriptor() + + override fun deserialize(decoder: Decoder): TypedRow = TypedRow(JsonObject.serializer().deserialize(decoder)) + + override fun serialize( + encoder: Encoder, + value: TypedRow, + ) { + encoder.encodeSerializableValue(JsonObject.serializer(), value.inner) + } + } } private data class ToTypedEntry( diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt b/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt index ced90401..fb0bbe65 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Instruction.kt @@ -1,6 +1,7 @@ package com.powersync.sync -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority +import com.powersync.db.crud.TypedRow import kotlinx.serialization.KSerializer import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -46,7 +47,11 @@ internal sealed interface Instruction { data object FlushSileSystem : Instruction - data object CloseSyncStream : Instruction + @Serializable + data class CloseSyncStream( + @SerialName("hide_disconnect") + val hideDisconnect: Boolean, + ) : Instruction data object DidCompleteSync : Instruction @@ -60,7 +65,7 @@ internal sealed interface Instruction { private val establishSyncStream = serializer() private val fetchCredentials = serializer() private val flushFileSystem = serializer() - private val closeSyncStream = serializer() + private val closeSyncStream = serializer() private val didCompleteSync = serializer() override val descriptor = @@ -88,7 +93,6 @@ internal sealed interface Instruction { } 5 -> { decodeSerializableElement(descriptor, 5, closeSyncStream) - CloseSyncStream } 6 -> { decodeSerializableElement(descriptor, 6, didCompleteSync) @@ -127,8 +131,31 @@ internal data class CoreSyncStatus( val downloading: CoreDownloadProgress?, @SerialName("priority_status") val priorityStatus: List, + val streams: List, ) +@Serializable +internal data class CoreActiveStreamSubscription( + override val name: String, + override val parameters: TypedRow?, + val priority: StreamPriority?, + val progress: ProgressInfo, + override val active: Boolean, + @SerialName("is_default") + override val isDefault: Boolean, + @SerialName("has_explicit_subscription") + override val hasExplicitSubscription: Boolean, + @SerialName("expires_at") + @Serializable(with = InstantTimestampSerializer::class) + override val expiresAt: Instant?, + @SerialName("last_synced_at") + @Serializable(with = InstantTimestampSerializer::class) + override val lastSyncedAt: Instant?, +) : SyncSubscriptionDescription { + override val hasSynced: Boolean + get() = lastSyncedAt != null +} + @Serializable internal data class CoreDownloadProgress( val buckets: Map, @@ -136,7 +163,7 @@ internal data class CoreDownloadProgress( @Serializable internal data class CoreBucketProgress( - val priority: BucketPriority, + val priority: StreamPriority, @SerialName("at_last") val atLast: Long, @SerialName("since_last") @@ -147,7 +174,7 @@ internal data class CoreBucketProgress( @Serializable internal data class CorePriorityStatus( - val priority: BucketPriority, + val priority: StreamPriority, @SerialName("last_synced_at") @Serializable(with = InstantTimestampSerializer::class) val lastSyncedAt: Instant?, diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt index d260624c..ae5833f2 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -1,8 +1,10 @@ package com.powersync.sync -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.LocalOperationCounters +import com.powersync.bucket.StreamPriority +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable import kotlin.math.min /** @@ -41,8 +43,11 @@ public interface ProgressWithOperations { } } +@Serializable internal data class ProgressInfo( + @SerialName("downloaded") override val downloadedOperations: Int, + @SerialName("total") override val totalOperations: Int, ) : ProgressWithOperations @@ -73,7 +78,7 @@ public data class SyncDownloadProgress internal constructor( override val totalOperations: Int init { - val (target, completed) = targetAndCompletedCounts(BucketPriority.FULL_SYNC_PRIORITY) + val (target, completed) = targetAndCompletedCounts(StreamPriority.FULL_SYNC_PRIORITY) totalOperations = target downloadedOperations = completed } @@ -127,7 +132,7 @@ public data class SyncDownloadProgress internal constructor( * The returned [ProgressWithOperations] instance tracks the target amount of operations that need to be downloaded * in total and how many of them have already been received. */ - public fun untilPriority(priority: BucketPriority): ProgressWithOperations { + public fun untilPriority(priority: StreamPriority): ProgressWithOperations { val (total, completed) = targetAndCompletedCounts(priority) return ProgressInfo(totalOperations = total, downloadedOperations = completed) } @@ -150,7 +155,7 @@ public data class SyncDownloadProgress internal constructor( }, ) - private fun targetAndCompletedCounts(priority: BucketPriority): Pair = + private fun targetAndCompletedCounts(priority: StreamPriority): Pair = buckets.values .asSequence() .filter { it.priority >= priority } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Stream.kt b/core/src/commonMain/kotlin/com/powersync/sync/Stream.kt new file mode 100644 index 00000000..c5977157 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/sync/Stream.kt @@ -0,0 +1,102 @@ +package com.powersync.sync + +import com.powersync.bucket.StreamPriority +import kotlin.time.Duration +import kotlin.time.Instant + +public interface SyncStreamDescription { + /** + * THe name of the stream as it appears in the stream definition for the PowerSync service. + */ + public val name: String + + /** + * The parameters used to subscribe to the stream, if any. + * + * The same stream can be subscribed to multiple times with different parameters. + */ + public val parameters: Map? +} + +/** + * Information about a subscribed sync stream. + * + * This includes the [SyncStreamDescription] along with information about the current sync status. + */ +public interface SyncSubscriptionDescription : SyncStreamDescription { + /** + * Whether this stream is active, meaning that the subscription has been acknowledged by the + * sync service. + */ + public val active: Boolean + + /** + * Whether this stream subscription is included yb default, regardless of whether the stream has + * explicitly been subscribed to or not. + * + * Default streams are created by applying `auto_subscribe: true` in their definition on the + * sync service. + * + * It's possible for both [isDefault] and [hasExplicitSubscription] to be true at the same time. + * This happens when a default stream was subscribed explicitly. + */ + public val isDefault: Boolean + + /** + * Whether this stream been subscribed to explicitly. + * + * It's possible for both [isDefault] and [hasExplicitSubscription] to be true at the same time. + * This happens when a default stream was subscribed explicitly. + */ + public val hasExplicitSubscription: Boolean + + /** + * For sync streams that have a time-to-live, the current time at which the stream would expire + * if not subscribed to again. + */ + public val expiresAt: Instant? + + /** + * Whether this stream subscription has been synced at least once. + */ + public val hasSynced: Boolean + + /** + * If [hasSynced] is true, the last time data from this stream has been synced. + */ + public val lastSyncedAt: Instant? +} + +/** + * A handle to a [SyncStreamDescription] that allows subscribing to the stream. + * + * To obtain an instance of [SyncStream], call [com.powersync.PowerSyncDatabase.syncStream]. + */ +public interface SyncStream : SyncStreamDescription { + public suspend fun subscribe( + ttl: Duration? = null, + priority: StreamPriority? = null, + ): SyncStreamSubscription + + public suspend fun unsubscribeAll() +} + +/** + * A [SyncStream] that has been subscribed to. + */ +public interface SyncStreamSubscription : SyncStreamDescription { + /** + * A variant of [com.powersync.PowerSyncDatabase.waitForFirstSync] that is specific to this + * stream subscription. + */ + public suspend fun waitForFirstSync() + + /** + * Removes this subscription. + * + * Once all [SyncStreamSubscription]s for a [SyncStream] have been unsubscribed, the `ttl` for + * that stream starts running. When it expires without subscribing again, the stream will be + * evicted. + */ + public suspend fun unsubscribe() +} diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt similarity index 93% rename from core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt rename to core/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt index 00ead40f..10997a34 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSync.kt @@ -12,10 +12,11 @@ import com.powersync.bucket.Checkpoint import com.powersync.bucket.PowerSyncControlArguments import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.db.SubscriptionGroup import com.powersync.db.crud.CrudEntry import com.powersync.db.schema.Schema import com.powersync.db.schema.toSerializable -import com.powersync.sync.SyncStream.Companion.SOCKET_TIMEOUT +import com.powersync.sync.StreamingSyncClient.Companion.SOCKET_TIMEOUT import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig @@ -52,6 +53,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map @@ -102,7 +104,7 @@ public fun HttpClientConfig<*>.configureSyncHttpClient(userAgent: String = userA } @OptIn(ExperimentalPowerSyncAPI::class) -internal class SyncStream( +internal class StreamingSyncClient( private val bucketStorage: BucketStorage, private val connector: PowerSyncBackendConnector, private val uploadCrud: suspend () -> Unit, @@ -112,6 +114,7 @@ internal class SyncStream( private val uploadScope: CoroutineScope, private val options: SyncOptions, private val schema: Schema, + private val activeSubscriptions: StateFlow>, ) { private var isUploadingCrud = AtomicReference(null) private var completedCrudUploads = Channel(onBufferOverflow = BufferOverflow.DROP_OLDEST) @@ -148,8 +151,14 @@ internal class SyncStream( suspend fun streamingSync() { var invalidCredentials = false clientId = bucketStorage.getClientId() + var result = SyncIterationResult() + while (true) { - status.update { copy(connecting = true) } + if (!result.hideDisconnectStateAndReconnectImmediately) { + status.update { copy(connecting = true) } + } + result = SyncIterationResult() + try { if (invalidCredentials) { // This may error. In that case it will be retried again on the next @@ -157,7 +166,7 @@ internal class SyncStream( connector.invalidateCredentials() invalidCredentials = false } - streamingSyncIteration() + result = streamingSyncIteration() } catch (e: Exception) { if (e is CancellationException) { throw e @@ -166,8 +175,10 @@ internal class SyncStream( logger.e("Error in streamingSync: ${e.message}") status.update { copy(downloadError = e) } } finally { - status.update { copy(connected = false, connecting = true, downloading = false) } - delay(retryDelayMs) + if (!result.hideDisconnectStateAndReconnectImmediately) { + status.update { copy(connected = false, connecting = true, downloading = false) } + delay(retryDelayMs) + } } } } @@ -329,7 +340,7 @@ internal class SyncStream( } } - private suspend fun streamingSyncIteration() { + private suspend fun streamingSyncIteration(): SyncIterationResult = coroutineScope { if (options.newClientImplementation) { val iteration = ActiveIteration(this) @@ -345,9 +356,9 @@ internal class SyncStream( } } else { legacySyncIteration() + SyncIterationResult() } } - } @OptIn(LegacySyncImplementation::class) private suspend fun CoroutineScope.legacySyncIteration() { @@ -363,26 +374,42 @@ internal class SyncStream( */ private inner class ActiveIteration( val scope: CoroutineScope, - var fetchLinesJob: Job? = null, - var credentialsInvalidation: Job? = null, ) { + var fetchLinesJob: Job? = null + var credentialsInvalidation: Job? = null + // Using a channel for control invocations so that they're handled by a single coroutine, // avoiding races between concurrent jobs like fetching credentials. private val controlInvocations = Channel() + private var result = SyncIterationResult() private suspend fun invokeControl(args: PowerSyncControlArguments) { val instructions = bucketStorage.control(args) instructions.forEach { handleInstruction(it) } } - suspend fun start() { + suspend fun start(): SyncIterationResult { + var subscriptions = activeSubscriptions.value + invokeControl( PowerSyncControlArguments.Start( parameters = params, schema = schema.toSerializable(), + includeDefaults = options.includeDefaultStreams, + activeStreams = subscriptions.map { it.key }, ), ) + val listenForUpdatedSubscriptions = + scope.launch { + activeSubscriptions.collect { + if (subscriptions !== it) { + subscriptions = it + controlInvocations.send(PowerSyncControlArguments.UpdateSubscriptions(activeSubscriptions.value.map { it.key })) + } + } + } + var hadSyncLine = false for (line in controlInvocations) { val instructions = bucketStorage.control(line) @@ -398,6 +425,9 @@ internal class SyncStream( triggerCrudUploadAsync() } } + + listenForUpdatedSubscriptions.cancel() + return result } suspend fun stop() { @@ -429,8 +459,10 @@ internal class SyncStream( } } - Instruction.CloseSyncStream -> { - logger.v { "Closing sync stream connection" } + is Instruction.CloseSyncStream -> { + val hideDisconnect = instruction.hideDisconnect + logger.v { "Closing sync stream connection. Hide disconnect: $hideDisconnect" } + result = SyncIterationResult(hideDisconnect) fetchLinesJob!!.cancelAndJoin() fetchLinesJob = null logger.v { "Sync stream connection shut down" } @@ -771,7 +803,7 @@ internal class SyncStream( } } - internal companion object { + internal companion object Companion { // The sync service sends a token keepalive message roughly every 20 seconds. So if we don't receive a message // in twice that time, assume the connection is broken. internal const val SOCKET_TIMEOUT: Long = 40_000 @@ -854,3 +886,7 @@ internal data class SyncStreamState( private class PendingCrudUpload( val done: CompletableDeferred, ) + +private class SyncIterationResult( + val hideDisconnectStateAndReconnectImmediately: Boolean = false, +) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt index a9a9a4f3..557b8346 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncLine.kt @@ -1,9 +1,9 @@ package com.powersync.sync import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OplogEntry +import com.powersync.bucket.StreamPriority import kotlinx.serialization.KSerializer import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable @@ -39,7 +39,7 @@ internal sealed interface SyncLine { @Serializable data class CheckpointPartiallyComplete( @SerialName("last_op_id") val lastOpId: String, - @SerialName("priority") val priority: BucketPriority, + @SerialName("priority") val priority: StreamPriority, ) : SyncLine @Serializable diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt index c8c89f5b..30902d3d 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncOptions.kt @@ -57,6 +57,11 @@ public class SyncOptions * Allows configuring the [HttpClient] used for connecting to the PowerSync service. */ public val clientConfiguration: SyncClientConfiguration? = null, + /** + * Whether streams that have been defined with `auto_subscribe: true` should be synced even + * when they don't have an explicit subscription. + */ + public val includeDefaultStreams: Boolean = true, ) { public companion object { /** diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 180e1429..8ae3d48c 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -1,6 +1,6 @@ package com.powersync.sync -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow @@ -10,32 +10,32 @@ import kotlin.time.Instant @ConsistentCopyVisibility public data class PriorityStatusEntry internal constructor( - val priority: BucketPriority, + val priority: StreamPriority, val lastSyncedAt: Instant?, val hasSynced: Boolean?, ) -public interface SyncStatusData { +public sealed class SyncStatusData { /** * true if currently connected. * * This means the PowerSync connection is ready to download, and [PowerSyncBackendConnector.uploadData] may be called for any local changes. */ - public val connected: Boolean + public abstract val connected: Boolean /** * true if the PowerSync connection is busy connecting. * * During this stage, [PowerSyncBackendConnector.uploadData] may already be called, and [uploading] may be true. */ - public val connecting: Boolean + public abstract val connecting: Boolean /** * true if actively downloading changes. * * This is only true when [connected] is also true. */ - public val downloading: Boolean + public abstract val downloading: Boolean /** * Realtime progress information about downloaded operations during an active sync. @@ -44,45 +44,45 @@ public interface SyncStatusData { * For more information on what progress is reported, see [SyncDownloadProgress]. * This value will be non-null only if [downloading] is true. */ - public val downloadProgress: SyncDownloadProgress? + public abstract val downloadProgress: SyncDownloadProgress? /** * true if uploading changes */ - public val uploading: Boolean + public abstract val uploading: Boolean /** * Time that a last sync has fully completed, if any. * * Currently this is reset to null after a restart. */ - public val lastSyncedAt: Instant? + public abstract val lastSyncedAt: Instant? /** * Indicates whether there has been at least one full sync, if any. * * Is null when unknown, for example when state is still being loaded from the database. */ - public val hasSynced: Boolean? + public abstract val hasSynced: Boolean? /** * Error during uploading. * * Cleared on the next successful upload. */ - public val uploadError: Any? + public abstract val uploadError: Any? /** * Error during downloading (including connecting). * * Cleared on the next successful data download. */ - public val downloadError: Any? + public abstract val downloadError: Any? /** * Convenience getter for either the value of downloadError or uploadError */ - public val anyError: Any? + public abstract val anyError: Any? /** * Available [PriorityStatusEntry] reporting the sync status for buckets within priorities. @@ -91,12 +91,14 @@ public interface SyncStatusData { * and [lastSyncedAt] are set to indicate that a partial (but no complete) sync has completed. * A completed [PriorityStatusEntry] at one priority level always includes all higher priorities too. */ - public val priorityStatusEntries: List + public abstract val priorityStatusEntries: List + + internal abstract val internalSubscriptions: List? /** * Status information for whether buckets in [priority] have been synchronized. */ - public fun statusForPriority(priority: BucketPriority): PriorityStatusEntry { + public fun statusForPriority(priority: StreamPriority): PriorityStatusEntry { val byDescendingPriorities = priorityStatusEntries.sortedByDescending { it.priority } for (entry in byDescendingPriorities) { @@ -110,6 +112,36 @@ public interface SyncStatusData { // A complete sync necessarily includes all priorities. return PriorityStatusEntry(priority, lastSyncedAt, hasSynced) } + + /** + * All sync streams currently being tracked in the database. + * + * This returns null when the database is currently being opened and we don't have reliable + * information about included streams yet. + */ + public val syncStreams: List? get() = internalSubscriptions?.map(this::exposeStreamStatus) + + /** + * Status information for [stream], if it's a stream that is currently tracked by the sync + * client. + */ + public fun forStream(stream: SyncStreamDescription): SyncStreamStatus? { + val raw = internalSubscriptions?.firstOrNull { it.name == stream.name && it.parameters == stream.parameters } ?: return null + return exposeStreamStatus(raw) + } + + private fun exposeStreamStatus(internal: CoreActiveStreamSubscription): SyncStreamStatus { + val progress = + if (this.downloadProgress == null) { + null + } else { + // The core extension will always give us progress numbers, but we should only expose + // them when that makes sense (i.e. we're actually downloading). + internal.progress + } + + return SyncStreamStatus(progress, internal) + } } internal data class SyncStatusDataContainer( @@ -123,12 +155,13 @@ internal data class SyncStatusDataContainer( override val uploadError: Any? = null, override val downloadError: Any? = null, override val priorityStatusEntries: List = emptyList(), -) : SyncStatusData { + override val internalSubscriptions: List = emptyList(), +) : SyncStatusData() { override val anyError get() = downloadError ?: uploadError internal fun applyCoreChanges(status: CoreSyncStatus): SyncStatusDataContainer { - val completeSync = status.priorityStatus.firstOrNull { it.priority == BucketPriority.FULL_SYNC_PRIORITY } + val completeSync = status.priorityStatus.firstOrNull { it.priority == StreamPriority.FULL_SYNC_PRIORITY } return copy( connected = status.connected, @@ -145,6 +178,7 @@ internal data class SyncStatusDataContainer( hasSynced = it.hasSynced, ) }, + internalSubscriptions = status.streams, ) } @@ -169,7 +203,7 @@ internal data class SyncStatusDataContainer( @ConsistentCopyVisibility public data class SyncStatus internal constructor( private var data: SyncStatusDataContainer = SyncStatusDataContainer(), -) : SyncStatusData { +) : SyncStatusData() { private val stateFlow: MutableStateFlow = MutableStateFlow(data) /** @@ -224,6 +258,9 @@ public data class SyncStatus internal constructor( override val priorityStatusEntries: List get() = data.priorityStatusEntries + override val internalSubscriptions: List + get() = data.internalSubscriptions + override fun toString(): String = "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, hasSynced=$hasSynced, error=$anyError)" @@ -231,3 +268,30 @@ public data class SyncStatus internal constructor( public fun empty(): SyncStatus = SyncStatus() } } + +/** + * Current information about a [SyncStreamSubscription]. + */ +@ConsistentCopyVisibility +public data class SyncStreamStatus internal constructor( + /** + * If the sync status is currently downloading, information about download progress related to + * this stream. + */ + val progress: ProgressWithOperations?, + internal val internal: CoreActiveStreamSubscription, +) { + /** + * The [SyncSubscriptionDescription] providing information about the subscription. + */ + val subscription: SyncSubscriptionDescription + get() = internal + + /** + * The priority of this stream. + * + * New data on higher-priority streams can interrupt low-priority streams. + */ + val priority: StreamPriority + get() = internal.priority ?: StreamPriority.FULL_SYNC_PRIORITY +} diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt index 5236d1e0..c5fff921 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncLineTest.kt @@ -1,8 +1,8 @@ package com.powersync.sync import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint +import com.powersync.bucket.StreamPriority import com.powersync.utils.JsonUtil import kotlin.test.Test import kotlin.test.assertEquals @@ -35,7 +35,7 @@ class SyncLineTest { SyncLine.FullCheckpoint( Checkpoint( lastOpId = "10", - checksums = listOf(BucketChecksum(bucket = "a", priority = BucketPriority(3), checksum = 10)), + checksums = listOf(BucketChecksum(bucket = "a", priority = StreamPriority(3), checksum = 10)), ), ), """{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "checksum": 10}]}}""", @@ -48,7 +48,7 @@ class SyncLineTest { SyncLine.FullCheckpoint( Checkpoint( lastOpId = "10", - checksums = listOf(BucketChecksum(bucket = "a", priority = BucketPriority(1), checksum = 10)), + checksums = listOf(BucketChecksum(bucket = "a", priority = StreamPriority(1), checksum = 10)), ), ), """{"checkpoint": {"last_op_id": "10", "buckets": [{"bucket": "a", "priority": 1, "checksum": 10}]}}""", @@ -77,7 +77,7 @@ class SyncLineTest { checkDeserializing( SyncLine.CheckpointPartiallyComplete( lastOpId = "10", - priority = BucketPriority(1), + priority = StreamPriority(1), ), """{"partial_checkpoint_complete": {"last_op_id": "10", "priority": 1}}""", ) diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 3872da0a..5592dfa6 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -13,7 +13,7 @@ import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import com.powersync.db.schema.Schema -import com.powersync.sync.SyncStream.Companion.bsonObjects +import com.powersync.sync.StreamingSyncClient.Companion.bsonObjects import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock @@ -24,6 +24,7 @@ import io.ktor.client.engine.mock.MockEngine import io.ktor.utils.io.ByteChannel import io.ktor.utils.io.writeByteArray import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout @@ -34,10 +35,10 @@ import kotlin.test.assertContains import kotlin.test.assertEquals @OptIn(ExperimentalKermitApi::class, ExperimentalPowerSyncAPI::class) -class SyncStreamTest { +class StreamingSyncClientTest { private lateinit var bucketStorage: BucketStorage private lateinit var connector: PowerSyncBackendConnector - private lateinit var syncStream: SyncStream + private lateinit var streamingSyncClient: StreamingSyncClient private val testLogWriter = TestLogWriter( loggable = Severity.Verbose, @@ -66,8 +67,8 @@ class SyncStreamTest { @Test fun testInvalidateCredentials() = runTest { - syncStream = - SyncStream( + streamingSyncClient = + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = {}, @@ -84,10 +85,11 @@ class SyncStreamTest { ), ), schema = Schema(), + activeSubscriptions = MutableStateFlow(emptyList()), ) connector.cachedCredentials = TestConnector.testCredentials - syncStream.invalidateCredentials() + streamingSyncClient.invalidateCredentials() connector.cachedCredentials shouldBe null } @@ -109,8 +111,8 @@ class SyncStreamTest { everySuspend { nextCrudItem() } returns mockCrudEntry } - syncStream = - SyncStream( + streamingSyncClient = + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = { }, @@ -128,10 +130,11 @@ class SyncStreamTest { ), ), schema = Schema(), + activeSubscriptions = MutableStateFlow(emptyList()), ) - syncStream.status.update { copy(connected = true) } - syncStream.triggerCrudUploadAsync().join() + streamingSyncClient.status.update { copy(connected = true) } + streamingSyncClient.triggerCrudUploadAsync().join() testLogWriter.assertCount(2) @@ -160,8 +163,8 @@ class SyncStreamTest { everySuspend { getClientId() } returns "test-client-id" } - syncStream = - SyncStream( + streamingSyncClient = + StreamingSyncClient( bucketStorage = bucketStorage, connector = connector, uploadCrud = { }, @@ -179,24 +182,25 @@ class SyncStreamTest { ), ), schema = Schema(), + activeSubscriptions = MutableStateFlow(emptyList()), ) // Launch streaming sync in a coroutine that we'll cancel after verification val job = launch { - syncStream.streamingSync() + streamingSyncClient.streamingSync() } // Wait for status to update withTimeout(1000) { - while (!syncStream.status.connecting) { + while (!streamingSyncClient.status.connecting) { delay(10) } } // Verify initial state - assertEquals(true, syncStream.status.connecting) - assertEquals(false, syncStream.status.connected) + assertEquals(true, streamingSyncClient.status.connecting) + assertEquals(false, streamingSyncClient.status.connected) // Clean up job.cancel() diff --git a/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt b/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt index 2b95c3e3..0cb1b4a1 100644 --- a/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt +++ b/demos/android-supabase-todolist/src/main/java/com/powersync/androidexample/screens/HomeScreen.kt @@ -13,7 +13,7 @@ import androidx.compose.runtime.Composable import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.demos.Screen import com.powersync.demos.components.Input import com.powersync.demos.components.ListContent @@ -66,7 +66,7 @@ internal fun HomeScreen( // When giving lists a higher priority than items, we can have a consistent snapshot of // lists without items. In the case where many items exist (that might take longer to // sync initially), this allows us to display lists earlier. - if (status.statusForPriority(BucketPriority(1)).hasSynced == true) { + if (status.statusForPriority(StreamPriority(1)).hasSynced == true) { ListContent( items = items, onItemClicked = onItemClicked, diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt index 52e5b521..9ebd9596 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/components/GuardBySync.kt @@ -15,7 +15,7 @@ import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.unit.dp import com.powersync.PowerSyncDatabase -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.compose.composeState import com.powersync.sync.SyncStatusData import org.koin.compose.koinInject @@ -28,7 +28,7 @@ import org.koin.compose.koinInject @Composable fun GuardBySync( db: PowerSyncDatabase = koinInject(), - priority: BucketPriority? = null, + priority: StreamPriority? = null, content: @Composable () -> Unit ) { val state: SyncStatusData by db.currentStatus.composeState() diff --git a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt index 4e53852e..97eb9dce 100644 --- a/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt +++ b/demos/supabase-todolist/shared/src/commonMain/kotlin/com/powersync/demos/screens/HomeScreen.kt @@ -1,22 +1,18 @@ package com.powersync.demos.screens -import androidx.compose.foundation.background import androidx.compose.foundation.layout.Box import androidx.compose.foundation.layout.Column import androidx.compose.foundation.layout.Spacer -import androidx.compose.foundation.layout.fillMaxSize import androidx.compose.foundation.layout.fillMaxWidth import androidx.compose.foundation.layout.padding import androidx.compose.foundation.layout.width -import androidx.compose.material.MaterialTheme import androidx.compose.material.Text import androidx.compose.material.TopAppBar import androidx.compose.runtime.Composable -import androidx.compose.ui.Alignment import androidx.compose.ui.Modifier import androidx.compose.ui.text.style.TextAlign import androidx.compose.ui.unit.dp -import com.powersync.bucket.BucketPriority +import com.powersync.bucket.StreamPriority import com.powersync.demos.Screen import com.powersync.demos.components.GuardBySync import com.powersync.demos.components.Input @@ -64,7 +60,7 @@ internal fun HomeScreen( // than items, we can have a consistent snapshot of lists without items. In the case where // many items exist (that might take longer to sync initially), this allows us to display // lists earlier. - GuardBySync(priority = BucketPriority(1)) { + GuardBySync(priority = StreamPriority(1)) { Input( text = inputText, onAddClicked = onAddItemClicked, From 536593022bbd9c371844e9f35f9e3fa6031fcf3c Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 9 Sep 2025 12:19:18 +0200 Subject: [PATCH 2/9] New tests --- .../com/powersync/sync/SyncProgressTest.kt | 14 +- .../com/powersync/sync/SyncStreamTest.kt | 129 +++++++++++++----- .../com/powersync/testutils/TestUtils.kt | 18 --- .../com/powersync/bucket/BucketChecksum.kt | 2 - .../kotlin/com/powersync/bucket/Checkpoint.kt | 1 - .../kotlin/com/powersync/db/StreamImpl.kt | 3 +- ...reamTest.kt => StreamingSyncClientTest.kt} | 0 .../powersync/testutils/MockSyncService.kt | 5 + 8 files changed, 114 insertions(+), 58 deletions(-) rename core/src/commonTest/kotlin/com/powersync/sync/{SyncStreamTest.kt => StreamingSyncClientTest.kt} (100%) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index a4ee9b34..c53c1e5a 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -3,12 +3,12 @@ package com.powersync.sync import app.cash.turbine.ReceiveTurbine import app.cash.turbine.turbineScope import com.powersync.bucket.BucketChecksum +import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry import com.powersync.bucket.StreamPriority import com.powersync.testutils.ActiveDatabaseTest -import com.powersync.testutils.bucket import com.powersync.testutils.databaseTest import com.powersync.testutils.waitFor import io.kotest.assertions.withClue @@ -33,6 +33,18 @@ abstract class BaseSyncProgressTest( lastOpId = 0 } + private fun bucket( + name: String, + count: Int, + priority: StreamPriority = StreamPriority(3), + ): BucketChecksum = + BucketChecksum( + bucket = name, + priority = priority, + checksum = 0, + count = count, + ) + private suspend fun ActiveDatabaseTest.addDataLine( bucket: String, amount: Int, diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 5302d1f0..1a8ed4f4 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -2,9 +2,7 @@ package com.powersync.sync import app.cash.turbine.turbineScope import com.powersync.ExperimentalPowerSyncAPI -import com.powersync.bucket.Checkpoint import com.powersync.bucket.StreamPriority -import com.powersync.testutils.bucket import com.powersync.testutils.databaseTest import com.powersync.testutils.waitFor import com.powersync.utils.JsonParam @@ -15,6 +13,7 @@ import io.kotest.matchers.nulls.shouldNotBeNull import io.kotest.matchers.shouldBe import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.buildJsonArray @@ -80,31 +79,26 @@ class SyncStreamTest : AbstractSyncTest(true) { } syncLines.send( - SyncLine.FullCheckpoint( - Checkpoint( - lastOpId = "1", - checksums = - listOf( - bucket( - "a", - 0, - subscriptions = - buildJsonArray { - add(defaultSubscription(0)) - }, - ), - bucket( - "b", - 0, - priority = StreamPriority(1), - subscriptions = - buildJsonArray { - add(defaultSubscription(1)) - }, - ), - ), - streams = listOf(stream("stream", false)), + checkpointLine( + listOf( + bucket( + "a", + 3, + subscriptions = + buildJsonArray { + add(defaultSubscription(0)) + }, + ), + bucket( + "b", + 1, + subscriptions = + buildJsonArray { + add(defaultSubscription(1)) + }, + ), ), + listOf(stream("stream", false)), ), ) @@ -143,15 +137,7 @@ class SyncStreamTest : AbstractSyncTest(true) { val turbine = database.currentStatus.asFlow().testIn(this) turbine.waitFor { it.connected && !it.downloading } - syncLines.send( - SyncLine.FullCheckpoint( - Checkpoint( - lastOpId = "1", - checksums = listOf(), - streams = listOf(stream("default_stream", true)), - ), - ), - ) + syncLines.send(checkpointLine(listOf(), listOf(stream("default_stream", true)))) val status = turbine.awaitItem() status.syncStreams!! shouldHaveSingleElement { @@ -214,8 +200,81 @@ class SyncStreamTest : AbstractSyncTest(true) { turbine.cancelAndIgnoreRemainingEvents() } } + + @Test + fun `unsubscribing multiple times has no effect`() = + databaseTest { + val a = database.syncStream("a").subscribe() + val aAgain = database.syncStream("a").subscribe() + a.unsubscribe() + a.unsubscribe() + + // Pretend the streams are expired - they should still be requested because the core + // extension extends the lifetime of streams currently referenced before connecting. + database.execute("UPDATE ps_stream_subscriptions SET expires_at = unixepoch() - 1000") + + database.connect(connector, options = getOptions()) + database.waitForStatusMatching { it.connected } + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + subscriptions shouldHaveSize 1 + true + } + aAgain.unsubscribe() + } + + @Test + fun unsubscribeAll() = + databaseTest { + val a = database.syncStream("a").subscribe() + database.syncStream("a").unsubscribeAll() + + // Despite a being active, it should not be requested. + database.connect(connector, options = getOptions()) + database.waitForStatusMatching { it.connected } + requestedSyncStreams shouldHaveSingleElement { + val streams = it.jsonObject["streams"]!!.jsonObject + val subscriptions = streams["subscriptions"]!!.jsonArray + subscriptions shouldHaveSize 0 + true + } + a.unsubscribe() + } } +@OptIn(ExperimentalSerializationApi::class) +private fun checkpointLine( + buckets: List, + streams: List, +): JsonObject = + buildJsonObject { + put("checkpoint", checkpoint(buckets, streams)) + } + +@OptIn(ExperimentalSerializationApi::class) +private fun checkpoint( + buckets: List, + streams: List, +): JsonObject = + buildJsonObject { + put("last_op_id", "0") + put("buckets", buildJsonArray { addAll(buckets) }) + put("streams", buildJsonArray { addAll(streams) }) + } + +private fun bucket( + name: String, + priority: Int, + subscriptions: JsonArray? = null, +): JsonObject = + buildJsonObject { + put("bucket", name) + put("priority", priority) + put("checksum", 0) + subscriptions?.let { put("subscriptions", it) } + } + private fun stream( name: String, isDefault: Boolean, diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 207de1a4..0b533cfd 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -11,8 +11,6 @@ import com.powersync.DatabaseDriverFactory import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncTestLogWriter import com.powersync.TestConnector -import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.StreamPriority import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.createPowerSyncDatabaseImpl @@ -28,7 +26,6 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.io.files.Path -import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonElement expect val factory: DatabaseDriverFactory @@ -156,18 +153,3 @@ internal class ActiveDatabaseTest( cleanup(path) } } - -internal fun bucket( - name: String, - count: Int, - priority: StreamPriority = StreamPriority(3), - checksum: Int = 0, - subscriptions: JsonArray? = null, -): BucketChecksum = - BucketChecksum( - bucket = name, - priority = priority, - checksum = checksum, - count = count, - subscriptions = subscriptions, - ) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt index 11bba3ec..30269d69 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt @@ -3,7 +3,6 @@ package com.powersync.bucket import com.powersync.sync.LegacySyncImplementation import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable -import kotlinx.serialization.json.JsonArray @LegacySyncImplementation @Serializable @@ -13,5 +12,4 @@ internal data class BucketChecksum( val checksum: Int, val count: Int? = null, @SerialName("last_op_id") val lastOpId: String? = null, - val subscriptions: JsonArray? = null, ) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt index 9e3814a7..4cf92bdf 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/Checkpoint.kt @@ -11,7 +11,6 @@ internal data class Checkpoint( @SerialName("last_op_id") val lastOpId: String, @SerialName("buckets") val checksums: List, @SerialName("write_checkpoint") val writeCheckpoint: String? = null, - val streams: List? = null, ) { fun clone(): Checkpoint = Checkpoint(lastOpId, checksums, writeCheckpoint) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index a18c4bba..7d3251c2 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -21,7 +21,7 @@ internal class StreamTracker( val streamGroups = mutableMapOf() val currentlyReferencedStreams = MutableStateFlow(listOf()) - private suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { + suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { db.writeTransaction { tx -> tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) } @@ -101,6 +101,7 @@ internal class PendingStream( override suspend fun unsubscribeAll() { tracker.groupMutex.withLock { tracker.removeStreamGroup(key) + tracker.subscriptionsCommand(RustSubscriptionChangeRequest(unsubscribe = key)) } } } diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncClientTest.kt similarity index 100% rename from core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt rename to core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncClientTest.kt diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index 48cf0dfd..ea3cbf77 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -27,6 +27,7 @@ import io.ktor.utils.io.writer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.consume +import kotlinx.serialization.json.JsonElement /** * A mock HTTP engine providing sync lines read from a coroutines [ReceiveChannel]. @@ -69,6 +70,10 @@ internal class MockSyncService( val serializedLine = JsonUtil.json.encodeToString(line) channel.writeStringUtf8("$serializedLine\n") } + is JsonElement -> { + val serializedLine = JsonUtil.json.encodeToString(line) + channel.writeStringUtf8("$serializedLine\n") + } is ByteArray -> { channel.writeByteArray(line) } From 35c57c9dac45c308a87f8a517fef5cb5b17ae04e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 9 Sep 2025 12:23:54 +0200 Subject: [PATCH 3/9] Remove deprecated import --- .../kotlin/com/powersync/sync/SyncProgressTest.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index c53c1e5a..a3f1e0c2 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -3,7 +3,6 @@ package com.powersync.sync import app.cash.turbine.ReceiveTurbine import app.cash.turbine.turbineScope import com.powersync.bucket.BucketChecksum -import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry From 78658d7704bab78f9623afcbc441505027c84e52 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 9 Sep 2025 12:28:07 +0200 Subject: [PATCH 4/9] Fix SQLiter crash --- core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index 7d3251c2..bc3db857 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -23,7 +23,7 @@ internal class StreamTracker( suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { db.writeTransaction { tx -> - tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) + tx.get("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) {} } db.resolveOfflineSyncStatusIfNotConnected() } From e7a41831bf0d732c1c0286c4a7c76ae502925394 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 23 Sep 2025 11:34:08 +0200 Subject: [PATCH 5/9] Mark as experimental --- core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt | 3 ++- core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt | 2 ++ core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt | 3 +++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index 9a70cc5c..c56ac048 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -183,10 +183,11 @@ public interface PowerSyncDatabase : Queries { public suspend fun getPowerSyncVersion(): String /** - * Create a [SyncStream] instance for the given [name] and [parameters]. + * Create a [SyncStream] instance for the given [name] and [parameters]. * * Use [SyncStream.subscribe] on the returned instance to subscribe to the stream. */ + @ExperimentalPowerSyncAPI public fun syncStream( name: String, parameters: Map? = null, diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index bc3db857..270e542b 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -1,5 +1,6 @@ package com.powersync.db +import com.powersync.ExperimentalPowerSyncAPI import com.powersync.bucket.StreamPriority import com.powersync.db.crud.TypedRow import com.powersync.sync.SyncStream @@ -136,6 +137,7 @@ private class SubscriptionImplementation( override val parameters: Map? = group.key.params?.let { TypedRow(it) } + @OptIn(ExperimentalPowerSyncAPI::class) override suspend fun waitForFirstSync() { group.tracker.db.waitForStatusMatching { it.forStream(this)?.subscription?.hasSynced == true } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 8ae3d48c..9bdc9354 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -1,5 +1,6 @@ package com.powersync.sync +import com.powersync.ExperimentalPowerSyncAPI import com.powersync.bucket.StreamPriority import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow @@ -119,12 +120,14 @@ public sealed class SyncStatusData { * This returns null when the database is currently being opened and we don't have reliable * information about included streams yet. */ + @ExperimentalPowerSyncAPI public val syncStreams: List? get() = internalSubscriptions?.map(this::exposeStreamStatus) /** * Status information for [stream], if it's a stream that is currently tracked by the sync * client. */ + @ExperimentalPowerSyncAPI public fun forStream(stream: SyncStreamDescription): SyncStreamStatus? { val raw = internalSubscriptions?.firstOrNull { it.name == stream.name && it.parameters == stream.parameters } ?: return null return exposeStreamStatus(raw) From 245d0ce6e0a4033f943103954c7d675446e85a5f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 23 Sep 2025 11:37:55 +0200 Subject: [PATCH 6/9] Revert SQLiter workaround --- core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index 270e542b..d138d337 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -24,7 +24,7 @@ internal class StreamTracker( suspend fun subscriptionsCommand(command: RustSubscriptionChangeRequest) { db.writeTransaction { tx -> - tx.get("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) {} + tx.execute("SELECT powersync_control(?,?)", listOf("subscriptions", jsonDontEncodeDefaults.encodeToString(command))) } db.resolveOfflineSyncStatusIfNotConnected() } From 2ee4fe2f4292e21f2c5aba51e98d433af507be97 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 23 Sep 2025 11:50:50 +0200 Subject: [PATCH 7/9] Fix unsubscribe doing nothing --- core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt index d138d337..200a410b 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/StreamImpl.kt @@ -130,7 +130,7 @@ private class SubscriptionImplementation( group.refcount++ } - private var subscribed = false + private var subscribed = true override val name: String get() = group.key.name From 4f47cbe961c075ad7a973e2e76d72812d04a5abd Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 23 Sep 2025 12:07:36 +0200 Subject: [PATCH 8/9] Add sync stream hook --- .../com/powersync/compose/SyncStream.kt | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 compose/src/commonMain/kotlin/com/powersync/compose/SyncStream.kt diff --git a/compose/src/commonMain/kotlin/com/powersync/compose/SyncStream.kt b/compose/src/commonMain/kotlin/com/powersync/compose/SyncStream.kt new file mode 100644 index 00000000..834e037b --- /dev/null +++ b/compose/src/commonMain/kotlin/com/powersync/compose/SyncStream.kt @@ -0,0 +1,55 @@ +package com.powersync.compose + +import androidx.compose.runtime.Composable +import androidx.compose.runtime.LaunchedEffect +import androidx.compose.runtime.getValue +import androidx.compose.runtime.mutableStateOf +import androidx.compose.runtime.setValue +import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.PowerSyncDatabase +import com.powersync.bucket.StreamPriority +import com.powersync.sync.SyncStreamStatus +import com.powersync.sync.SyncStreamSubscription +import com.powersync.utils.JsonParam +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.awaitCancellation +import kotlinx.coroutines.withContext +import kotlin.time.Duration + +/** + * Creates a PowerSync stream subscription. The subscription is kept alive as long as this + * composable. When the composition is left, [SyncStreamSubscription.unsubscribe] is called + * + * For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams). + * + * @returns The status for that stream, or `null` if the stream is currently being resolved. + */ +@ExperimentalPowerSyncAPI +@Composable +public fun PowerSyncDatabase.composeSyncStream( + name: String, + parameters: Map? = null, + ttl: Duration? = null, + priority: StreamPriority? = null, +): SyncStreamStatus? { + val syncStatus by currentStatus.composeState() + var subscriptionHandle by mutableStateOf(null) + + LaunchedEffect(name, parameters) { + var sub: SyncStreamSubscription? = null + try { + sub = syncStream(name, parameters).subscribe(ttl, priority) + subscriptionHandle = sub + // Wait for the composable to unmount + awaitCancellation() + } finally { + withContext(NonCancellable) { + sub?.unsubscribe() + } + + subscriptionHandle = null + } + } + + return subscriptionHandle?.let { syncStatus.forStream(it) } +} From 77eb47c63dfc1ca7e8307ede2d30382819ff205f Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 23 Sep 2025 15:58:10 +0200 Subject: [PATCH 9/9] Add demo --- .../com/powersync/compose/SyncStream.kt | 9 +- demos/self-host-sync-streams/.gitignore | 1 + demos/self-host-sync-streams/README.md | 29 ++++ demos/self-host-sync-streams/build.gradle.kts | 56 ++++++++ .../java/com/powersync/demo/self_host/Main.kt | 89 ++++++++++++ .../demo/self_host/powersync/DemoConnector.kt | 117 +++++++++++++++ .../demo/self_host/powersync/Schema.kt | 51 +++++++ .../demo/self_host/powersync/database.kt | 32 +++++ .../powersync/demo/self_host/views/Lists.kt | 112 +++++++++++++++ .../powersync/demo/self_host/views/Todos.kt | 136 ++++++++++++++++++ .../integrations/sqldelight/todos.sq | 22 +++ .../iosApp/iosApp.xcodeproj/project.pbxproj | 4 +- .../integrations/room/RoomConnectionPool.kt | 3 +- settings.gradle.kts | 1 + 14 files changed, 655 insertions(+), 7 deletions(-) create mode 100644 demos/self-host-sync-streams/.gitignore create mode 100644 demos/self-host-sync-streams/README.md create mode 100644 demos/self-host-sync-streams/build.gradle.kts create mode 100644 demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/Main.kt create mode 100644 demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/DemoConnector.kt create mode 100644 demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/Schema.kt create mode 100644 demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/database.kt create mode 100644 demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/views/Lists.kt create mode 100644 demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/views/Todos.kt create mode 100644 demos/self-host-sync-streams/src/main/sqldelight/com/powersync/integrations/sqldelight/todos.sq diff --git a/compose/src/commonMain/kotlin/com/powersync/compose/SyncStream.kt b/compose/src/commonMain/kotlin/com/powersync/compose/SyncStream.kt index 834e037b..1cafadce 100644 --- a/compose/src/commonMain/kotlin/com/powersync/compose/SyncStream.kt +++ b/compose/src/commonMain/kotlin/com/powersync/compose/SyncStream.kt @@ -4,6 +4,7 @@ import androidx.compose.runtime.Composable import androidx.compose.runtime.LaunchedEffect import androidx.compose.runtime.getValue import androidx.compose.runtime.mutableStateOf +import androidx.compose.runtime.remember import androidx.compose.runtime.setValue import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase @@ -33,13 +34,13 @@ public fun PowerSyncDatabase.composeSyncStream( priority: StreamPriority? = null, ): SyncStreamStatus? { val syncStatus by currentStatus.composeState() - var subscriptionHandle by mutableStateOf(null) + val (resolvedHandle, changeHandle) = remember { mutableStateOf(null) } LaunchedEffect(name, parameters) { var sub: SyncStreamSubscription? = null try { sub = syncStream(name, parameters).subscribe(ttl, priority) - subscriptionHandle = sub + changeHandle(sub) // Wait for the composable to unmount awaitCancellation() } finally { @@ -47,9 +48,9 @@ public fun PowerSyncDatabase.composeSyncStream( sub?.unsubscribe() } - subscriptionHandle = null + changeHandle(null) } } - return subscriptionHandle?.let { syncStatus.forStream(it) } + return resolvedHandle?.let { syncStatus.forStream(it) } } diff --git a/demos/self-host-sync-streams/.gitignore b/demos/self-host-sync-streams/.gitignore new file mode 100644 index 00000000..42afabfd --- /dev/null +++ b/demos/self-host-sync-streams/.gitignore @@ -0,0 +1 @@ +/build \ No newline at end of file diff --git a/demos/self-host-sync-streams/README.md b/demos/self-host-sync-streams/README.md new file mode 100644 index 00000000..426f8258 --- /dev/null +++ b/demos/self-host-sync-streams/README.md @@ -0,0 +1,29 @@ +# SQLDelight + Sync streams demo + +This demo is a simple JVM todolist application using PowerSync. It differs from the +[main demo](../supabase-todolist) by: + +1. Using SQLDelight for database queries. +2. Using sync streams to sync items in each list on demand instead of upfront. +3. Again for simplicity, using a self-host demo instead of Supabase. + +## Start + +To start the self-hosted backend, run `docker compose up` in [this directory](https://github.com/powersync-ja/self-host-demo/tree/main/demos/nodejs). + +Also, to use sync streams, update the contents of [this file](https://github.com/powersync-ja/self-host-demo/blob/main/config/sync_rules.yaml) with + +```yaml +# Sync-rule docs: https://docs.powersync.com/usage/sync-rules +streams: + lists: + query: SELECT * FROM lists #WHERE owner_id = auth.user_id() + auto_subscribe: true + todos: + query: SELECT * FROM todos WHERE list_id = subscription.parameter('list') #AND list_id IN (SELECT id FROM lists WHERE owner_id = auth.user_id()) + +config: + edition: 2 +``` + +Afterwards, start the `:demos:self-host-sync-streams:run` task in Gradle. diff --git a/demos/self-host-sync-streams/build.gradle.kts b/demos/self-host-sync-streams/build.gradle.kts new file mode 100644 index 00000000..8e5111d4 --- /dev/null +++ b/demos/self-host-sync-streams/build.gradle.kts @@ -0,0 +1,56 @@ +plugins { + alias(libs.plugins.kotlin.jvm) + alias(libs.plugins.jetbrainsCompose) + alias(libs.plugins.kotlinSerialization) + alias(libs.plugins.compose.compiler) + alias(libs.plugins.sqldelight) + alias(libs.plugins.kotlinter) +} + +java { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + +kotlin { + compilerOptions { + jvmTarget = org.jetbrains.kotlin.gradle.dsl.JvmTarget.JVM_17 + optIn.add("com.powersync.ExperimentalPowerSyncAPI") + } +} + +dependencies { + implementation(projects.core) + implementation(projects.integrations.sqldelight) + implementation(projects.compose) + + implementation(compose.desktop.currentOs) + implementation(compose.material) + implementation(compose.materialIconsExtended) + implementation(libs.ktor.client.core) + implementation(libs.ktor.client.okhttp) + implementation(libs.ktor.client.contentnegotiation) + implementation(libs.ktor.client.logging) + implementation(libs.ktor.serialization.json) + implementation(libs.sqldelight.coroutines) + implementation(libs.kmp.lifecycle.compose) +} + +sqldelight { + databases { + linkSqlite.set(false) + + create("TodoDatabase") { + packageName.set("com.powersync.integrations.sqldelight") + generateAsync.set(true) + deriveSchemaFromMigrations.set(false) + dialect(libs.sqldelight.dialect.sqlite38) + } + } +} + +compose.desktop { + application { + mainClass = "com.powersync.demo.self_host.MainKt" + } +} diff --git a/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/Main.kt b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/Main.kt new file mode 100644 index 00000000..9be2b417 --- /dev/null +++ b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/Main.kt @@ -0,0 +1,89 @@ +package com.powersync.demo.self_host + +import androidx.compose.foundation.layout.fillMaxWidth +import androidx.compose.foundation.layout.padding +import androidx.compose.material.LinearProgressIndicator +import androidx.compose.material.MaterialTheme +import androidx.compose.runtime.Composable +import androidx.compose.runtime.LaunchedEffect +import androidx.compose.runtime.getValue +import androidx.compose.runtime.mutableStateOf +import androidx.compose.runtime.remember +import androidx.compose.runtime.setValue +import androidx.compose.ui.Alignment +import androidx.compose.ui.Modifier +import androidx.compose.ui.unit.dp +import androidx.compose.ui.window.Window +import androidx.compose.ui.window.WindowPosition +import androidx.compose.ui.window.application +import androidx.compose.ui.window.rememberWindowState +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.loggerConfigInit +import co.touchlab.kermit.platformLogWriter +import com.powersync.DatabaseDriverFactory +import com.powersync.PowerSyncDatabase +import com.powersync.compose.composeSyncStream +import com.powersync.demo.self_host.powersync.DemoConnector +import com.powersync.demo.self_host.powersync.WithDatabase +import com.powersync.demo.self_host.powersync.schema +import com.powersync.demo.self_host.powersync.usePowerSync +import com.powersync.demo.self_host.views.Lists +import com.powersync.demo.self_host.views.Todos +import com.powersync.integrations.sqldelight.Lists +import com.powersync.sync.SyncOptions + +fun main() { + application { + Window( + onCloseRequest = ::exitApplication, + title = "Self-hosted todo app", + state = + rememberWindowState( + position = WindowPosition(alignment = Alignment.Center), + ), + ) { + MaterialTheme { + App() + } + } + } +} + +@Composable +fun App() { + val logger = + remember { + Logger( + loggerConfigInit( + platformLogWriter(), + minSeverity = Severity.Verbose, + ), + ) + } + val database = + remember { + PowerSyncDatabase( + DatabaseDriverFactory(), + schema, + dbFilename = "self_host_powersync.db", + logger = logger, + ) + } + LaunchedEffect(Unit) { + database.connect(DemoConnector(logger), options = SyncOptions(newClientImplementation = true)) + } + + WithDatabase(database) { + var selectedList by remember { mutableStateOf(null) } + val stream = usePowerSync().composeSyncStream("lists") + + if (stream?.subscription?.hasSynced == true) { + selectedList?.let { Todos(it, onBack = { selectedList = null }) } ?: Lists(onItemClicked = { list -> selectedList = list }) + } else { + LinearProgressIndicator( + modifier = Modifier.fillMaxWidth().padding(8.dp), + ) + } + } +} diff --git a/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/DemoConnector.kt b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/DemoConnector.kt new file mode 100644 index 00000000..461db06e --- /dev/null +++ b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/DemoConnector.kt @@ -0,0 +1,117 @@ +package com.powersync.demo.self_host.powersync + +import com.powersync.PowerSyncDatabase +import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.connectors.PowerSyncCredentials +import io.ktor.client.HttpClient +import io.ktor.client.call.body +import io.ktor.client.plugins.DefaultRequest +import io.ktor.client.plugins.contentnegotiation.ContentNegotiation +import io.ktor.client.plugins.logging.LogLevel +import io.ktor.client.plugins.logging.Logger +import io.ktor.client.plugins.logging.Logging +import io.ktor.client.request.get +import io.ktor.client.request.post +import io.ktor.client.request.setBody +import io.ktor.http.ContentType +import io.ktor.http.HttpHeaders +import io.ktor.http.HttpStatusCode +import io.ktor.http.contentType +import io.ktor.serialization.kotlinx.json.json +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject + +/** + * A PowerSync connector that talks to the [demo Node.JS backend](https://github.com/powersync-ja/self-host-demo/tree/main/demos/nodejs). + */ +class DemoConnector( + private val logger: co.touchlab.kermit.Logger, +) : PowerSyncBackendConnector() { + private val client = + HttpClient { + install(DefaultRequest) { + url("http://localhost:6060/") + } + + install(ContentNegotiation) { + json( + Json { + ignoreUnknownKeys = true + }, + ) + } + install(Logging) { + logger = + object : Logger { + override fun log(message: String) { + this@DemoConnector.logger.v { message } + } + } + level = LogLevel.HEADERS + filter { request -> + request.url.host.contains("ktor.io") + } + sanitizeHeader { header -> header == HttpHeaders.Authorization } + } + } + + override suspend fun fetchCredentials(): PowerSyncCredentials? { + @Serializable + data class Response( + val token: String, + ) + + val response = client.get("api/auth/token") + check(response.status == HttpStatusCode.OK) { + "Unexpected status code while fetching token: ${response.status}" + } + + return PowerSyncCredentials( + endpoint = "http://localhost:8080", + response.body().token, + ) + } + + override suspend fun uploadData(database: PowerSyncDatabase) { + @Serializable + data class Payload( + val op: String, + val table: String, + val id: String, + val data: JsonObject?, + ) + + @Serializable + data class RequestBatch( + val batch: List, + ) + + database.getCrudTransactions().collect { tx -> + val batch = + buildList { + for (operation in tx.crud) { + add( + Payload( + op = operation.op.toJson(), + table = operation.table, + id = operation.id, + data = operation.opData?.jsonValues, + ), + ) + } + } + + val response = + client.post("api/data") { + contentType(ContentType.Application.Json) + setBody(RequestBatch(batch)) + } + check(response.status == HttpStatusCode.OK) { + "Unexpected status code while upload crud tx: ${response.status}" + } + + tx.complete(null) + } + } +} diff --git a/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/Schema.kt b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/Schema.kt new file mode 100644 index 00000000..8504b74f --- /dev/null +++ b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/Schema.kt @@ -0,0 +1,51 @@ +package com.powersync.demo.self_host.powersync + +import com.powersync.db.schema.Column +import com.powersync.db.schema.Index +import com.powersync.db.schema.IndexedColumn +import com.powersync.db.schema.Schema +import com.powersync.db.schema.Table + +const val LISTS_TABLE = "lists" +const val TODOS_TABLE = "todos" + +val todos = + Table( + TODOS_TABLE, + listOf( + Column.text("created_at"), + Column.text("completed_at"), + Column.text("description"), + Column.text("created_by"), + Column.text("completed_by"), + // 0 or 1 to represent false or true + Column.integer("completed"), + Column.text("list_id"), + Column.text("photo_id"), + ), + indexes = + listOf( + Index( + name = "listid", + columns = listOf(IndexedColumn("list_id")), + ), + ), + ) + +val lists = + Table( + LISTS_TABLE, + listOf( + Column.text("created_at"), + Column.text("name"), + Column.text("owner_id"), + ), + ) + +val schema: Schema = + Schema( + listOf( + todos, + lists, + ), + ) diff --git a/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/database.kt b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/database.kt new file mode 100644 index 00000000..49bc1198 --- /dev/null +++ b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/powersync/database.kt @@ -0,0 +1,32 @@ +package com.powersync.demo.self_host.powersync + +import androidx.compose.runtime.Composable +import androidx.compose.runtime.CompositionLocalProvider +import androidx.compose.runtime.compositionLocalOf +import androidx.compose.runtime.remember +import androidx.compose.runtime.rememberCoroutineScope +import com.powersync.PowerSyncDatabase +import com.powersync.integrations.sqldelight.PowerSyncDriver +import com.powersync.integrations.sqldelight.TodoDatabase + +val PowerSync = compositionLocalOf { null } +val Database = compositionLocalOf { null } + +@Composable +fun WithDatabase( + powersync: PowerSyncDatabase, + inner: @Composable () -> Unit, +) { + val scope = rememberCoroutineScope() + val sqlDelight = remember(powersync) { TodoDatabase(PowerSyncDriver(powersync, scope)) } + + CompositionLocalProvider(PowerSync provides powersync, Database provides sqlDelight) { + inner() + } +} + +@Composable +fun usePowerSync(): PowerSyncDatabase = PowerSync.current!! + +@Composable +fun useTodoDatabase(): TodoDatabase = Database.current!! diff --git a/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/views/Lists.kt b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/views/Lists.kt new file mode 100644 index 00000000..3ba1dcf1 --- /dev/null +++ b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/views/Lists.kt @@ -0,0 +1,112 @@ +package com.powersync.demo.self_host.views + +import androidx.compose.foundation.VerticalScrollbar +import androidx.compose.foundation.clickable +import androidx.compose.foundation.layout.Box +import androidx.compose.foundation.layout.Column +import androidx.compose.foundation.layout.Row +import androidx.compose.foundation.layout.Spacer +import androidx.compose.foundation.layout.fillMaxHeight +import androidx.compose.foundation.layout.padding +import androidx.compose.foundation.layout.size +import androidx.compose.foundation.layout.width +import androidx.compose.foundation.lazy.LazyColumn +import androidx.compose.foundation.lazy.items +import androidx.compose.foundation.lazy.rememberLazyListState +import androidx.compose.foundation.rememberScrollbarAdapter +import androidx.compose.material.CircularProgressIndicator +import androidx.compose.material.Divider +import androidx.compose.material.ExperimentalMaterialApi +import androidx.compose.material.Text +import androidx.compose.runtime.Composable +import androidx.compose.runtime.collectAsState +import androidx.compose.runtime.getValue +import androidx.compose.runtime.remember +import androidx.compose.ui.Alignment +import androidx.compose.ui.Modifier +import androidx.compose.ui.text.AnnotatedString +import androidx.compose.ui.text.TextStyle +import androidx.compose.ui.text.style.TextOverflow +import androidx.compose.ui.unit.dp +import app.cash.sqldelight.coroutines.asFlow +import app.cash.sqldelight.coroutines.mapToList +import app.cash.sqldelight.coroutines.mapToOne +import com.powersync.compose.composeState +import com.powersync.demo.self_host.powersync.usePowerSync +import com.powersync.demo.self_host.powersync.useTodoDatabase +import com.powersync.integrations.sqldelight.EntrySummary +import com.powersync.integrations.sqldelight.Lists +import com.powersync.utils.JsonParam +import kotlinx.coroutines.Dispatchers + +@OptIn(ExperimentalMaterialApi::class) +@Composable +fun Lists(onItemClicked: (item: Lists) -> Unit) { + val db = useTodoDatabase() + val lists by db.todosQueries + .allLists() + .asFlow() + .mapToList(Dispatchers.IO) + .collectAsState(emptyList()) + + Box { + val listState = rememberLazyListState() + + LazyColumn(state = listState) { + items(lists) { item -> + ListItem( + item = item, + onClicked = { onItemClicked(item) }, + ) + + Divider() + } + } + + VerticalScrollbar( + modifier = Modifier.align(Alignment.CenterEnd).fillMaxHeight(), + adapter = rememberScrollbarAdapter(scrollState = listState), + ) + } +} + +@Composable +fun ListItem( + item: Lists, + onClicked: () -> Unit, +) { + val db = usePowerSync() + val stream = remember { db.syncStream("todos", mapOf("list" to JsonParam.String(item.id))) } + val streamStatus = + db.currentStatus + .composeState() + .value + .forStream(stream) + + Column(modifier = Modifier.clickable(onClick = onClicked).padding(16.dp)) { + Text( + text = AnnotatedString(item.name), + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + + if (streamStatus == null) { + Text( + text = AnnotatedString("Entries in this list are not synced - click to subscribe!"), + ) + } else if (!streamStatus.subscription.hasSynced) { + CircularProgressIndicator(modifier = Modifier.size(8.dp)) + } else { + val db = useTodoDatabase() + val stats by db.todosQueries + .entrySummary(item.id) + .asFlow() + .mapToOne(Dispatchers.IO) + .collectAsState(EntrySummary(0, 0)) + + Text( + text = AnnotatedString("${stats.completed_entries} completed, ${stats.pending_entries} pending items"), + ) + } + } +} diff --git a/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/views/Todos.kt b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/views/Todos.kt new file mode 100644 index 00000000..2656f882 --- /dev/null +++ b/demos/self-host-sync-streams/src/main/java/com/powersync/demo/self_host/views/Todos.kt @@ -0,0 +1,136 @@ +package com.powersync.demo.self_host.views + +import androidx.compose.foundation.VerticalScrollbar +import androidx.compose.foundation.layout.Box +import androidx.compose.foundation.layout.Row +import androidx.compose.foundation.layout.Spacer +import androidx.compose.foundation.layout.fillMaxHeight +import androidx.compose.foundation.layout.width +import androidx.compose.foundation.lazy.LazyColumn +import androidx.compose.foundation.lazy.items +import androidx.compose.foundation.lazy.rememberLazyListState +import androidx.compose.foundation.rememberScrollbarAdapter +import androidx.compose.material.Checkbox +import androidx.compose.material.Divider +import androidx.compose.material.Icon +import androidx.compose.material.IconButton +import androidx.compose.material.LinearProgressIndicator +import androidx.compose.material.MaterialTheme +import androidx.compose.material.Scaffold +import androidx.compose.material.Text +import androidx.compose.material.TopAppBar +import androidx.compose.material.icons.Icons +import androidx.compose.material.icons.automirrored.filled.ArrowBack +import androidx.compose.runtime.Composable +import androidx.compose.runtime.collectAsState +import androidx.compose.runtime.getValue +import androidx.compose.runtime.rememberCoroutineScope +import androidx.compose.ui.Alignment +import androidx.compose.ui.Modifier +import androidx.compose.ui.text.AnnotatedString +import androidx.compose.ui.text.style.TextOverflow +import androidx.compose.ui.unit.dp +import app.cash.sqldelight.coroutines.asFlow +import app.cash.sqldelight.coroutines.mapToList +import com.powersync.compose.composeSyncStream +import com.powersync.demo.self_host.powersync.usePowerSync +import com.powersync.demo.self_host.powersync.useTodoDatabase +import com.powersync.integrations.sqldelight.Lists +import com.powersync.integrations.sqldelight.Todos +import com.powersync.utils.JsonParam +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch + +@Composable +fun Todos( + list: Lists, + onBack: () -> Unit, +) { + val listId = list.id + val db = usePowerSync() + val stream = db.composeSyncStream("todos", mapOf("list" to JsonParam.String(listId))) + + Scaffold( + topBar = { + TopAppBar( + navigationIcon = { + IconButton(onClick = onBack) { + Icon( + imageVector = Icons.AutoMirrored.Filled.ArrowBack, + contentDescription = "Go back", + ) + } + }, + title = { Text(list.name) }, + ) + }, + content = { + if (stream?.subscription?.hasSynced != true) { + LinearProgressIndicator() + return@Scaffold + } + + val items by useTodoDatabase() + .todosQueries + .allEntries(listId) + .asFlow() + .mapToList(Dispatchers.IO) + .collectAsState(emptyList()) + + Box { + val listState = rememberLazyListState() + + LazyColumn(state = listState) { + items(items) { item -> + TodoItem(item) + Divider() + } + } + + VerticalScrollbar( + modifier = Modifier.align(Alignment.CenterEnd).fillMaxHeight(), + adapter = rememberScrollbarAdapter(scrollState = listState), + ) + } + }, + ) +} + +@Composable +fun TodoItem(item: Todos) { + val db = useTodoDatabase() + val scope = rememberCoroutineScope() + + Row { + Spacer(modifier = Modifier.width(8.dp)) + + Checkbox( + checked = item.completed == 1L, + modifier = Modifier.align(Alignment.CenterVertically), + onCheckedChange = { + scope.launch { + db.todosQueries.toggleEntry( + id = item.id, + completed = + if (item.completed == 0L) { + 1L + } else { + 0L + }, + ) + } + }, + ) + + Spacer(modifier = Modifier.width(8.dp)) + + Text( + text = AnnotatedString(item.description), + modifier = Modifier.weight(1F).align(Alignment.CenterVertically), + maxLines = 1, + overflow = TextOverflow.Ellipsis, + ) + + Spacer(modifier = Modifier.width(8.dp)) + } +} diff --git a/demos/self-host-sync-streams/src/main/sqldelight/com/powersync/integrations/sqldelight/todos.sq b/demos/self-host-sync-streams/src/main/sqldelight/com/powersync/integrations/sqldelight/todos.sq new file mode 100644 index 00000000..2368ba4f --- /dev/null +++ b/demos/self-host-sync-streams/src/main/sqldelight/com/powersync/integrations/sqldelight/todos.sq @@ -0,0 +1,22 @@ +CREATE TABLE lists ( + id TEXT NOT NULL PRIMARY KEY, + created_at TEXT NOT NULL, + name TEXT NOT NULL, + owner_id TEXT NOT NULL +); + +CREATE TABLE todos ( + id TEXT NOT NULL PRIMARY KEY, + description TEXT NOT NULL, + completed INTEGER NOT NULL, + list_id TEXT NOT NULL +); + +allLists: SELECT * FROM lists; +entrySummary: SELECT + (SELECT COUNT(*) FROM todos WHERE list_id = l.id AND completed == 1) AS completed_entries, + (SELECT COUNT(*) FROM todos WHERE list_id = l.id AND completed == 0) AS pending_entries +FROM lists l WHERE l.id = ?; + +allEntries: SELECT * FROM todos WHERE list_id = ?; +toggleEntry: UPDATE todos SET completed = ? WHERE id = ?; diff --git a/demos/supabase-todolist/iosApp/iosApp.xcodeproj/project.pbxproj b/demos/supabase-todolist/iosApp/iosApp.xcodeproj/project.pbxproj index 0b5842d5..57c0fb91 100644 --- a/demos/supabase-todolist/iosApp/iosApp.xcodeproj/project.pbxproj +++ b/demos/supabase-todolist/iosApp/iosApp.xcodeproj/project.pbxproj @@ -118,7 +118,7 @@ 7555FF79242A565900829871 /* Resources */, F85CB1118929364A9C6EFABC /* Frameworks */, 3C5ACF3A4AAFF294B2A5839B /* [CP] Embed Pods Frameworks */, - AA799A6E8997A58F1EF8CBFF /* [CP] Copy Pods Resources */, + 8DB30327041110C1BE1461E2 /* [CP] Copy Pods Resources */, ); buildRules = ( ); @@ -231,7 +231,7 @@ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; - AA799A6E8997A58F1EF8CBFF /* [CP] Copy Pods Resources */ = { + 8DB30327041110C1BE1461E2 /* [CP] Copy Pods Resources */ = { isa = PBXShellScriptBuildPhase; buildActionMask = 2147483647; files = ( diff --git a/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt b/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt index 3f8aad30..e102a16c 100644 --- a/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt +++ b/integrations/room/src/commonMain/kotlin/com/powersync/integrations/room/RoomConnectionPool.kt @@ -9,6 +9,7 @@ import androidx.sqlite.SQLiteStatement import com.powersync.db.driver.SQLiteConnectionLease import com.powersync.db.driver.SQLiteConnectionPool import com.powersync.db.schema.Schema +import io.ktor.serialization.kotlinx.json.DefaultJson import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharedFlow @@ -116,7 +117,7 @@ public class RoomConnectionPool( } private companion object { - val json = Json {} + val json = DefaultJson } } diff --git a/settings.gradle.kts b/settings.gradle.kts index c66d4964..178f8a1a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -49,5 +49,6 @@ include(":demos:supabase-todolist:androidBackgroundSync") include(":demos:supabase-todolist:desktopApp") include(":demos:supabase-todolist:iosApp") include(":demos:supabase-todolist:shared") +include(":demos:self-host-sync-streams") enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS")