Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ let package = Package(
)
,
]
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package com.powersync.connector.supabase

import co.touchlab.kermit.Logger
import com.powersync.PowerSyncDatabase
import com.powersync.PowerSyncException
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.db.crud.CrudEntry
import com.powersync.db.crud.UpdateType
import com.powersync.db.runWrappedSuspending
import io.github.jan.supabase.SupabaseClient
import io.github.jan.supabase.annotations.SupabaseInternal
import io.github.jan.supabase.auth.Auth
Expand Down Expand Up @@ -90,55 +92,70 @@ public class SupabaseConnector(
}
}

@Throws(PowerSyncException::class)
public suspend fun login(
email: String,
password: String,
) {
supabaseClient.auth.signInWith(Email) {
this.email = email
this.password = password
runWrappedSuspending {
supabaseClient.auth.signInWith(Email) {
this.email = email
this.password = password
}
}
}

@Throws(PowerSyncException::class)
public suspend fun signUp(
email: String,
password: String,
) {
supabaseClient.auth.signUpWith(Email) {
this.email = email
this.password = password
runWrappedSuspending {
supabaseClient.auth.signUpWith(Email) {
this.email = email
this.password = password
}
}
}

@Throws(PowerSyncException::class)
public suspend fun signOut() {
supabaseClient.auth.signOut()
runWrappedSuspending {
supabaseClient.auth.signOut()
}
}

public fun session(): UserSession? = supabaseClient.auth.currentSessionOrNull()

public val sessionStatus: StateFlow<SessionStatus> = supabaseClient.auth.sessionStatus

@Throws(PowerSyncException::class)
public suspend fun loginAnonymously() {
supabaseClient.auth.signInAnonymously()
runWrappedSuspending {
supabaseClient.auth.signInAnonymously()
}
}

/**
* Get credentials for PowerSync.
*/
@Throws(PowerSyncException::class)
override suspend fun fetchCredentials(): PowerSyncCredentials {
check(supabaseClient.auth.sessionStatus.value is SessionStatus.Authenticated) { "Supabase client is not authenticated" }
return runWrappedSuspending {
check(supabaseClient.auth.sessionStatus.value is SessionStatus.Authenticated) { "Supabase client is not authenticated" }

// Use Supabase token for PowerSync
val session = supabaseClient.auth.currentSessionOrNull() ?: error("Could not fetch Supabase credentials")
// Use Supabase token for PowerSync
val session = supabaseClient.auth.currentSessionOrNull() ?: error("Could not fetch Supabase credentials")

check(session.user != null) { "No user data" }
check(session.user != null) { "No user data" }

// userId is for debugging purposes only
return PowerSyncCredentials(
endpoint = powerSyncEndpoint,
token = session.accessToken, // Use the access token to authenticate against PowerSync
userId = session.user!!.id,
)
// userId is for debugging purposes only
PowerSyncCredentials(
endpoint = powerSyncEndpoint,
token = session.accessToken, // Use the access token to authenticate against PowerSync
userId = session.user!!.id,
)
}
}

/**
Expand All @@ -147,60 +164,63 @@ public class SupabaseConnector(
* This function is called whenever there is data to upload, whether the device is online or offline.
* If this call throws an error, it is retried periodically.
*/
@Throws(PowerSyncException::class)
override suspend fun uploadData(database: PowerSyncDatabase) {
val transaction = database.getNextCrudTransaction() ?: return
return runWrappedSuspending {
val transaction = database.getNextCrudTransaction() ?: return@runWrappedSuspending

var lastEntry: CrudEntry? = null
try {
for (entry in transaction.crud) {
lastEntry = entry
var lastEntry: CrudEntry? = null
try {
for (entry in transaction.crud) {
lastEntry = entry

val table = supabaseClient.from(entry.table)
val table = supabaseClient.from(entry.table)

when (entry.op) {
UpdateType.PUT -> {
val data = entry.opData?.toMutableMap() ?: mutableMapOf()
data["id"] = entry.id
table.upsert(data)
}
when (entry.op) {
UpdateType.PUT -> {
val data = entry.opData?.toMutableMap() ?: mutableMapOf()
data["id"] = entry.id
table.upsert(data)
}

UpdateType.PATCH -> {
table.update(entry.opData!!) {
filter {
eq("id", entry.id)
UpdateType.PATCH -> {
table.update(entry.opData!!) {
filter {
eq("id", entry.id)
}
}
}
}

UpdateType.DELETE -> {
table.delete {
filter {
eq("id", entry.id)
UpdateType.DELETE -> {
table.delete {
filter {
eq("id", entry.id)
}
}
}
}
}
}

transaction.complete(null)
} catch (e: Exception) {
if (errorCode != null && PostgresFatalCodes.isFatalError(errorCode.toString())) {
/**
* Instead of blocking the queue with these errors,
* discard the (rest of the) transaction.
*
* Note that these errors typically indicate a bug in the application.
* If protecting against data loss is important, save the failing records
* elsewhere instead of discarding, and/or notify the user.
*/
Logger.e("Data upload error: ${e.message}")
Logger.e("Discarding entry: $lastEntry")
transaction.complete(null)
return
}
} catch (e: Exception) {
if (errorCode != null && PostgresFatalCodes.isFatalError(errorCode.toString())) {
/**
* Instead of blocking the queue with these errors,
* discard the (rest of the) transaction.
*
* Note that these errors typically indicate a bug in the application.
* If protecting against data loss is important, save the failing records
* elsewhere instead of discarding, and/or notify the user.
*/
Logger.e("Data upload error: ${e.message}")
Logger.e("Discarding entry: $lastEntry")
transaction.complete(null)
return@runWrappedSuspending
}

Logger.e("Data upload error - retrying last entry: $lastEntry, $e")
throw e
Logger.e("Data upload error - retrying last entry: $lastEntry, $e")
throw e
}
}
}
}
3 changes: 3 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/Exceptions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.powersync

public class PowerSyncException(message: String, cause: Throwable): Exception(message, cause)
11 changes: 9 additions & 2 deletions core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.powersync.db.crud.CrudBatch
import com.powersync.db.crud.CrudTransaction
import com.powersync.sync.SyncStatus
import com.powersync.utils.JsonParam
import kotlin.coroutines.cancellation.CancellationException

/**
* A PowerSync managed database.
Expand All @@ -25,6 +26,7 @@ public interface PowerSyncDatabase : Queries {
/**
* Suspend function that resolves when the first sync has occurred
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun waitForFirstSync()

/**
Expand Down Expand Up @@ -56,7 +58,7 @@ public interface PowerSyncDatabase : Queries {
* ```
* TODO: Internal Team - Status changes are reported on [statusStream].
*/

@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun connect(
connector: PowerSyncBackendConnector,
crudThrottleMs: Long = 1000L,
Expand All @@ -81,6 +83,7 @@ public interface PowerSyncDatabase : Queries {
* data by transaction. One batch may contain data from multiple transactions,
* and a single transaction may be split over multiple batches.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun getCrudBatch(limit: Int = 100): CrudBatch?

/**
Expand All @@ -96,19 +99,21 @@ public interface PowerSyncDatabase : Queries {
* Unlike [getCrudBatch], this only returns data from a single transaction at a time.
* All data for the transaction is loaded into memory.
*/

@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun getNextCrudTransaction(): CrudTransaction?

/**
* Convenience method to get the current version of PowerSync.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun getPowerSyncVersion(): String

/**
* Close the sync connection.
*
* Use [connect] to connect again.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun disconnect()

/**
Expand All @@ -119,6 +124,7 @@ public interface PowerSyncDatabase : Queries {
*
* To preserve data in local-only tables, set clearLocal to false.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun disconnectAndClear(clearLocal: Boolean = true)

/**
Expand All @@ -127,5 +133,6 @@ public interface PowerSyncDatabase : Queries {
*
* Once close is called, this database cannot be used again - a new one must be constructed.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun close()
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package com.powersync.connectors

import com.powersync.PowerSyncDatabase
import com.powersync.PowerSyncException
import com.powersync.db.runWrappedSuspending
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlin.coroutines.cancellation.CancellationException

/**
* Implement this to connect an app backend.
Expand All @@ -26,10 +29,13 @@ public abstract class PowerSyncBackendConnector {
*
* These credentials may have expired already.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public open suspend fun getCredentialsCached(): PowerSyncCredentials? {
cachedCredentials?.let { return it }
prefetchCredentials()?.join()
return cachedCredentials
return runWrappedSuspending {
cachedCredentials?.let { return@runWrappedSuspending it }
prefetchCredentials()?.join()
cachedCredentials
}
}

/**
Expand All @@ -49,6 +55,7 @@ public abstract class PowerSyncBackendConnector {
*
* This may be called before the current credentials have expired.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public open suspend fun prefetchCredentials(): Job? {
fetchRequest?.takeIf { it.isActive }?.let { return it }

Expand All @@ -74,6 +81,7 @@ public abstract class PowerSyncBackendConnector {
*
* This token is kept for the duration of a sync connection.
*/
@Throws(PowerSyncException::class, CancellationException::class)
public abstract suspend fun fetchCredentials(): PowerSyncCredentials?

/**
Expand All @@ -83,5 +91,6 @@ public abstract class PowerSyncBackendConnector {
*
* Any thrown errors will result in a retry after the configured wait period (default: 5 seconds).
*/
@Throws(PowerSyncException::class, CancellationException::class)
public abstract suspend fun uploadData(database: PowerSyncDatabase)
}
23 changes: 23 additions & 0 deletions core/src/commonMain/kotlin/com/powersync/db/Functions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.powersync.db

import com.powersync.PowerSyncException

public fun <R> runWrapped(block: () -> R): R = try {
block()
} catch (t: Throwable) {
if (t is PowerSyncException) {
throw t
} else {
throw PowerSyncException(t.message ?: "Unknown internal exception", t)
}
}

public suspend fun <R> runWrappedSuspending(block: suspend () -> R): R = try {
block()
} catch (t: Throwable) {
if (t is PowerSyncException) {
throw t
} else {
throw PowerSyncException(t.message ?: "Unknown internal exception", t)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ internal class PowerSyncDatabaseImpl(

val hasSynced = timestamp != null
if (hasSynced != currentStatus.hasSynced) {
val formattedDateTime = "${timestamp!!.replace(" ","T").toLocalDateTime()}Z"
val formattedDateTime = "${timestamp!!.replace(" ", "T").toLocalDateTime()}Z"
val lastSyncedAt = Instant.parse(formattedDateTime)
currentStatus.update(hasSynced = hasSynced, lastSyncedAt = lastSyncedAt)
}
Expand Down
Loading
Loading