Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ internal class BucketStorage(
return MAX_OP_ID
}

suspend fun getClientId(): String {
val id = db.getOptional("SELECT powersync_client_id() as client_id") {
it.getString(0)!!
}
return id ?: throw IllegalStateException("Client ID not found")
}

suspend fun hasCrud(): Boolean {
return db.queries.hasCrud().awaitAsOneOrNull() == 1L
}
Expand Down Expand Up @@ -140,7 +147,7 @@ internal class BucketStorage(
}

val completedSync = db.getOptional(
"SELECT name, last_applied_op FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1",
"SELECT powersync_last_synced_at()",
mapper = { cursor ->
cursor.getString(0)!!
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.datetime.Instant
import kotlinx.datetime.toLocalDateTime
import kotlinx.serialization.encodeToString

/**
Expand Down Expand Up @@ -67,8 +69,10 @@ internal class PowerSyncDatabaseImpl(
runBlocking {
val sqliteVersion = internalDb.queries.sqliteVersion().awaitAsOne()
logger.d { "SQLiteVersion: $sqliteVersion" }
checkVersion()
logger.d { "PowerSyncVersion: ${getPowerSyncVersion()}" }
applySchema()
updateHasSynced()
}
}

Expand Down Expand Up @@ -105,6 +109,7 @@ internal class PowerSyncDatabaseImpl(
connecting = it.connecting,
downloading = it.downloading,
lastSyncedAt = it.lastSyncedAt,
hasSynced = it.hasSynced,
uploadError = it.uploadError,
downloadError = it.downloadError,
clearDownloadError = it.downloadError == null,
Expand Down Expand Up @@ -247,12 +252,12 @@ internal class PowerSyncDatabaseImpl(

if (writeCheckpoint != null && bucketStorage.hasCrud()) {
tx.execute(
"UPDATE ps_buckets SET target_op = ? WHERE name='\$local'",
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
listOf(writeCheckpoint),
)
} else {
tx.execute(
"UPDATE ps_buckets SET target_op = ? WHERE name='\$local'",
"UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'",
listOf(bucketStorage.getMaxOpId()),
)
}
Expand All @@ -273,7 +278,7 @@ internal class PowerSyncDatabaseImpl(
syncStream = null
}

currentStatus.update(connected = false, connecting = false)
currentStatus.update(connected = false, connecting = false, lastSyncedAt = currentStatus.lastSyncedAt)
}

override suspend fun disconnectAndClear(clearLocal: Boolean) {
Expand All @@ -293,4 +298,43 @@ internal class PowerSyncDatabaseImpl(
}
}
}

private suspend fun updateHasSynced() {
// Query the database to see if any data has been synced.
val timestamp = internalDb.getOptional("SELECT powersync_last_synced_at() as synced_at", null) { cursor ->
cursor.getString(0)!!
}

val hasSynced = timestamp != null
if (hasSynced != currentStatus.hasSynced) {
val formattedDateTime = "${timestamp!!.replace(" ","T").toLocalDateTime()}Z"
val lastSyncedAt = Instant.parse(formattedDateTime)
currentStatus.update(hasSynced = hasSynced, lastSyncedAt = lastSyncedAt)
}
}

/**
* Check that a supported version of the powersync extension is loaded.
*/
private suspend fun checkVersion() {
val version: String = try {
getPowerSyncVersion()
} catch (e: Exception) {
throw Exception("The powersync extension is not loaded correctly. Details: $e")
}

// Parse version
val versionInts: List<Int> = try {
version.split(Regex("[./]"))
.take(3)
.map { it.toInt() }
} catch (e: Exception) {
throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $version. Details: $e")
}

// Validate ^0.2.0
if (versionInts[0] != 0 || versionInts[1] != 2 || versionInts[2] < 0) {
throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $version")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package com.powersync.sync
import com.powersync.bucket.BucketRequest
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonObject

@Serializable
internal data class StreamingSyncRequest(
val buckets: List<BucketRequest>,
@SerialName("include_checksum") val includeChecksum: Boolean = true,
@SerialName("client_id") val clientId: String,
val parameters: JsonObject = JsonObject(mapOf())
) {
@SerialName("raw_data") private val rawData: Boolean = true
}
}
44 changes: 14 additions & 30 deletions core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ public interface SyncStatusData {
*/
public val lastSyncedAt: Instant?

/**
* Indicates whether there has been at least one full sync, if any.
*
* Is null when unknown, for example when state is still being loaded from the database.
*/
public val hasSynced: Boolean?

/**
* Error during uploading.
*
Expand All @@ -66,27 +73,12 @@ internal data class SyncStatusDataContainer(
override val downloading: Boolean = false,
override val uploading: Boolean = false,
override val lastSyncedAt: Instant? = null,
override val hasSynced: Boolean? = null,
override val uploadError: Any? = null,
override val downloadError: Any? = null,
) : SyncStatusData {
override val anyError
get() = downloadError ?: uploadError

/**
* Builder for creating a new SyncStatusDataContainer with updated values.
*/
class Builder(private var data: SyncStatusDataContainer) {
fun connected(value: Boolean) = apply { data = data.copy(connected = value) }
fun connecting(value: Boolean) = apply { data = data.copy(connecting = value) }
fun downloading(value: Boolean) = apply { data = data.copy(downloading = value) }
fun uploading(value: Boolean) = apply { data = data.copy(uploading = value) }
fun lastSyncedAt(value: Instant?) = apply { data = data.copy(lastSyncedAt = value) }
fun uploadError(value: Any?) = apply { data = data.copy(uploadError = value) }
fun downloadError(value: Any?) = apply { data = data.copy(downloadError = value) }
fun clearUploadError() = apply { data = data.copy(uploadError = null) }
fun clearDownloadError() = apply { data = data.copy(downloadError = null) }
fun build() = data
}
}


Expand All @@ -102,19 +94,6 @@ public data class SyncStatus internal constructor(
return stateFlow.asSharedFlow()
}

/**
* Updates the internal sync status indicators and emits Flow updates
* Usage:
* ```
* syncStatus.update {
* connected(true)
* }
*/
internal fun update(builder: SyncStatusDataContainer.Builder.() -> Unit) {
data = SyncStatusDataContainer.Builder(data).apply(builder).build()
stateFlow.value = data
}

/**
* Updates the internal sync status indicators and emits Flow updates
*/
Expand All @@ -123,6 +102,7 @@ public data class SyncStatus internal constructor(
connecting: Boolean = data.connecting,
downloading: Boolean = data.downloading,
uploading: Boolean = data.uploading,
hasSynced: Boolean? = data.hasSynced,
lastSyncedAt: Instant? = data.lastSyncedAt,
uploadError: Any? = data.uploadError,
downloadError: Any? = data.downloadError,
Expand All @@ -135,6 +115,7 @@ public data class SyncStatus internal constructor(
downloading = downloading,
uploading = uploading,
lastSyncedAt = lastSyncedAt,
hasSynced = hasSynced,
uploadError = if (clearUploadError == true) null else uploadError,
downloadError = if (clearDownloadError == true) null else downloadError,
)
Expand All @@ -159,14 +140,17 @@ public data class SyncStatus internal constructor(
override val lastSyncedAt: Instant?
get() = data.lastSyncedAt

override val hasSynced: Boolean?
get() = data.hasSynced

override val uploadError: Any?
get() = data.uploadError

override val downloadError: Any?
get() = data.downloadError

override fun toString(): String {
return "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, error=$anyError)"
return "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, hasSynced: $hasSynced, error=$anyError)"
}

public companion object {
Expand Down
6 changes: 5 additions & 1 deletion core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ internal class SyncStream(
*/
var status = SyncStatus()

private var clientId: String? = null

private val httpClient: HttpClient = HttpClient {
install(HttpTimeout)
install(ContentNegotiation)
Expand Down Expand Up @@ -84,6 +86,7 @@ internal class SyncStream(

suspend fun streamingSync() {
var invalidCredentials = false
clientId = bucketStorage.getClientId()
while (true) {
status.update(connecting = true)
try {
Expand Down Expand Up @@ -164,7 +167,7 @@ internal class SyncStream(
private suspend fun getWriteCheckpoint(): String {
val credentials = connector.getCredentialsCached()
require(credentials != null) { "Not logged in" }
val uri = credentials.endpointUri("write-checkpoint2.json")
val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId'")

val response = httpClient.get(uri) {
contentType(ContentType.Application.Json)
Expand Down Expand Up @@ -242,6 +245,7 @@ internal class SyncStream(

val req = StreamingSyncRequest(
buckets = initialBuckets.map { (bucket, after) -> BucketRequest(bucket, after) },
clientId = clientId!!
)

streamingSyncRequest(req).collect { value ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import com.powersync.bucket.BucketRequest
import com.powersync.sync.StreamingSyncRequest
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.JsonPrimitive
import kotlin.test.*

class StreamingSyncRequestTest {

private val json = Json { ignoreUnknownKeys = true }

@Test
fun testSerialization() {
val request = StreamingSyncRequest(
buckets = listOf(BucketRequest("table1", "op1"), BucketRequest("table2", "op2")),
includeChecksum = true,
clientId = "client123",
parameters = JsonObject(mapOf("param1" to JsonPrimitive("value1")))
)

val serialized = json.encodeToString(request)

val expected = """
{"buckets":[{"name":"table1","after":"op1"},{"name":"table2","after":"op2"}],"client_id":"client123","parameters":{"param1":"value1"}}
""".trimIndent().replace("\n", "")

assertEquals(expected, serialized)
}

@Test
fun testDeserialization() {
val jsonString = """
{
"buckets": [{"name": "table1", "after": "op1"}, {"name": "table2", "after": "op2"}],
"include_checksum": false,
"client_id": "client456",
"parameters": {"param2": "value2"},
"raw_data": true
}
""".trimIndent()

val deserialized = json.decodeFromString<StreamingSyncRequest>(jsonString)

assertEquals(2, deserialized.buckets.size)
assertEquals("table1", deserialized.buckets[0].name)
assertEquals("op1", deserialized.buckets[0].after)
assertEquals("table2", deserialized.buckets[1].name)
assertEquals("op2", deserialized.buckets[1].after)
assertFalse(deserialized.includeChecksum)
assertEquals("client456", deserialized.clientId)
assertEquals(JsonPrimitive("value2"), deserialized.parameters?.get("param2"))
}

@Test
fun testDefaultValues() {
val request = StreamingSyncRequest(
buckets = listOf(),
clientId = "client789"
)

assertTrue(request.includeChecksum)
assertEquals(request.parameters, JsonObject(mapOf()))

val serialized = json.encodeToString(request)
val expected = """
{"buckets":[],"client_id":"client789"}
""".trimIndent().replace("\n", "")
assertEquals(serialized, expected)
}
}
2 changes: 1 addition & 1 deletion demos/supabase-todolist/gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ android-compileSdk = "34"
java = "17"

# Dependencies
powersync-core = "0.1.6"
powersync-core = "0.2.1"

kotlin = "1.9.23"
coroutines = "1.8.0"
Expand Down
18 changes: 0 additions & 18 deletions demos/supabase-todolist/iosApp/iosApp.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
7555FF79242A565900829871 /* Resources */,
F85CB1118929364A9C6EFABC /* Frameworks */,
3C5ACF3A4AAFF294B2A5839B /* [CP] Embed Pods Frameworks */,
3114824168D992F9E0D6D9F4 /* [CP] Copy Pods Resources */,
);
buildRules = (
);
Expand Down Expand Up @@ -193,23 +192,6 @@
shellPath = /bin/sh;
shellScript = "cd \"$SRCROOT/..\"\n./gradlew :shared:embedAndSignAppleFrameworkForXcode\n";
};
3114824168D992F9E0D6D9F4 /* [CP] Copy Pods Resources */ = {
isa = PBXShellScriptBuildPhase;
buildActionMask = 2147483647;
files = (
);
inputFileListPaths = (
"${PODS_ROOT}/Target Support Files/Pods-iosApp/Pods-iosApp-resources-${CONFIGURATION}-input-files.xcfilelist",
);
name = "[CP] Copy Pods Resources";
outputFileListPaths = (
"${PODS_ROOT}/Target Support Files/Pods-iosApp/Pods-iosApp-resources-${CONFIGURATION}-output-files.xcfilelist",
);
runOnlyForDeploymentPostprocessing = 0;
shellPath = /bin/sh;
shellScript = "\"${PODS_ROOT}/Target Support Files/Pods-iosApp/Pods-iosApp-resources.sh\"\n";
showEnvVarsInLog = 0;
};
3C5ACF3A4AAFF294B2A5839B /* [CP] Embed Pods Frameworks */ = {
isa = PBXShellScriptBuildPhase;
buildActionMask = 2147483647;
Expand Down
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ idea = "222.4459.24" # Flamingo | 2022.2.1 (see https://plugins.jetbrains.com/do

# Dependencies
kermit = "2.0.4"
kotlin = "1.9.23"
kotlin = "1.9.24"
coroutines = "1.8.0"
kotlinx-datetime = "0.5.0"
kotlinx-io = "0.3.0"
ktor = "2.3.10"
uuid = "0.8.2"
powersync-core = "0.1.6"
powersync-core = "0.2.1"
sqlite-android = "3.45.0"

sqlDelight = "2.0.2"
Expand Down
Loading