Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions compose/src/commonMain/kotlin/com/powersync/compose/SyncStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.powersync.compose

import androidx.compose.runtime.Composable
import androidx.compose.runtime.LaunchedEffect
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.runtime.setValue
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncDatabase
import com.powersync.bucket.StreamPriority
import com.powersync.sync.SyncStreamStatus
import com.powersync.sync.SyncStreamSubscription
import com.powersync.utils.JsonParam
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.withContext
import kotlin.time.Duration

/**
* Creates a PowerSync stream subscription. The subscription is kept alive as long as this
* composable. When the composition is left, [SyncStreamSubscription.unsubscribe] is called
*
* For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams).
*
* @returns The status for that stream, or `null` if the stream is currently being resolved.
*/
@ExperimentalPowerSyncAPI
@Composable
public fun PowerSyncDatabase.composeSyncStream(
name: String,
parameters: Map<String, JsonParam>? = null,
ttl: Duration? = null,
priority: StreamPriority? = null,
): SyncStreamStatus? {
val syncStatus by currentStatus.composeState()
val (resolvedHandle, changeHandle) = remember { mutableStateOf<SyncStreamSubscription?>(null) }

LaunchedEffect(name, parameters) {
var sub: SyncStreamSubscription? = null
try {
sub = syncStream(name, parameters).subscribe(ttl, priority)
changeHandle(sub)
// Wait for the composable to unmount
awaitCancellation()
} finally {
withContext(NonCancellable) {
sub?.unsubscribe()
}

changeHandle(null)
}
}

return resolvedHandle?.let { syncStatus.forStream(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import com.powersync.PowerSyncDatabase
import com.powersync.PowerSyncException
import com.powersync.TestConnector
import com.powersync.bucket.BucketChecksum
import com.powersync.bucket.BucketPriority
import com.powersync.bucket.Checkpoint
import com.powersync.bucket.OpType
import com.powersync.bucket.OplogEntry
import com.powersync.bucket.StreamPriority
import com.powersync.bucket.WriteCheckpointData
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.connectors.PowerSyncBackendConnector
Expand Down Expand Up @@ -165,7 +165,7 @@ abstract class BaseSyncIntegrationTest(
add(
BucketChecksum(
bucket = "bucket$prio",
priority = BucketPriority(prio),
priority = StreamPriority(prio),
checksum = 10 + prio,
),
)
Expand Down Expand Up @@ -218,7 +218,7 @@ abstract class BaseSyncIntegrationTest(

// Emit a partial sync complete for each priority but the last.
for (priorityNo in 0..<3) {
val priority = BucketPriority(priorityNo)
val priority = StreamPriority(priorityNo)
pushData(priorityNo)
syncLines.send(
SyncLine.CheckpointPartiallyComplete(
Expand Down Expand Up @@ -258,7 +258,7 @@ abstract class BaseSyncIntegrationTest(
listOf(
BucketChecksum(
bucket = "bkt",
priority = BucketPriority(1),
priority = StreamPriority(1),
checksum = 0,
),
),
Expand All @@ -268,17 +268,17 @@ abstract class BaseSyncIntegrationTest(
syncLines.send(
SyncLine.CheckpointPartiallyComplete(
lastOpId = "0",
priority = BucketPriority(1),
priority = StreamPriority(1),
),
)

database.waitForFirstSync(BucketPriority(1))
database.waitForFirstSync(StreamPriority(1))
database.close()

// Connect to the same database again
database = openDatabaseAndInitialize()
database.currentStatus.hasSynced shouldBe false
database.currentStatus.statusForPriority(BucketPriority(1)).hasSynced shouldBe true
database.currentStatus.statusForPriority(StreamPriority(1)).hasSynced shouldBe true
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package com.powersync.sync
import app.cash.turbine.ReceiveTurbine
import app.cash.turbine.turbineScope
import com.powersync.bucket.BucketChecksum
import com.powersync.bucket.BucketPriority
import com.powersync.bucket.Checkpoint
import com.powersync.bucket.OpType
import com.powersync.bucket.OplogEntry
import com.powersync.bucket.StreamPriority
import com.powersync.testutils.ActiveDatabaseTest
import com.powersync.testutils.databaseTest
import com.powersync.testutils.waitFor
Expand Down Expand Up @@ -35,7 +35,7 @@ abstract class BaseSyncProgressTest(
private fun bucket(
name: String,
count: Int,
priority: BucketPriority = BucketPriority(3),
priority: StreamPriority = StreamPriority(3),
): BucketChecksum =
BucketChecksum(
bucket = name,
Expand Down Expand Up @@ -68,7 +68,7 @@ abstract class BaseSyncProgressTest(
)
}

private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) {
private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: StreamPriority? = null) {
if (priority != null) {
syncLines.send(
SyncLine.CheckpointPartiallyComplete(
Expand All @@ -93,7 +93,7 @@ abstract class BaseSyncProgressTest(

private suspend fun ReceiveTurbine<SyncStatusData>.expectProgress(
total: Pair<Int, Int>,
priorities: Map<BucketPriority, Pair<Int, Int>> = emptyMap(),
priorities: Map<StreamPriority, Pair<Int, Int>> = emptyMap(),
) {
val item = awaitItem()
val progress = item.downloadProgress ?: error("Expected download progress on $item")
Expand Down Expand Up @@ -357,7 +357,7 @@ abstract class BaseSyncProgressTest(
) {
turbine.expectProgress(
prio2,
mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2),
mapOf(StreamPriority(0) to prio0, StreamPriority(2) to prio2),
)
}

Expand All @@ -367,8 +367,8 @@ abstract class BaseSyncProgressTest(
lastOpId = "10",
checksums =
listOf(
bucket("a", 5, BucketPriority(0)),
bucket("b", 5, BucketPriority(2)),
bucket("a", 5, StreamPriority(0)),
bucket("b", 5, StreamPriority(2)),
),
),
),
Expand All @@ -378,7 +378,7 @@ abstract class BaseSyncProgressTest(
addDataLine("a", 5)
expectProgress(5 to 5, 5 to 10)

addCheckpointComplete(BucketPriority(0))
addCheckpointComplete(StreamPriority(0))
expectProgress(5 to 5, 5 to 10)

addDataLine("b", 2)
Expand All @@ -390,8 +390,8 @@ abstract class BaseSyncProgressTest(
lastOpId = "14",
updatedBuckets =
listOf(
bucket("a", 8, BucketPriority(0)),
bucket("b", 6, BucketPriority(2)),
bucket("a", 8, StreamPriority(0)),
bucket("b", 6, StreamPriority(2)),
),
removedBuckets = emptyList(),
),
Expand Down
Loading