Skip to content

Commit 0e9ffcc

Browse files
authored
Merge pull request #124 from powersync-ja/bucket-priorities
Bucket priorities
2 parents 90b557a + 0fd3bee commit 0e9ffcc

File tree

20 files changed

+991
-150
lines changed

20 files changed

+991
-150
lines changed

core/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ kotlin {
235235
implementation(libs.test.coroutines)
236236
implementation(libs.test.turbine)
237237
implementation(libs.kermit.test)
238+
implementation(libs.ktor.client.mock)
239+
implementation(libs.test.turbine)
238240
}
239241

240242
// We're putting the native libraries into our JAR, so integration tests for the JVM can run as part of the unit

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package com.powersync
22

33
import app.cash.turbine.turbineScope
4-
import com.powersync.db.SqlCursor
5-
import com.powersync.db.getString
6-
import com.powersync.db.schema.Column
74
import com.powersync.db.schema.Schema
8-
import com.powersync.db.schema.Table
5+
import com.powersync.testutils.UserRow
96
import kotlinx.coroutines.runBlocking
107
import kotlinx.coroutines.test.runTest
118
import kotlin.test.AfterTest
@@ -21,10 +18,7 @@ class DatabaseTest {
2118
database =
2219
PowerSyncDatabase(
2320
factory = com.powersync.testutils.factory,
24-
schema =
25-
Schema(
26-
Table(name = "users", columns = listOf(Column.text("name"), Column.text("email"))),
27-
),
21+
schema = Schema(UserRow.table),
2822
dbFilename = "testdb",
2923
)
3024

@@ -49,7 +43,7 @@ class DatabaseTest {
4943
fun testTableUpdates() =
5044
runTest {
5145
turbineScope {
52-
val query = database.watch("SELECT * FROM users") { User.from(it) }.testIn(this)
46+
val query = database.watch("SELECT * FROM users") { UserRow.from(it) }.testIn(this)
5347

5448
// Wait for initial query
5549
assertEquals(0, query.awaitItem().size)
@@ -92,19 +86,4 @@ class DatabaseTest {
9286
query.cancel()
9387
}
9488
}
95-
96-
private data class User(
97-
val id: String,
98-
val name: String,
99-
val email: String,
100-
) {
101-
companion object {
102-
fun from(cursor: SqlCursor): User =
103-
User(
104-
id = cursor.getString("id"),
105-
name = cursor.getString("name"),
106-
email = cursor.getString("email"),
107-
)
108-
}
109-
}
11089
}
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package com.powersync
2+
3+
import app.cash.turbine.turbineScope
4+
import co.touchlab.kermit.Logger
5+
import co.touchlab.kermit.Severity
6+
import co.touchlab.kermit.TestConfig
7+
import com.powersync.bucket.BucketChecksum
8+
import com.powersync.bucket.BucketPriority
9+
import com.powersync.bucket.Checkpoint
10+
import com.powersync.bucket.OpType
11+
import com.powersync.bucket.OplogEntry
12+
import com.powersync.connectors.PowerSyncBackendConnector
13+
import com.powersync.connectors.PowerSyncCredentials
14+
import com.powersync.db.PowerSyncDatabaseImpl
15+
import com.powersync.db.schema.Schema
16+
import com.powersync.sync.SyncLine
17+
import com.powersync.sync.SyncStream
18+
import com.powersync.testutils.MockSyncService
19+
import com.powersync.testutils.UserRow
20+
import com.powersync.testutils.cleanup
21+
import com.powersync.testutils.waitFor
22+
import com.powersync.utils.JsonUtil
23+
import dev.mokkery.answering.returns
24+
import dev.mokkery.everySuspend
25+
import dev.mokkery.mock
26+
import kotlinx.coroutines.CoroutineScope
27+
import kotlinx.coroutines.channels.Channel
28+
import kotlinx.coroutines.flow.receiveAsFlow
29+
import kotlinx.coroutines.runBlocking
30+
import kotlinx.coroutines.test.runTest
31+
import kotlinx.serialization.encodeToString
32+
import kotlinx.serialization.json.JsonObject
33+
import kotlin.test.AfterTest
34+
import kotlin.test.BeforeTest
35+
import kotlin.test.Test
36+
import kotlin.test.assertEquals
37+
import kotlin.test.assertFalse
38+
import kotlin.test.assertTrue
39+
import kotlin.time.Duration.Companion.seconds
40+
41+
@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class)
42+
class SyncIntegrationTest {
43+
private val logger =
44+
Logger(
45+
TestConfig(
46+
minSeverity = Severity.Debug,
47+
logWriterList = listOf(),
48+
),
49+
)
50+
private lateinit var database: PowerSyncDatabaseImpl
51+
private lateinit var connector: PowerSyncBackendConnector
52+
private lateinit var syncLines: Channel<SyncLine>
53+
54+
@BeforeTest
55+
fun setup() {
56+
cleanup("testdb")
57+
database = openDb()
58+
connector =
59+
mock<PowerSyncBackendConnector> {
60+
everySuspend { getCredentialsCached() } returns
61+
PowerSyncCredentials(
62+
token = "test-token",
63+
userId = "test-user",
64+
endpoint = "https://test.com",
65+
)
66+
67+
everySuspend { invalidateCredentials() } returns Unit
68+
}
69+
syncLines = Channel()
70+
71+
runBlocking {
72+
database.disconnectAndClear(true)
73+
}
74+
}
75+
76+
@AfterTest
77+
fun teardown() {
78+
cleanup("testdb")
79+
}
80+
81+
private fun openDb() =
82+
PowerSyncDatabase(
83+
factory = com.powersync.testutils.factory,
84+
schema = Schema(UserRow.table),
85+
dbFilename = "testdb",
86+
) as PowerSyncDatabaseImpl
87+
88+
private fun CoroutineScope.syncStream(): SyncStream {
89+
val client = MockSyncService.client(this, syncLines.receiveAsFlow())
90+
return SyncStream(
91+
bucketStorage = database.bucketStorage,
92+
connector = connector,
93+
httpEngine = client,
94+
uploadCrud = { },
95+
retryDelayMs = 10,
96+
logger = logger,
97+
params = JsonObject(emptyMap()),
98+
)
99+
}
100+
101+
private suspend fun expectUserCount(amount: Int) {
102+
val users = database.getAll("SELECT * FROM users;") { UserRow.from(it) }
103+
assertEquals(amount, users.size, "Expected $amount users, got $users")
104+
}
105+
106+
@Test
107+
fun testPartialSync() =
108+
runTest {
109+
val syncStream = syncStream()
110+
database.connect(syncStream, 1000L)
111+
112+
val checksums =
113+
buildList {
114+
for (prio in 0..3) {
115+
add(
116+
BucketChecksum(
117+
bucket = "bucket$prio",
118+
priority = BucketPriority(prio),
119+
checksum = 10 + prio,
120+
),
121+
)
122+
}
123+
}
124+
var operationId = 1
125+
126+
suspend fun pushData(priority: Int) {
127+
val id = operationId++
128+
129+
syncLines.send(
130+
SyncLine.SyncDataBucket(
131+
bucket = "bucket$priority",
132+
data =
133+
listOf(
134+
OplogEntry(
135+
checksum = (priority + 10).toLong(),
136+
data =
137+
JsonUtil.json.encodeToString(
138+
mapOf(
139+
"name" to "user $priority",
140+
"email" to "$priority@example.org",
141+
),
142+
),
143+
op = OpType.PUT,
144+
opId = id.toString(),
145+
rowId = "prio$priority",
146+
rowType = "users",
147+
),
148+
),
149+
after = null,
150+
nextAfter = null,
151+
),
152+
)
153+
}
154+
155+
turbineScope(timeout = 10.0.seconds) {
156+
val turbine = syncStream.status.asFlow().testIn(this)
157+
turbine.waitFor { it.connected }
158+
expectUserCount(0)
159+
160+
syncLines.send(
161+
SyncLine.FullCheckpoint(
162+
Checkpoint(
163+
lastOpId = "4",
164+
checksums = checksums,
165+
),
166+
),
167+
)
168+
169+
// Emit a partial sync complete for each priority but the last.
170+
for (priorityNo in 0..<3) {
171+
val priority = BucketPriority(priorityNo)
172+
pushData(priorityNo)
173+
syncLines.send(
174+
SyncLine.CheckpointPartiallyComplete(
175+
lastOpId = operationId.toString(),
176+
priority = priority,
177+
),
178+
)
179+
180+
turbine.waitFor { it.priorityStatusFor(priority).hasSynced == true }
181+
expectUserCount(priorityNo + 1)
182+
}
183+
184+
// Then complete the sync
185+
pushData(3)
186+
syncLines.send(
187+
SyncLine.CheckpointComplete(
188+
lastOpId = operationId.toString(),
189+
),
190+
)
191+
turbine.waitFor { it.hasSynced == true }
192+
expectUserCount(4)
193+
194+
turbine.cancel()
195+
}
196+
197+
syncLines.close()
198+
}
199+
200+
@Test
201+
fun testRemembersLastPartialSync() =
202+
runTest {
203+
val syncStream = syncStream()
204+
database.connect(syncStream, 1000L)
205+
206+
syncLines.send(
207+
SyncLine.FullCheckpoint(
208+
Checkpoint(
209+
lastOpId = "4",
210+
checksums = listOf(BucketChecksum(bucket = "bkt", priority = BucketPriority(1), checksum = 0)),
211+
),
212+
),
213+
)
214+
syncLines.send(
215+
SyncLine.CheckpointPartiallyComplete(
216+
lastOpId = "0",
217+
priority = BucketPriority(1),
218+
),
219+
)
220+
221+
database.waitForFirstSync(BucketPriority(1))
222+
database.close()
223+
224+
// Connect to the same database again
225+
database = openDb()
226+
assertFalse { database.currentStatus.hasSynced == true }
227+
assertTrue { database.currentStatus.priorityStatusFor(BucketPriority(1)).hasSynced == true }
228+
database.close()
229+
syncLines.close()
230+
}
231+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.powersync.testutils
2+
3+
import com.powersync.db.SqlCursor
4+
import com.powersync.db.getString
5+
import com.powersync.db.schema.Column
6+
import com.powersync.db.schema.Table
7+
8+
data class UserRow(
9+
val id: String,
10+
val name: String,
11+
val email: String,
12+
) {
13+
companion object {
14+
fun from(cursor: SqlCursor): UserRow =
15+
UserRow(
16+
id = cursor.getString("id"),
17+
name = cursor.getString("name"),
18+
email = cursor.getString("email"),
19+
)
20+
21+
val table = Table(name = "users", columns = listOf(Column.text("name"), Column.text("email")))
22+
}
23+
}

core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.powersync
22

3+
import com.powersync.bucket.BucketPriority
34
import com.powersync.connectors.PowerSyncBackendConnector
45
import com.powersync.db.Queries
56
import com.powersync.db.crud.CrudBatch
@@ -29,6 +30,13 @@ public interface PowerSyncDatabase : Queries {
2930
@Throws(PowerSyncException::class, CancellationException::class)
3031
public suspend fun waitForFirstSync()
3132

33+
/**
34+
* Suspend function that resolves when the first sync covering at least all buckets with the
35+
* given [priority] (or a higher one, since those would be synchronized first) has completed.
36+
*/
37+
@Throws(PowerSyncException::class, CancellationException::class)
38+
public suspend fun waitForFirstSync(priority: BucketPriority)
39+
3240
/**
3341
* Connect to the PowerSync service, and keep the databases in sync.
3442
*

core/src/commonMain/kotlin/com/powersync/bucket/BucketChecksum.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import kotlinx.serialization.Serializable
66
@Serializable
77
internal data class BucketChecksum(
88
val bucket: String,
9+
val priority: BucketPriority = BucketPriority.DEFAULT_PRIORITY,
910
val checksum: Int,
1011
val count: Int? = null,
1112
@SerialName("last_op_id") val lastOpId: String? = null,
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.powersync.bucket
2+
3+
import kotlinx.serialization.Serializable
4+
import kotlin.jvm.JvmInline
5+
6+
@JvmInline
7+
@Serializable
8+
public value class BucketPriority(
9+
private val priorityCode: Int,
10+
) : Comparable<BucketPriority> {
11+
init {
12+
require(priorityCode >= 0)
13+
}
14+
15+
override fun compareTo(other: BucketPriority): Int = other.priorityCode.compareTo(priorityCode)
16+
17+
public companion object {
18+
internal val FULL_SYNC_PRIORITY: BucketPriority = BucketPriority(Int.MAX_VALUE)
19+
20+
/**
21+
* The assumed priority for buckets when talking to older sync service instances that don't
22+
* support bucket priorities.
23+
*/
24+
internal val DEFAULT_PRIORITY: BucketPriority = BucketPriority(3)
25+
}
26+
}

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ internal interface BucketStorage {
2828

2929
suspend fun hasCompletedSync(): Boolean
3030

31-
suspend fun syncLocalDatabase(targetCheckpoint: Checkpoint): SyncLocalDatabaseResult
31+
suspend fun syncLocalDatabase(
32+
targetCheckpoint: Checkpoint,
33+
partialPriority: BucketPriority? = null,
34+
): SyncLocalDatabaseResult
3235

3336
fun setTargetCheckpoint(checkpoint: Checkpoint)
3437
}

0 commit comments

Comments
 (0)