Skip to content

Commit d6002ca

Browse files
authored
Sync streams (#257)
1 parent 4075302 commit d6002ca

File tree

25 files changed

+899
-157
lines changed

25 files changed

+899
-157
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import com.powersync.PowerSyncDatabase
77
import com.powersync.PowerSyncException
88
import com.powersync.TestConnector
99
import com.powersync.bucket.BucketChecksum
10-
import com.powersync.bucket.BucketPriority
1110
import com.powersync.bucket.Checkpoint
1211
import com.powersync.bucket.OpType
1312
import com.powersync.bucket.OplogEntry
13+
import com.powersync.bucket.StreamPriority
1414
import com.powersync.bucket.WriteCheckpointData
1515
import com.powersync.bucket.WriteCheckpointResponse
1616
import com.powersync.connectors.PowerSyncBackendConnector
@@ -165,7 +165,7 @@ abstract class BaseSyncIntegrationTest(
165165
add(
166166
BucketChecksum(
167167
bucket = "bucket$prio",
168-
priority = BucketPriority(prio),
168+
priority = StreamPriority(prio),
169169
checksum = 10 + prio,
170170
),
171171
)
@@ -218,7 +218,7 @@ abstract class BaseSyncIntegrationTest(
218218

219219
// Emit a partial sync complete for each priority but the last.
220220
for (priorityNo in 0..<3) {
221-
val priority = BucketPriority(priorityNo)
221+
val priority = StreamPriority(priorityNo)
222222
pushData(priorityNo)
223223
syncLines.send(
224224
SyncLine.CheckpointPartiallyComplete(
@@ -258,7 +258,7 @@ abstract class BaseSyncIntegrationTest(
258258
listOf(
259259
BucketChecksum(
260260
bucket = "bkt",
261-
priority = BucketPriority(1),
261+
priority = StreamPriority(1),
262262
checksum = 0,
263263
),
264264
),
@@ -268,17 +268,17 @@ abstract class BaseSyncIntegrationTest(
268268
syncLines.send(
269269
SyncLine.CheckpointPartiallyComplete(
270270
lastOpId = "0",
271-
priority = BucketPriority(1),
271+
priority = StreamPriority(1),
272272
),
273273
)
274274

275-
database.waitForFirstSync(BucketPriority(1))
275+
database.waitForFirstSync(StreamPriority(1))
276276
database.close()
277277

278278
// Connect to the same database again
279279
database = openDatabaseAndInitialize()
280280
database.currentStatus.hasSynced shouldBe false
281-
database.currentStatus.statusForPriority(BucketPriority(1)).hasSynced shouldBe true
281+
database.currentStatus.statusForPriority(StreamPriority(1)).hasSynced shouldBe true
282282
}
283283

284284
@Test

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package com.powersync.sync
33
import app.cash.turbine.ReceiveTurbine
44
import app.cash.turbine.turbineScope
55
import com.powersync.bucket.BucketChecksum
6-
import com.powersync.bucket.BucketPriority
76
import com.powersync.bucket.Checkpoint
87
import com.powersync.bucket.OpType
98
import com.powersync.bucket.OplogEntry
9+
import com.powersync.bucket.StreamPriority
1010
import com.powersync.testutils.ActiveDatabaseTest
1111
import com.powersync.testutils.databaseTest
1212
import com.powersync.testutils.waitFor
@@ -35,7 +35,7 @@ abstract class BaseSyncProgressTest(
3535
private fun bucket(
3636
name: String,
3737
count: Int,
38-
priority: BucketPriority = BucketPriority(3),
38+
priority: StreamPriority = StreamPriority(3),
3939
): BucketChecksum =
4040
BucketChecksum(
4141
bucket = name,
@@ -68,7 +68,7 @@ abstract class BaseSyncProgressTest(
6868
)
6969
}
7070

71-
private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: BucketPriority? = null) {
71+
private suspend fun ActiveDatabaseTest.addCheckpointComplete(priority: StreamPriority? = null) {
7272
if (priority != null) {
7373
syncLines.send(
7474
SyncLine.CheckpointPartiallyComplete(
@@ -93,7 +93,7 @@ abstract class BaseSyncProgressTest(
9393

9494
private suspend fun ReceiveTurbine<SyncStatusData>.expectProgress(
9595
total: Pair<Int, Int>,
96-
priorities: Map<BucketPriority, Pair<Int, Int>> = emptyMap(),
96+
priorities: Map<StreamPriority, Pair<Int, Int>> = emptyMap(),
9797
) {
9898
val item = awaitItem()
9999
val progress = item.downloadProgress ?: error("Expected download progress on $item")
@@ -357,7 +357,7 @@ abstract class BaseSyncProgressTest(
357357
) {
358358
turbine.expectProgress(
359359
prio2,
360-
mapOf(BucketPriority(0) to prio0, BucketPriority(2) to prio2),
360+
mapOf(StreamPriority(0) to prio0, StreamPriority(2) to prio2),
361361
)
362362
}
363363

@@ -367,8 +367,8 @@ abstract class BaseSyncProgressTest(
367367
lastOpId = "10",
368368
checksums =
369369
listOf(
370-
bucket("a", 5, BucketPriority(0)),
371-
bucket("b", 5, BucketPriority(2)),
370+
bucket("a", 5, StreamPriority(0)),
371+
bucket("b", 5, StreamPriority(2)),
372372
),
373373
),
374374
),
@@ -378,7 +378,7 @@ abstract class BaseSyncProgressTest(
378378
addDataLine("a", 5)
379379
expectProgress(5 to 5, 5 to 10)
380380

381-
addCheckpointComplete(BucketPriority(0))
381+
addCheckpointComplete(StreamPriority(0))
382382
expectProgress(5 to 5, 5 to 10)
383383

384384
addDataLine("b", 2)
@@ -390,8 +390,8 @@ abstract class BaseSyncProgressTest(
390390
lastOpId = "14",
391391
updatedBuckets =
392392
listOf(
393-
bucket("a", 8, BucketPriority(0)),
394-
bucket("b", 6, BucketPriority(2)),
393+
bucket("a", 8, StreamPriority(0)),
394+
bucket("b", 6, StreamPriority(2)),
395395
),
396396
removedBuckets = emptyList(),
397397
),

0 commit comments

Comments
 (0)