diff --git a/CHANGELOG.md b/CHANGELOG.md index a65124a8..d7ae23eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-BETA29 + +* Added queing protection and warnings when connecting multiple PowerSync clients to the same database file. + ## 1.0.0-BETA28 * Update PowerSync SQLite core extension to 0.3.12. diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 3472c880..f3fa85cf 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -1,26 +1,53 @@ package com.powersync import app.cash.turbine.turbineScope +import co.touchlab.kermit.ExperimentalKermitApi +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.TestConfig +import co.touchlab.kermit.TestLogWriter +import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow +import com.powersync.testutils.waitFor import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertNotNull +@OptIn(ExperimentalKermitApi::class) class DatabaseTest { + private val logWriter = + TestLogWriter( + loggable = Severity.Debug, + ) + + private val logger = + Logger( + TestConfig( + minSeverity = Severity.Debug, + logWriterList = listOf(logWriter), + ), + ) + private lateinit var database: PowerSyncDatabase + private fun openDB() = + PowerSyncDatabase( + factory = com.powersync.testutils.factory, + schema = Schema(UserRow.table), + dbFilename = "testdb", + logger = logger, + ) + @BeforeTest fun setupDatabase() { - database = - PowerSyncDatabase( - factory = com.powersync.testutils.factory, - schema = Schema(UserRow.table), - dbFilename = "testdb", - ) + logWriter.reset() + + database = openDB() runBlocking { database.disconnectAndClear(true) @@ -86,4 +113,19 @@ class DatabaseTest { query.cancel() } } + + @Test + fun warnsMultipleInstances() = + runTest { + // Opens a second DB with the same database filename + val db2 = openDB() + waitFor { + assertNotNull( + logWriter.logs.find { + it.message == PowerSyncDatabaseImpl.multipleInstancesMessage + }, + ) + } + db2.close() + } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index eee94854..043a8495 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -1,9 +1,11 @@ package com.powersync import app.cash.turbine.turbineScope +import co.touchlab.kermit.ExperimentalKermitApi import co.touchlab.kermit.Logger import co.touchlab.kermit.Severity import co.touchlab.kermit.TestConfig +import co.touchlab.kermit.TestLogWriter import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint @@ -18,6 +20,7 @@ import com.powersync.sync.SyncStream import com.powersync.testutils.MockSyncService import com.powersync.testutils.UserRow import com.powersync.testutils.cleanup +import com.powersync.testutils.factory import com.powersync.testutils.waitFor import com.powersync.utils.JsonUtil import dev.mokkery.answering.returns @@ -35,16 +38,22 @@ import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFalse +import kotlin.test.assertNotNull import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds -@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class) +@OptIn(ExperimentalKermitApi::class) class SyncIntegrationTest { + private val logWriter = + TestLogWriter( + loggable = Severity.Debug, + ) + private val logger = Logger( TestConfig( minSeverity = Severity.Debug, - logWriterList = listOf(), + logWriterList = listOf(logWriter), ), ) private lateinit var database: PowerSyncDatabaseImpl @@ -54,6 +63,7 @@ class SyncIntegrationTest { @BeforeTest fun setup() { cleanup("testdb") + logWriter.reset() database = openDb() connector = mock { @@ -75,12 +85,15 @@ class SyncIntegrationTest { @AfterTest fun teardown() { + runBlocking { + database.close() + } cleanup("testdb") } private fun openDb() = PowerSyncDatabase( - factory = com.powersync.testutils.factory, + factory = factory, schema = Schema(UserRow.table), dbFilename = "testdb", ) as PowerSyncDatabaseImpl @@ -271,6 +284,26 @@ class SyncIntegrationTest { syncLines.close() } + @Test + fun setsConnectingState() = + runTest { + turbineScope(timeout = 10.0.seconds) { + val syncStream = syncStream() + val turbine = database.currentStatus.asFlow().testIn(this) + + database.connect(syncStream, 1000L) + turbine.waitFor { it.connecting } + + database.disconnect() + + turbine.waitFor { !it.connecting && !it.connected } + turbine.cancel() + } + + database.close() + syncLines.close() + } + @Test fun testMultipleSyncsDoNotCreateMultipleStatusEntries() = runTest { @@ -312,6 +345,123 @@ class SyncIntegrationTest { turbine.cancel() } + database.close() + syncLines.close() + } + + @Test + fun warnsMultipleConnectionAttempts() = + runTest { + val db2 = + PowerSyncDatabase( + factory = factory, + schema = Schema(UserRow.table), + dbFilename = "testdb", + logger = logger, + ) as PowerSyncDatabaseImpl + + turbineScope(timeout = 10.0.seconds) { + // Connect the first database + database.connect(connector, 1000L) + db2.connect(connector) + + waitFor { + assertNotNull( + logWriter.logs.find { + it.message == PowerSyncDatabaseImpl.streamConflictMessage + }, + ) + } + + db2.disconnect() + database.disconnect() + } + + db2.close() + database.close() + syncLines.close() + } + + @Test + fun queuesMultipleConnectionAttempts() = + runTest { + val db2 = + PowerSyncDatabase( + factory = factory, + schema = Schema(UserRow.table), + dbFilename = "testdb", + logger = Logger, + ) as PowerSyncDatabaseImpl + + turbineScope(timeout = 10.0.seconds) { + val turbine1 = database.currentStatus.asFlow().testIn(this) + val turbine2 = db2.currentStatus.asFlow().testIn(this) + + // Connect the first database + database.connect(connector, 1000L) + + turbine1.waitFor { it.connecting } + db2.connect(connector) + + // Should not be connecting yet + assertEquals(false, db2.currentStatus.connecting) + + database.disconnect() + turbine1.waitFor { !it.connecting } + + // Should start connecting after the other database disconnected + turbine2.waitFor { it.connecting } + db2.disconnect() + turbine2.waitFor { !it.connecting } + + turbine1.cancel() + turbine2.cancel() + } + + db2.close() + database.close() + syncLines.close() + } + + @Test + fun reconnectsAfterDisconnecting() = + runTest { + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + + database.connect(connector, 1000L) + turbine.waitFor { it.connecting } + + database.disconnect() + turbine.waitFor { !it.connecting } + + database.connect(connector, 1000L) + turbine.waitFor { it.connecting } + database.disconnect() + turbine.waitFor { !it.connecting } + + turbine.cancel() + } + + database.close() + syncLines.close() + } + + @Test + fun reconnects() = + runTest { + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + + database.connect(connector, 1000L, retryDelayMs = 5000) + turbine.waitFor { it.connecting } + + database.connect(connector, 1000L, retryDelayMs = 5000) + turbine.waitFor { it.connecting } + + turbine.cancel() + } + database.close() syncLines.close() } diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index 9e0a9564..71d2de99 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -19,6 +19,12 @@ import kotlin.coroutines.cancellation.CancellationException * All changes to local tables are automatically recorded, whether connected or not. Once connected, the changes are uploaded. */ public interface PowerSyncDatabase : Queries { + /** + * Identifies the database client. + * This is typically the database name. + */ + public val identifier: String + /** * The current sync status. */ diff --git a/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt b/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt new file mode 100644 index 00000000..a70e6661 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt @@ -0,0 +1,23 @@ +package com.powersync.db + +import com.powersync.PowerSyncDatabase +import com.powersync.utils.ExclusiveMethodProvider + +internal class ActiveInstanceStore : ExclusiveMethodProvider() { + private val instances = mutableListOf() + + /** + * Registers an instance. Returns true if multiple instances with the same identifier are + * present. + */ + suspend fun registerAndCheckInstance(db: PowerSyncDatabase) = + exclusiveMethod("instances") { + instances.add(db) + return@exclusiveMethod instances.filter { it.identifier == db.identifier }.size > 1 + } + + suspend fun removeInstance(db: PowerSyncDatabase) = + exclusiveMethod("instances") { + instances.remove(db) + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index b22fcffa..edd08017 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -20,6 +20,7 @@ import com.powersync.sync.PriorityStatusEntry import com.powersync.sync.SyncStatus import com.powersync.sync.SyncStatusData import com.powersync.sync.SyncStream +import com.powersync.utils.ExclusiveMethodProvider import com.powersync.utils.JsonParam import com.powersync.utils.JsonUtil import com.powersync.utils.throttle @@ -28,6 +29,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first @@ -55,9 +57,33 @@ internal class PowerSyncDatabaseImpl( private val dbFilename: String, val logger: Logger = Logger, driver: PsSqlDriver = factory.createDriver(scope, dbFilename), -) : PowerSyncDatabase { +) : ExclusiveMethodProvider(), + PowerSyncDatabase { + companion object { + internal val streamConflictMessage = + """ + Another PowerSync client is already connected to this database. + Multiple connections to the same database should be avoided. + Please check your PowerSync client instantiation logic. + This connection attempt will be queued and will only be executed after + currently connecting clients are disconnected. + """.trimIndent() + + internal val multipleInstancesMessage = + """ + Multiple PowerSync instances for the same database have been detected. + This can cause unexpected results. + Please check your PowerSync client instantiation logic if this is not intentional. + """.trimIndent() + + internal val instanceStore = ActiveInstanceStore() + } + + override val identifier = dbFilename + private val internalDb = InternalDatabaseImpl(driver, scope) internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) + var closed = false /** * The current sync status. @@ -71,7 +97,12 @@ internal class PowerSyncDatabaseImpl( private var uploadJob: Job? = null init { + val db = this runBlocking { + val isMultiple = instanceStore.registerAndCheckInstance(db) + if (isMultiple) { + logger.w { multipleInstancesMessage } + } val sqliteVersion = internalDb.queries.sqliteVersion().executeAsOne() logger.d { "SQLiteVersion: $sqliteVersion" } checkVersion() @@ -95,8 +126,7 @@ internal class PowerSyncDatabaseImpl( crudThrottleMs: Long, retryDelayMs: Long, params: Map, - ) { - // close connection if one is open + ) = exclusiveMethod("connect") { disconnect() connect( @@ -118,9 +148,43 @@ internal class PowerSyncDatabaseImpl( crudThrottleMs: Long, ) { this.syncStream = stream + + val db = this + syncJob = scope.launch { - syncStream!!.streamingSync() + // Get a global lock for checking mutex maps + val streamMutex = + globalMutexFor("streaming-$identifier") + + // Poke the streaming mutex to see if another client is using it + var obtainedLock = false + try { + // This call will throw if the lock is already held by this db client. + // We should never reach that point since we disconnect before connecting. + obtainedLock = streamMutex.tryLock(db) + if (!obtainedLock) { + // The mutex is held already by another PowerSync instance (owner). + // (The tryLock should throw if this client already holds the lock). + logger.w(streamConflictMessage) + } + } catch (ex: IllegalStateException) { + logger.e { "The streaming sync client did not disconnect before connecting" } + } + + // This effectively queues operations + if (!obtainedLock) { + // This will throw a CancellationException if the job was cancelled while waiting. + streamMutex.lock(db) + } + + // We have a lock if we reached here + try { + ensureActive() + syncStream!!.streamingSync() + } finally { + streamMutex.unlock(db) + } } scope.launch { @@ -316,7 +380,10 @@ internal class PowerSyncDatabaseImpl( SyncedAt( priority = BucketPriority(it.getLong(0)!!.toInt()), - syncedAt = LocalDateTime.parse(rawTime.replace(" ", "T")).toInstant(TimeZone.UTC), + syncedAt = + LocalDateTime + .parse(rawTime.replace(" ", "T")) + .toInstant(TimeZone.UTC), ) } @@ -363,10 +430,16 @@ internal class PowerSyncDatabaseImpl( currentStatus.asFlow().first(predicate) } - override suspend fun close() { - disconnect() - internalDb.close() - } + override suspend fun close() = + exclusiveMethod("close") { + if (closed) { + return@exclusiveMethod + } + disconnect() + internalDb.close() + instanceStore.removeInstance(this) + closed = true + } /** * Check that a supported version of the powersync extension is loaded. diff --git a/core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt b/core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt new file mode 100644 index 00000000..f71d99c1 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt @@ -0,0 +1,39 @@ +package com.powersync.utils + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +// Helper class for executing methods exclusively +internal open class ExclusiveMethodProvider { + // Class level mutexes + private val mapLock = Mutex() + private val mutexMap = mutableMapOf() + + companion object { + // global level mutexes for global exclusivity + private val staticMapLock = Mutex() + private val staticMutexes = mutableMapOf() + + // Runs the callback exclusively on the global level + internal suspend fun globallyExclusive( + lockName: String, + callback: suspend () -> R, + ) = globalMutexFor(lockName).withLock { callback() } + + internal suspend fun globalMutexFor(lockName: String): Mutex = + staticMapLock.withLock { + staticMutexes.getOrPut(lockName) { Mutex() } + } + } + + // A method for running a callback exclusively on the class instance level + internal suspend fun exclusiveMethod( + lockName: String, + callback: suspend () -> R, + ): R = mutexFor(lockName).withLock { callback() } + + internal suspend fun mutexFor(lockName: String): Mutex = + mapLock.withLock { + mutexMap.getOrPut(lockName) { Mutex() } + } +} diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt b/core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt new file mode 100644 index 00000000..cbc8fdb5 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt @@ -0,0 +1,25 @@ +package com.powersync.testutils + +import kotlinx.coroutines.delay +import kotlinx.datetime.Clock +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +internal suspend fun waitFor( + timeout: Duration = 500.milliseconds, + interval: Duration = 100.milliseconds, + test: () -> Unit, +) { + val begin = Clock.System.now().toEpochMilliseconds() + do { + try { + test() + return + } catch (_: Error) { + // Treat exceptions as failed + } + delay(interval.inWholeMilliseconds) + } while ((Clock.System.now().toEpochMilliseconds() - begin) < timeout.inWholeMilliseconds) + + throw Exception("Timeout reached") +} diff --git a/demos/hello-powersync/composeApp/src/androidMain/kotlin/com/powersync/demos/MainActivity.kt b/demos/hello-powersync/composeApp/src/androidMain/kotlin/com/powersync/demos/MainActivity.kt index 1c66a58a..4209f49f 100644 --- a/demos/hello-powersync/composeApp/src/androidMain/kotlin/com/powersync/demos/MainActivity.kt +++ b/demos/hello-powersync/composeApp/src/androidMain/kotlin/com/powersync/demos/MainActivity.kt @@ -5,7 +5,6 @@ import androidx.activity.ComponentActivity import androidx.activity.compose.setContent import androidx.compose.runtime.Composable import androidx.compose.ui.tooling.preview.Preview -import com.powersync.DatabaseDriverFactory import com.powersync.sync.SyncStatus class MainActivity : ComponentActivity() { @@ -18,11 +17,18 @@ class MainActivity : ComponentActivity() { } } - @Preview @Composable fun ViewContentPreview() { - ViewContent("Preview", listOf(User("1", "John Doe", "john@example.com")), {}, {}, SyncStatus.empty()) + ViewContent( + "Preview", + listOf(User("1", "John Doe", "john@example.com")), + {}, + {}, + SyncStatus.empty(), + {}, + {}, + ) } @Preview @@ -36,4 +42,3 @@ fun ViewContentPreview_ListItem() { fun ViewContentPreview_MyButton() { MyButton(label = "Preview Button", onClick = {}) } - diff --git a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt index b9655a62..2e1f444e 100644 --- a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt +++ b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt @@ -27,12 +27,14 @@ fun App() { var version by remember { mutableStateOf("Loading") } val scope = rememberCoroutineScope() val customers by powerSync.watchUsers().collectAsState(emptyList()) - val syncStatus by powerSync.db.currentStatus.asFlow().collectAsState(powerSync.db.currentStatus) + val syncStatus by powerSync.db.currentStatus + .asFlow() + .collectAsState(powerSync.db.currentStatus) MaterialTheme { Surface( modifier = Modifier.fillMaxSize(), - color = MaterialTheme.colors.background + color = MaterialTheme.colors.background, ) { LaunchedEffect(powerSync) { scope.launch { @@ -55,7 +57,17 @@ fun App() { powerSync.deleteUser() } }, - syncStatus = syncStatus + syncStatus = syncStatus, + onConnect = { + scope.launch { + powerSync.connect() + } + }, + onDisconnect = { + scope.launch { + powerSync.disconnect() + } + }, ) } } @@ -67,7 +79,9 @@ fun ViewContent( users: List, onCreate: () -> Unit, onDelete: () -> Unit, - syncStatus: SyncStatusData + syncStatus: SyncStatusData, + onConnect: () -> Unit, + onDisconnect: () -> Unit, ) { val layoutDirection = LocalLayoutDirection.current Scaffold( @@ -83,20 +97,20 @@ fun ViewContent( Box(modifier = Modifier.fillMaxSize()) { LazyColumn( contentPadding = - PaddingValues( - start = padding.calculateStartPadding(layoutDirection), - top = padding.calculateTopPadding() + 8.dp, - end = padding.calculateEndPadding(layoutDirection), - bottom = padding.calculateBottomPadding() + 80.dp - ), + PaddingValues( + start = padding.calculateStartPadding(layoutDirection), + top = padding.calculateTopPadding() + 8.dp, + end = padding.calculateEndPadding(layoutDirection), + bottom = padding.calculateBottomPadding() + 80.dp, + ), ) { item { - ListItem( "Name", "Email", style = TextStyle(fontWeight = FontWeight.Bold), - modifier = Modifier.padding(bottom = 8.dp), divider = false + modifier = Modifier.padding(bottom = 8.dp), + divider = false, ) } items(users) { @@ -108,7 +122,7 @@ fun ViewContent( Row( modifier = Modifier.fillMaxWidth(), - horizontalArrangement = Arrangement.SpaceAround + horizontalArrangement = Arrangement.SpaceAround, ) { Column { MyButton(label = "Create") { @@ -122,6 +136,23 @@ fun ViewContent( } } + Spacer(Modifier.height(24.dp)) + + Row( + modifier = Modifier.fillMaxWidth(), + horizontalArrangement = Arrangement.SpaceAround, + ) { + Column { + MyButton(label = "Connect") { + onConnect() + } + } + Column { + MyButton(label = "Disconnect") { + onDisconnect() + } + } + } } } // This box should be at the bottom of the screen @@ -129,11 +160,10 @@ fun ViewContent( Column { Text(version) Text("""Connected: ${syncStatus.connected}""") + Text("""Connecting: ${syncStatus.connecting}""") } - } } - }, contentColor = Color.Unspecified, ) @@ -154,7 +184,7 @@ fun ListItem( Text( it, modifier = Modifier.weight(1f), - style = style ?: LocalTextStyle.current + style = style ?: LocalTextStyle.current, ) } } @@ -163,15 +193,13 @@ fun ListItem( if (divider) { Divider( color = Color.Black, - modifier = Modifier.align(Alignment.BottomStart) + modifier = Modifier.align(Alignment.BottomStart), ) } } - } } - @Composable fun MyButton( modifier: Modifier = Modifier, @@ -180,14 +208,14 @@ fun MyButton( ) { Column( modifier = - modifier - .clip(MaterialTheme.shapes.large) - .clickable(onClick = onClick).border( - width = 1.dp, - color = MaterialTheme.colors.primarySurface - ) - .padding(horizontal = 16.dp, vertical = 16.dp), - horizontalAlignment = Alignment.CenterHorizontally + modifier + .clip(MaterialTheme.shapes.large) + .clickable(onClick = onClick) + .border( + width = 1.dp, + color = MaterialTheme.colors.primarySurface, + ).padding(horizontal = 16.dp, vertical = 16.dp), + horizontalAlignment = Alignment.CenterHorizontally, ) { Text( label, diff --git a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/PowerSync.kt b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/PowerSync.kt index 9a13eb9e..383a3a6b 100644 --- a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/PowerSync.kt +++ b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/PowerSync.kt @@ -10,12 +10,12 @@ import kotlinx.coroutines.runBlocking class PowerSync( driverFactory: DatabaseDriverFactory, ) { - - private val connector = SupabaseConnector( - supabaseUrl = Config.SUPABASE_URL, - supabaseKey = Config.SUPABASE_ANON_KEY, - powerSyncEndpoint = Config.POWERSYNC_URL - ) + private val connector = + SupabaseConnector( + supabaseUrl = Config.SUPABASE_URL, + supabaseKey = Config.SUPABASE_ANON_KEY, + powerSyncEndpoint = Config.POWERSYNC_URL, + ) private val database = PowerSyncDatabase(driverFactory, AppSchema) val db: PowerSyncDatabase @@ -26,31 +26,33 @@ class PowerSync( try { connector.login(Config.SUPABASE_USER_EMAIL, Config.SUPABASE_USER_PASSWORD) } catch (e: Exception) { - println("Could not connect to Supabase, have you configured an auth user and set `SUPABASE_USER_EMAIL` and `SUPABASE_USER_PASSWORD`?\n Error: $e") + println( + "Could not connect to Supabase, have you configured an auth user and set `SUPABASE_USER_EMAIL` and `SUPABASE_USER_PASSWORD`?\n Error: $e", + ) } database.connect(connector) } } - suspend fun getPowersyncVersion(): String { - return database.getPowerSyncVersion() - } + suspend fun getPowersyncVersion(): String = database.getPowerSyncVersion() - fun watchUsers(): Flow> { - return database.watch("SELECT * FROM customers", mapper = { cursor -> + fun watchUsers(): Flow> = + database.watch("SELECT * FROM customers", mapper = { cursor -> User( id = cursor.getString("id"), name = cursor.getString("name"), - email = cursor.getString("email") + email = cursor.getString("email"), ) }) - } - suspend fun createUser(name: String, email: String) { + suspend fun createUser( + name: String, + email: String, + ) { database.writeTransaction { tx -> tx.execute( "INSERT INTO customers (id, name, email) VALUES (uuid(), ?, ?)", - listOf(name, email) + listOf(name, email), ) } } @@ -60,10 +62,18 @@ class PowerSync( id ?: database.getOptional("SELECT id FROM customers LIMIT 1", mapper = { cursor -> cursor.getString(0)!! }) - ?: return + ?: return database.writeTransaction { tx -> tx.execute("DELETE FROM customers WHERE id = ?", listOf(targetId)) } } -} \ No newline at end of file + + suspend fun connect() { + database.connect(connector) + } + + suspend fun disconnect() { + database.disconnect() + } +}