Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.2.3 (unreleased)

* Fix `runWrapped` catching cancellation exceptions.

## 1.2.2

* Supabase: Avoid creating `Json` serializers multiple times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.db.crud.CrudEntry
import com.powersync.db.crud.UpdateType
import com.powersync.db.runWrappedSuspending
import com.powersync.db.runWrapped
import io.github.jan.supabase.SupabaseClient
import io.github.jan.supabase.annotations.SupabaseInternal
import io.github.jan.supabase.auth.Auth
Expand Down Expand Up @@ -115,7 +115,7 @@ public class SupabaseConnector(
email: String,
password: String,
) {
runWrappedSuspending {
runWrapped {
supabaseClient.auth.signInWith(Email) {
this.email = email
this.password = password
Expand All @@ -127,7 +127,7 @@ public class SupabaseConnector(
email: String,
password: String,
) {
runWrappedSuspending {
runWrapped {
supabaseClient.auth.signUpWith(Email) {
this.email = email
this.password = password
Expand All @@ -136,7 +136,7 @@ public class SupabaseConnector(
}

public suspend fun signOut() {
runWrappedSuspending {
runWrapped {
supabaseClient.auth.signOut()
}
}
Expand All @@ -146,7 +146,7 @@ public class SupabaseConnector(
public val sessionStatus: StateFlow<SessionStatus> = supabaseClient.auth.sessionStatus

public suspend fun loginAnonymously() {
runWrappedSuspending {
runWrapped {
supabaseClient.auth.signInAnonymously()
}
}
Expand All @@ -155,7 +155,7 @@ public class SupabaseConnector(
* Get credentials for PowerSync.
*/
override suspend fun fetchCredentials(): PowerSyncCredentials =
runWrappedSuspending {
runWrapped {
check(supabaseClient.auth.sessionStatus.value is SessionStatus.Authenticated) { "Supabase client is not authenticated" }

// Use Supabase token for PowerSync
Expand All @@ -178,8 +178,8 @@ public class SupabaseConnector(
* If this call throws an error, it is retried periodically.
*/
override suspend fun uploadData(database: PowerSyncDatabase) {
return runWrappedSuspending {
val transaction = database.getNextCrudTransaction() ?: return@runWrappedSuspending
return runWrapped {
val transaction = database.getNextCrudTransaction() ?: return@runWrapped

var lastEntry: CrudEntry? = null
try {
Expand Down Expand Up @@ -227,7 +227,7 @@ public class SupabaseConnector(
Logger.e("Data upload error: ${e.message}")
Logger.e("Discarding entry: $lastEntry")
transaction.complete(null)
return@runWrappedSuspending
return@runWrapped
}

Logger.e("Data upload error - retrying last entry: $lastEntry, $e")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.powersync.attachments.storage.IOLocalStorageAdapter
import com.powersync.attachments.sync.SyncingService
import com.powersync.db.getString
import com.powersync.db.internal.ConnectionContext
import com.powersync.db.runWrappedSuspending
import com.powersync.db.runWrapped
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
Expand Down Expand Up @@ -207,7 +207,7 @@ public open class AttachmentQueue(
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun startSync(): Unit =
runWrappedSuspending {
runWrapped {
mutex.withLock {
if (closed) {
throw Exception("Attachment queue has been closed")
Expand Down Expand Up @@ -261,9 +261,9 @@ public open class AttachmentQueue(
}

private suspend fun stopSyncingInternal(): Unit =
runWrappedSuspending {
runWrapped {
if (closed) {
return@runWrappedSuspending
return@runWrapped
}

syncStatusJob?.cancelAndJoin()
Expand All @@ -278,10 +278,10 @@ public open class AttachmentQueue(
*/
@Throws(PowerSyncException::class, CancellationException::class)
public suspend fun close(): Unit =
runWrappedSuspending {
runWrapped {
mutex.withLock {
if (closed) {
return@runWrappedSuspending
return@runWrapped
}

syncStatusJob?.cancelAndJoin()
Expand Down Expand Up @@ -322,7 +322,7 @@ public open class AttachmentQueue(
*/
@Throws(PowerSyncException::class, CancellationException::class)
public open suspend fun processWatchedAttachments(items: List<WatchedAttachmentItem>): Unit =
runWrappedSuspending {
runWrapped {
/**
* Use a lock here to prevent conflicting state updates.
*/
Expand Down Expand Up @@ -436,7 +436,7 @@ public open class AttachmentQueue(
metaData: String? = null,
updateHook: (context: ConnectionContext, attachment: Attachment) -> Unit,
): Attachment =
runWrappedSuspending {
runWrapped {
val id = db.get("SELECT uuid() as id") { it.getString("id") }
val filename =
resolveNewAttachmentFilename(attachmentId = id, fileExtension = fileExtension)
Expand Down Expand Up @@ -487,7 +487,7 @@ public open class AttachmentQueue(
attachmentId: String,
updateHook: (context: ConnectionContext, attachment: Attachment) -> Unit,
): Attachment =
runWrappedSuspending {
runWrapped {
attachmentsService.withContext { attachmentContext ->
val attachment =
attachmentContext.getAttachment(attachmentId)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.powersync.attachments.storage

import com.powersync.attachments.LocalStorage
import com.powersync.db.runWrappedSuspending
import com.powersync.db.runWrapped
import io.ktor.utils.io.core.remaining
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
Expand All @@ -26,7 +26,7 @@ public open class IOLocalStorageAdapter(
filePath: String,
data: Flow<ByteArray>,
): Long =
runWrappedSuspending {
runWrapped {
withContext(Dispatchers.IO) {
var totalSize = 0L
fileSystem.sink(Path(filePath)).use { sink ->
Expand Down Expand Up @@ -65,28 +65,28 @@ public open class IOLocalStorageAdapter(
}.flowOn(Dispatchers.IO)

public override suspend fun deleteFile(filePath: String): Unit =
runWrappedSuspending {
runWrapped {
withContext(Dispatchers.IO) {
fileSystem.delete(Path(filePath))
}
}

public override suspend fun fileExists(filePath: String): Boolean =
runWrappedSuspending {
runWrapped {
withContext(Dispatchers.IO) {
fileSystem.exists(Path(filePath))
}
}

public override suspend fun makeDir(path: String): Unit =
runWrappedSuspending {
runWrapped {
withContext(Dispatchers.IO) {
fileSystem.createDirectories(Path(path))
}
}

public override suspend fun rmDir(path: String): Unit =
runWrappedSuspending {
runWrapped {
withContext(Dispatchers.IO) {
for (item in fileSystem.list(Path(path))) {
// Can't delete directories with files in them. Need to go down the file tree
Expand All @@ -105,7 +105,7 @@ public open class IOLocalStorageAdapter(
sourcePath: String,
targetPath: String,
): Unit =
runWrappedSuspending {
runWrapped {
withContext(Dispatchers.IO) {
fileSystem.source(Path(sourcePath)).use { source ->
fileSystem.sink(Path(targetPath)).use { sink ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.powersync.connectors

import com.powersync.PowerSyncDatabase
import com.powersync.PowerSyncException
import com.powersync.db.runWrappedSuspending
import com.powersync.db.runWrapped
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -31,8 +31,8 @@ public abstract class PowerSyncBackendConnector {
*/
@Throws(PowerSyncException::class, CancellationException::class)
public open suspend fun getCredentialsCached(): PowerSyncCredentials? {
return runWrappedSuspending {
cachedCredentials?.let { return@runWrappedSuspending it }
return runWrapped {
cachedCredentials?.let { return@runWrapped it }
prefetchCredentials().join()
cachedCredentials
}
Expand Down
24 changes: 10 additions & 14 deletions core/src/commonMain/kotlin/com/powersync/db/Functions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,10 @@ import co.touchlab.kermit.Logger
import com.powersync.PowerSyncException
import kotlinx.coroutines.CancellationException

public fun <R> runWrapped(block: () -> R): R =
try {
block()
} catch (t: Throwable) {
if (t is PowerSyncException) {
Logger.e("PowerSyncException: ${t.message}")
throw t
} else {
Logger.e("PowerSyncException: ${t.message}")
throw PowerSyncException(t.message ?: "Unknown internal exception", t)
}
}

public suspend fun <R> runWrappedSuspending(block: suspend () -> R): R =
/**
* Runs the given [block], wrapping exceptions as [PowerSyncException]s.
*/
public inline fun <R> runWrapped(block: () -> R): R =
try {
block()
} catch (t: Throwable) {
Expand All @@ -33,3 +23,9 @@ public suspend fun <R> runWrappedSuspending(block: suspend () -> R): R =
throw PowerSyncException(t.message ?: "Unknown internal exception", t)
}
}

@Deprecated("Use runWrapped instead", replaceWith = ReplaceWith("runWrapped"))
public suspend fun <R> runWrappedSuspending(block: suspend () -> R): R =
runWrapped {
block()
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ internal class PowerSyncDatabaseImpl(
}

override suspend fun updateSchema(schema: Schema) =
runWrappedSuspending {
runWrapped {
waitReady()
updateSchemaInternal(schema)
}
Expand Down Expand Up @@ -515,7 +515,7 @@ internal class PowerSyncDatabaseImpl(
}

override suspend fun close() =
runWrappedSuspending {
runWrapped {
mutex.withLock {
if (closed) {
return@withLock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import com.powersync.db.SqlCursor
import com.powersync.db.ThrowableLockCallback
import com.powersync.db.ThrowableTransactionCallback
import com.powersync.db.runWrapped
import com.powersync.db.runWrappedSuspending
import com.powersync.utils.AtomicMutableSet
import com.powersync.utils.JsonUtil
import com.powersync.utils.throttle
Expand All @@ -24,7 +23,6 @@ import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import kotlinx.serialization.encodeToString
import kotlin.time.Duration.Companion.milliseconds

@OptIn(FlowPreview::class)
Expand Down Expand Up @@ -67,7 +65,7 @@ internal class InternalDatabaseImpl(

override suspend fun updateSchema(schemaJson: String) {
withContext(dbContext) {
runWrappedSuspending {
runWrapped {
// First get a lock on all read connections
readPool.withAllConnections { readConnections ->
// Then get access to the write connection
Expand Down Expand Up @@ -183,7 +181,7 @@ internal class InternalDatabaseImpl(
*/
private suspend fun <R> internalReadLock(callback: (TransactorDriver) -> R): R =
withContext(dbContext) {
runWrappedSuspending {
runWrapped {
readPool.withConnection {
catchSwiftExceptions {
callback(it)
Expand Down Expand Up @@ -295,7 +293,7 @@ internal class InternalDatabaseImpl(
}

override suspend fun close() {
runWrappedSuspending {
runWrapped {
writeConnection.driver.close()
readPool.close()
}
Expand Down
26 changes: 26 additions & 0 deletions core/src/commonTest/kotlin/com/powersync/db/FunctionTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.powersync.db

import com.powersync.PowerSyncException
import io.kotest.assertions.throwables.shouldThrow
import kotlinx.coroutines.CancellationException
import kotlin.test.Test

class FunctionTest {
@Test
fun `runWrapped reports exceptions as powersync exception`() {
shouldThrow<PowerSyncException> {
runWrapped {
error("test")
}
}
}

@Test
fun `runWrapped does not wrap cancellation exceptions`() {
shouldThrow<CancellationException> {
runWrapped {
throw CancellationException("test")
}
}
}
}