diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 5e5e0728..443743a9 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -44,6 +44,13 @@ internal class BucketStorage( return MAX_OP_ID } + suspend fun getClientId(): String { + val id = db.getOptional("SELECT powersync_client_id() as client_id") { + it.getString(0)!! + } + return id ?: throw IllegalStateException("Client ID not found") + } + suspend fun hasCrud(): Boolean { return db.queries.hasCrud().awaitAsOneOrNull() == 1L } @@ -140,7 +147,7 @@ internal class BucketStorage( } val completedSync = db.getOptional( - "SELECT name, last_applied_op FROM ps_buckets WHERE last_applied_op > 0 LIMIT 1", + "SELECT powersync_last_synced_at()", mapper = { cursor -> cursor.getString(0)!! }) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 53842877..6cee8ffc 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -30,6 +30,8 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import kotlinx.datetime.Instant +import kotlinx.datetime.toLocalDateTime import kotlinx.serialization.encodeToString /** @@ -67,8 +69,10 @@ internal class PowerSyncDatabaseImpl( runBlocking { val sqliteVersion = internalDb.queries.sqliteVersion().awaitAsOne() logger.d { "SQLiteVersion: $sqliteVersion" } + checkVersion() logger.d { "PowerSyncVersion: ${getPowerSyncVersion()}" } applySchema() + updateHasSynced() } } @@ -105,6 +109,7 @@ internal class PowerSyncDatabaseImpl( connecting = it.connecting, downloading = it.downloading, lastSyncedAt = it.lastSyncedAt, + hasSynced = it.hasSynced, uploadError = it.uploadError, downloadError = it.downloadError, clearDownloadError = it.downloadError == null, @@ -247,12 +252,12 @@ internal class PowerSyncDatabaseImpl( if (writeCheckpoint != null && bucketStorage.hasCrud()) { tx.execute( - "UPDATE ps_buckets SET target_op = ? WHERE name='\$local'", + "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'", listOf(writeCheckpoint), ) } else { tx.execute( - "UPDATE ps_buckets SET target_op = ? WHERE name='\$local'", + "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='\$local'", listOf(bucketStorage.getMaxOpId()), ) } @@ -273,7 +278,7 @@ internal class PowerSyncDatabaseImpl( syncStream = null } - currentStatus.update(connected = false, connecting = false) + currentStatus.update(connected = false, connecting = false, lastSyncedAt = currentStatus.lastSyncedAt) } override suspend fun disconnectAndClear(clearLocal: Boolean) { @@ -293,4 +298,43 @@ internal class PowerSyncDatabaseImpl( } } } + + private suspend fun updateHasSynced() { + // Query the database to see if any data has been synced. + val timestamp = internalDb.getOptional("SELECT powersync_last_synced_at() as synced_at", null) { cursor -> + cursor.getString(0)!! + } + + val hasSynced = timestamp != null + if (hasSynced != currentStatus.hasSynced) { + val formattedDateTime = "${timestamp!!.replace(" ","T").toLocalDateTime()}Z" + val lastSyncedAt = Instant.parse(formattedDateTime) + currentStatus.update(hasSynced = hasSynced, lastSyncedAt = lastSyncedAt) + } + } + + /** + * Check that a supported version of the powersync extension is loaded. + */ + private suspend fun checkVersion() { + val version: String = try { + getPowerSyncVersion() + } catch (e: Exception) { + throw Exception("The powersync extension is not loaded correctly. Details: $e") + } + + // Parse version + val versionInts: List = try { + version.split(Regex("[./]")) + .take(3) + .map { it.toInt() } + } catch (e: Exception) { + throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $version. Details: $e") + } + + // Validate ^0.2.0 + if (versionInts[0] != 0 || versionInts[1] != 2 || versionInts[2] < 0) { + throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $version") + } + } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncRequest.kt b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncRequest.kt index 9e1c4f38..6fda9fe4 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncRequest.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/StreamingSyncRequest.kt @@ -3,11 +3,14 @@ package com.powersync.sync import com.powersync.bucket.BucketRequest import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonObject @Serializable internal data class StreamingSyncRequest( val buckets: List, @SerialName("include_checksum") val includeChecksum: Boolean = true, + @SerialName("client_id") val clientId: String, + val parameters: JsonObject = JsonObject(mapOf()) ) { @SerialName("raw_data") private val rawData: Boolean = true -} +} \ No newline at end of file diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index 206deb32..11510524 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -40,6 +40,13 @@ public interface SyncStatusData { */ public val lastSyncedAt: Instant? + /** + * Indicates whether there has been at least one full sync, if any. + * + * Is null when unknown, for example when state is still being loaded from the database. + */ + public val hasSynced: Boolean? + /** * Error during uploading. * @@ -66,27 +73,12 @@ internal data class SyncStatusDataContainer( override val downloading: Boolean = false, override val uploading: Boolean = false, override val lastSyncedAt: Instant? = null, + override val hasSynced: Boolean? = null, override val uploadError: Any? = null, override val downloadError: Any? = null, ) : SyncStatusData { override val anyError get() = downloadError ?: uploadError - - /** - * Builder for creating a new SyncStatusDataContainer with updated values. - */ - class Builder(private var data: SyncStatusDataContainer) { - fun connected(value: Boolean) = apply { data = data.copy(connected = value) } - fun connecting(value: Boolean) = apply { data = data.copy(connecting = value) } - fun downloading(value: Boolean) = apply { data = data.copy(downloading = value) } - fun uploading(value: Boolean) = apply { data = data.copy(uploading = value) } - fun lastSyncedAt(value: Instant?) = apply { data = data.copy(lastSyncedAt = value) } - fun uploadError(value: Any?) = apply { data = data.copy(uploadError = value) } - fun downloadError(value: Any?) = apply { data = data.copy(downloadError = value) } - fun clearUploadError() = apply { data = data.copy(uploadError = null) } - fun clearDownloadError() = apply { data = data.copy(downloadError = null) } - fun build() = data - } } @@ -102,19 +94,6 @@ public data class SyncStatus internal constructor( return stateFlow.asSharedFlow() } - /** - * Updates the internal sync status indicators and emits Flow updates - * Usage: - * ``` - * syncStatus.update { - * connected(true) - * } - */ - internal fun update(builder: SyncStatusDataContainer.Builder.() -> Unit) { - data = SyncStatusDataContainer.Builder(data).apply(builder).build() - stateFlow.value = data - } - /** * Updates the internal sync status indicators and emits Flow updates */ @@ -123,6 +102,7 @@ public data class SyncStatus internal constructor( connecting: Boolean = data.connecting, downloading: Boolean = data.downloading, uploading: Boolean = data.uploading, + hasSynced: Boolean? = data.hasSynced, lastSyncedAt: Instant? = data.lastSyncedAt, uploadError: Any? = data.uploadError, downloadError: Any? = data.downloadError, @@ -135,6 +115,7 @@ public data class SyncStatus internal constructor( downloading = downloading, uploading = uploading, lastSyncedAt = lastSyncedAt, + hasSynced = hasSynced, uploadError = if (clearUploadError == true) null else uploadError, downloadError = if (clearDownloadError == true) null else downloadError, ) @@ -159,6 +140,9 @@ public data class SyncStatus internal constructor( override val lastSyncedAt: Instant? get() = data.lastSyncedAt + override val hasSynced: Boolean? + get() = data.hasSynced + override val uploadError: Any? get() = data.uploadError @@ -166,7 +150,7 @@ public data class SyncStatus internal constructor( get() = data.downloadError override fun toString(): String { - return "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, error=$anyError)" + return "SyncStatus(connected=$connected, connecting=$connecting, downloading=$downloading, uploading=$uploading, lastSyncedAt=$lastSyncedAt, hasSynced: $hasSynced, error=$anyError)" } public companion object { diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index f2aa8927..92eb7422 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -50,6 +50,8 @@ internal class SyncStream( */ var status = SyncStatus() + private var clientId: String? = null + private val httpClient: HttpClient = HttpClient { install(HttpTimeout) install(ContentNegotiation) @@ -84,6 +86,7 @@ internal class SyncStream( suspend fun streamingSync() { var invalidCredentials = false + clientId = bucketStorage.getClientId() while (true) { status.update(connecting = true) try { @@ -164,7 +167,7 @@ internal class SyncStream( private suspend fun getWriteCheckpoint(): String { val credentials = connector.getCredentialsCached() require(credentials != null) { "Not logged in" } - val uri = credentials.endpointUri("write-checkpoint2.json") + val uri = credentials.endpointUri("write-checkpoint2.json?client_id=$clientId'") val response = httpClient.get(uri) { contentType(ContentType.Application.Json) @@ -242,6 +245,7 @@ internal class SyncStream( val req = StreamingSyncRequest( buckets = initialBuckets.map { (bucket, after) -> BucketRequest(bucket, after) }, + clientId = clientId!! ) streamingSyncRequest(req).collect { value -> diff --git a/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncRequestTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncRequestTest.kt new file mode 100644 index 00000000..a5c4557a --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/sync/StreamingSyncRequestTest.kt @@ -0,0 +1,71 @@ +import com.powersync.bucket.BucketRequest +import com.powersync.sync.StreamingSyncRequest +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.JsonPrimitive +import kotlin.test.* + +class StreamingSyncRequestTest { + + private val json = Json { ignoreUnknownKeys = true } + + @Test + fun testSerialization() { + val request = StreamingSyncRequest( + buckets = listOf(BucketRequest("table1", "op1"), BucketRequest("table2", "op2")), + includeChecksum = true, + clientId = "client123", + parameters = JsonObject(mapOf("param1" to JsonPrimitive("value1"))) + ) + + val serialized = json.encodeToString(request) + + val expected = """ + {"buckets":[{"name":"table1","after":"op1"},{"name":"table2","after":"op2"}],"client_id":"client123","parameters":{"param1":"value1"}} + """.trimIndent().replace("\n", "") + + assertEquals(expected, serialized) + } + + @Test + fun testDeserialization() { + val jsonString = """ + { + "buckets": [{"name": "table1", "after": "op1"}, {"name": "table2", "after": "op2"}], + "include_checksum": false, + "client_id": "client456", + "parameters": {"param2": "value2"}, + "raw_data": true + } + """.trimIndent() + + val deserialized = json.decodeFromString(jsonString) + + assertEquals(2, deserialized.buckets.size) + assertEquals("table1", deserialized.buckets[0].name) + assertEquals("op1", deserialized.buckets[0].after) + assertEquals("table2", deserialized.buckets[1].name) + assertEquals("op2", deserialized.buckets[1].after) + assertFalse(deserialized.includeChecksum) + assertEquals("client456", deserialized.clientId) + assertEquals(JsonPrimitive("value2"), deserialized.parameters?.get("param2")) + } + + @Test + fun testDefaultValues() { + val request = StreamingSyncRequest( + buckets = listOf(), + clientId = "client789" + ) + + assertTrue(request.includeChecksum) + assertEquals(request.parameters, JsonObject(mapOf())) + + val serialized = json.encodeToString(request) + val expected = """ + {"buckets":[],"client_id":"client789"} + """.trimIndent().replace("\n", "") + assertEquals(serialized, expected) + } +} \ No newline at end of file diff --git a/demos/supabase-todolist/gradle/libs.versions.toml b/demos/supabase-todolist/gradle/libs.versions.toml index 0a593a82..aee29116 100644 --- a/demos/supabase-todolist/gradle/libs.versions.toml +++ b/demos/supabase-todolist/gradle/libs.versions.toml @@ -6,7 +6,7 @@ android-compileSdk = "34" java = "17" # Dependencies -powersync-core = "0.1.6" +powersync-core = "0.2.1" kotlin = "1.9.23" coroutines = "1.8.0" diff --git a/demos/supabase-todolist/iosApp/iosApp.xcodeproj/project.pbxproj b/demos/supabase-todolist/iosApp/iosApp.xcodeproj/project.pbxproj index d94ab97a..58344074 100644 --- a/demos/supabase-todolist/iosApp/iosApp.xcodeproj/project.pbxproj +++ b/demos/supabase-todolist/iosApp/iosApp.xcodeproj/project.pbxproj @@ -118,7 +118,6 @@ 7555FF79242A565900829871 /* Resources */, F85CB1118929364A9C6EFABC /* Frameworks */, 3C5ACF3A4AAFF294B2A5839B /* [CP] Embed Pods Frameworks */, - 3114824168D992F9E0D6D9F4 /* [CP] Copy Pods Resources */, ); buildRules = ( ); @@ -193,23 +192,6 @@ shellPath = /bin/sh; shellScript = "cd \"$SRCROOT/..\"\n./gradlew :shared:embedAndSignAppleFrameworkForXcode\n"; }; - 3114824168D992F9E0D6D9F4 /* [CP] Copy Pods Resources */ = { - isa = PBXShellScriptBuildPhase; - buildActionMask = 2147483647; - files = ( - ); - inputFileListPaths = ( - "${PODS_ROOT}/Target Support Files/Pods-iosApp/Pods-iosApp-resources-${CONFIGURATION}-input-files.xcfilelist", - ); - name = "[CP] Copy Pods Resources"; - outputFileListPaths = ( - "${PODS_ROOT}/Target Support Files/Pods-iosApp/Pods-iosApp-resources-${CONFIGURATION}-output-files.xcfilelist", - ); - runOnlyForDeploymentPostprocessing = 0; - shellPath = /bin/sh; - shellScript = "\"${PODS_ROOT}/Target Support Files/Pods-iosApp/Pods-iosApp-resources.sh\"\n"; - showEnvVarsInLog = 0; - }; 3C5ACF3A4AAFF294B2A5839B /* [CP] Embed Pods Frameworks */ = { isa = PBXShellScriptBuildPhase; buildActionMask = 2147483647; diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a293fa11..9c5d09b6 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -9,13 +9,13 @@ idea = "222.4459.24" # Flamingo | 2022.2.1 (see https://plugins.jetbrains.com/do # Dependencies kermit = "2.0.4" -kotlin = "1.9.23" +kotlin = "1.9.24" coroutines = "1.8.0" kotlinx-datetime = "0.5.0" kotlinx-io = "0.3.0" ktor = "2.3.10" uuid = "0.8.2" -powersync-core = "0.1.6" +powersync-core = "0.2.1" sqlite-android = "3.45.0" sqlDelight = "2.0.2"