From 07dc77f7ef6a687d1a156c59aa1387f908d31fa0 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 18 Mar 2025 17:03:32 +0200 Subject: [PATCH 01/10] queue connects --- .../com/powersync/SyncIntegrationTest.kt | 178 +++++++++++++++++- .../com/powersync/testutils/TestUtils.kt | 4 + .../com/powersync/db/PowerSyncDatabaseImpl.kt | 91 ++++++++- .../powersync/testutils/MockSyncService.kt | 10 + .../kotlin/com/powersync/demos/App.kt | 54 +++--- 5 files changed, 295 insertions(+), 42 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index eee94854..09665783 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -1,6 +1,8 @@ package com.powersync import app.cash.turbine.turbineScope +import co.touchlab.kermit.ExperimentalKermitApi +import co.touchlab.kermit.LogWriter import co.touchlab.kermit.Logger import co.touchlab.kermit.Severity import co.touchlab.kermit.TestConfig @@ -18,13 +20,17 @@ import com.powersync.sync.SyncStream import com.powersync.testutils.MockSyncService import com.powersync.testutils.UserRow import com.powersync.testutils.cleanup +import com.powersync.testutils.factory import com.powersync.testutils.waitFor +import com.powersync.testutils.waitForString import com.powersync.utils.JsonUtil import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest @@ -38,7 +44,7 @@ import kotlin.test.assertFalse import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds -@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class) +@OptIn(ExperimentalKermitApi::class) class SyncIntegrationTest { private val logger = Logger( @@ -58,11 +64,11 @@ class SyncIntegrationTest { connector = mock { everySuspend { getCredentialsCached() } returns - PowerSyncCredentials( - token = "test-token", - userId = "test-user", - endpoint = "https://test.com", - ) + PowerSyncCredentials( + token = "test-token", + userId = "test-user", + endpoint = "https://test.com", + ) everySuspend { invalidateCredentials() } returns Unit } @@ -75,12 +81,15 @@ class SyncIntegrationTest { @AfterTest fun teardown() { + runBlocking { + database.close() + } cleanup("testdb") } private fun openDb() = PowerSyncDatabase( - factory = com.powersync.testutils.factory, + factory = factory, schema = Schema(UserRow.table), dbFilename = "testdb", ) as PowerSyncDatabaseImpl @@ -271,6 +280,26 @@ class SyncIntegrationTest { syncLines.close() } + @Test + fun setsConnectingState() = + runTest { + turbineScope(timeout = 10.0.seconds) { + val syncStream = syncStream() + val turbine = database.currentStatus.asFlow().testIn(this) + + database.connect(syncStream, 1000L) + turbine.waitFor { it.connecting } + + database.disconnect() + + turbine.waitFor { !it.connecting && !it.connected } + turbine.cancel() + } + + database.close() + syncLines.close() + } + @Test fun testMultipleSyncsDoNotCreateMultipleStatusEntries() = runTest { @@ -312,6 +341,141 @@ class SyncIntegrationTest { turbine.cancel() } + database.close() + syncLines.close() + } + + @Test + fun warnsMultipleConnectionAttempts() = + runTest { + val logMessages = MutableSharedFlow(extraBufferCapacity = 10) + + Logger.setLogWriters( + object : LogWriter() { + override fun log( + severity: Severity, + message: String, + tag: String, + throwable: Throwable?, + ) { + logMessages.tryEmit(message) + } + }, + ) + + Logger.setMinSeverity(Severity.Verbose) + + val db2 = + PowerSyncDatabase( + factory = factory, + schema = Schema(UserRow.table), + dbFilename = "testdb", + logger = Logger, + ) as PowerSyncDatabaseImpl + + turbineScope(timeout = 10.0.seconds) { + val logTurbine = logMessages.asSharedFlow().testIn(this) + + // Connect the first database + database.connect(connector, 1000L) + db2.connect(connector) + + logTurbine.waitForString { it == PowerSyncDatabaseImpl.streamConflictMessage } + + db2.disconnect() + + database.disconnect() + + logTurbine.cancel() + } + + db2.close() + database.close() + syncLines.close() + } + + @Test + fun queuesMultipleConnectionAttempts() = + runTest { + val db2 = + PowerSyncDatabase( + factory = factory, + schema = Schema(UserRow.table), + dbFilename = "testdb", + logger = Logger, + ) as PowerSyncDatabaseImpl + + turbineScope(timeout = 10.0.seconds) { + val turbine1 = database.currentStatus.asFlow().testIn(this) + val turbine2 = db2.currentStatus.asFlow().testIn(this) + + // Connect the first database + database.connect(connector, 1000L) + + turbine1.waitFor { it.connecting } + db2.connect(connector) + + // Should not be connecting yet + assertEquals(false, db2.currentStatus.connecting) + + database.disconnect() + turbine1.waitFor { !it.connecting } + + // Should start connecting after the other database disconnected + turbine2.waitFor { it.connecting } + db2.disconnect() + turbine2.waitFor { !it.connecting } + + turbine1.cancel() + turbine2.cancel() + } + + db2.close() + database.close() + syncLines.close() + } + + @Test + fun reconnectsAfterDisconnecting() = + runTest { + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + + // Connect the first database + database.connect(connector, 1000L) + turbine.waitFor { it.connecting } + + database.disconnect() + turbine.waitFor { !it.connecting } + + database.connect(connector, 1000L) + turbine.waitFor { it.connecting } + database.disconnect() + turbine.waitFor { !it.connecting } + + turbine.cancel() + } + + database.close() + syncLines.close() + } + + @Test + fun reconnects() = + runTest { + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + + // Connect the first database + database.connect(connector, 1000L, retryDelayMs = 5000) + turbine.waitFor { it.connecting } + + database.connect(connector, 1000L, retryDelayMs = 5000) + turbine.waitFor { it.connecting } + + turbine.cancel() + } + database.close() syncLines.close() } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 249e7bd8..cb2f6d96 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -1,7 +1,11 @@ package com.powersync.testutils import com.powersync.DatabaseDriverFactory +import com.powersync.PowerSyncDatabase +import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.connectors.PowerSyncCredentials expect val factory: DatabaseDriverFactory expect fun cleanup(path: String) + diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index b22fcffa..185f8261 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -1,6 +1,7 @@ package com.powersync.db import co.touchlab.kermit.Logger +import co.touchlab.stately.concurrency.AtomicBoolean import com.powersync.DatabaseDriverFactory import com.powersync.PowerSyncDatabase import com.powersync.PsSqlDriver @@ -33,6 +34,8 @@ import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Instant import kotlinx.datetime.LocalDateTime import kotlinx.datetime.TimeZone @@ -56,8 +59,34 @@ internal class PowerSyncDatabaseImpl( val logger: Logger = Logger, driver: PsSqlDriver = factory.createDriver(scope, dbFilename), ) : PowerSyncDatabase { + companion object { + /** + * We store a map of connection streaming mutexes, but we need to access that safely. + */ + internal val streamingMapMutex = Mutex() + + /** + * Manages streaming operations for the same database between clients + */ + internal val streamMutexes = mutableMapOf() + + internal val streamConflictMessage = + """ + Another PowerSync client is already connected to this database. + Multiple connections to the same database should be avoided. + Please check your PowerSync client instantiation logic. + This connection attempt will be queued and will only be executed after + currently connecting clients are disconnected. + """.trimIndent() + } + + /** + * Manages multiple connection attempts within the same client + */ + private val connectingMutex = Mutex() private val internalDb = InternalDatabaseImpl(driver, scope) internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) + val closed = AtomicBoolean(false) /** * The current sync status. @@ -95,8 +124,7 @@ internal class PowerSyncDatabaseImpl( crudThrottleMs: Long, retryDelayMs: Long, params: Map, - ) { - // close connection if one is open + ) = connectingMutex.withLock { disconnect() connect( @@ -118,9 +146,48 @@ internal class PowerSyncDatabaseImpl( crudThrottleMs: Long, ) { this.syncStream = stream + + val db = this + syncJob = scope.launch { - syncStream!!.streamingSync() + // Get a global lock for checking mutex maps + val streamMutex = + streamingMapMutex.withLock { + val streamMutex = streamMutexes.getOrPut(dbFilename) { Mutex() } + + // Poke the streaming mutex to see if another client is using it + // We check the mutex status in the global lock to prevent race conditions + // between multiple clients. + var obtainedLock = false + try { + // This call will throw if the lock is already held by this db client. + // We should never reach that point since we disconnect before connecting. + obtainedLock = streamMutex.tryLock(db) + if (!obtainedLock) { + // The mutex is held already by another PowerSync instance (owner). + // (The tryLock should throw if this client already holds the lock). + logger.w(streamConflictMessage) + } + } catch (ex: IllegalStateException) { + logger.e { "The streaming sync client did not disconnect before connecting" } + } finally { + if (obtainedLock) { + streamMutex.unlock(db) + } + } + + // It should be safe to use this mutex instance externally + return@withLock streamMutex + } + + // Start the Mutex request in this job. + // Disconnecting will cancel the job and request. + // This holds the lock while streaming is in progress. + streamMutex.withLock(db) { + // Check if another client is already holding the mutex + syncStream!!.streamingSync() + } } scope.launch { @@ -216,7 +283,8 @@ internal class PowerSyncDatabaseImpl( } } - override suspend fun getPowerSyncVersion(): String = internalDb.queries.powerSyncVersion().executeAsOne() + override suspend fun getPowerSyncVersion(): String = + internalDb.queries.powerSyncVersion().executeAsOne() override suspend fun get( sql: String, @@ -243,9 +311,11 @@ internal class PowerSyncDatabaseImpl( mapper: (SqlCursor) -> RowType, ): Flow> = internalDb.watch(sql, parameters, throttleMs, mapper) - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) + override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = + internalDb.writeTransaction(callback) - override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) + override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = + internalDb.writeTransaction(callback) override suspend fun execute( sql: String, @@ -316,7 +386,10 @@ internal class PowerSyncDatabaseImpl( SyncedAt( priority = BucketPriority(it.getLong(0)!!.toInt()), - syncedAt = LocalDateTime.parse(rawTime.replace(" ", "T")).toInstant(TimeZone.UTC), + syncedAt = + LocalDateTime + .parse(rawTime.replace(" ", "T")) + .toInstant(TimeZone.UTC), ) } @@ -364,6 +437,10 @@ internal class PowerSyncDatabaseImpl( } override suspend fun close() { + if (closed.value) { + return + } + closed.value = true disconnect() internalDb.close() } diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index eb57a232..1f655820 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -65,3 +65,13 @@ suspend inline fun ReceiveTurbine.waitFor(matcher: (SyncStatusDa } } } + +// JVM build fails if this is also called waitFor +suspend inline fun ReceiveTurbine.waitForString(matcher: (String) -> Boolean) { + while (true) { + val item = awaitItem() + if (matcher(item)) { + break + } + } +} diff --git a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt index b9655a62..0ed667aa 100644 --- a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt +++ b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt @@ -27,12 +27,14 @@ fun App() { var version by remember { mutableStateOf("Loading") } val scope = rememberCoroutineScope() val customers by powerSync.watchUsers().collectAsState(emptyList()) - val syncStatus by powerSync.db.currentStatus.asFlow().collectAsState(powerSync.db.currentStatus) + val syncStatus by powerSync.db.currentStatus + .asFlow() + .collectAsState(powerSync.db.currentStatus) MaterialTheme { Surface( modifier = Modifier.fillMaxSize(), - color = MaterialTheme.colors.background + color = MaterialTheme.colors.background, ) { LaunchedEffect(powerSync) { scope.launch { @@ -55,7 +57,7 @@ fun App() { powerSync.deleteUser() } }, - syncStatus = syncStatus + syncStatus = syncStatus, ) } } @@ -67,7 +69,7 @@ fun ViewContent( users: List, onCreate: () -> Unit, onDelete: () -> Unit, - syncStatus: SyncStatusData + syncStatus: SyncStatusData, ) { val layoutDirection = LocalLayoutDirection.current Scaffold( @@ -83,20 +85,20 @@ fun ViewContent( Box(modifier = Modifier.fillMaxSize()) { LazyColumn( contentPadding = - PaddingValues( - start = padding.calculateStartPadding(layoutDirection), - top = padding.calculateTopPadding() + 8.dp, - end = padding.calculateEndPadding(layoutDirection), - bottom = padding.calculateBottomPadding() + 80.dp - ), + PaddingValues( + start = padding.calculateStartPadding(layoutDirection), + top = padding.calculateTopPadding() + 8.dp, + end = padding.calculateEndPadding(layoutDirection), + bottom = padding.calculateBottomPadding() + 80.dp, + ), ) { item { - ListItem( "Name", "Email", style = TextStyle(fontWeight = FontWeight.Bold), - modifier = Modifier.padding(bottom = 8.dp), divider = false + modifier = Modifier.padding(bottom = 8.dp), + divider = false, ) } items(users) { @@ -108,7 +110,7 @@ fun ViewContent( Row( modifier = Modifier.fillMaxWidth(), - horizontalArrangement = Arrangement.SpaceAround + horizontalArrangement = Arrangement.SpaceAround, ) { Column { MyButton(label = "Create") { @@ -121,7 +123,6 @@ fun ViewContent( } } } - } } // This box should be at the bottom of the screen @@ -129,11 +130,10 @@ fun ViewContent( Column { Text(version) Text("""Connected: ${syncStatus.connected}""") + Text("""Connecting: ${syncStatus.connecting}""") } - } } - }, contentColor = Color.Unspecified, ) @@ -154,7 +154,7 @@ fun ListItem( Text( it, modifier = Modifier.weight(1f), - style = style ?: LocalTextStyle.current + style = style ?: LocalTextStyle.current, ) } } @@ -163,15 +163,13 @@ fun ListItem( if (divider) { Divider( color = Color.Black, - modifier = Modifier.align(Alignment.BottomStart) + modifier = Modifier.align(Alignment.BottomStart), ) } } - } } - @Composable fun MyButton( modifier: Modifier = Modifier, @@ -180,14 +178,14 @@ fun MyButton( ) { Column( modifier = - modifier - .clip(MaterialTheme.shapes.large) - .clickable(onClick = onClick).border( - width = 1.dp, - color = MaterialTheme.colors.primarySurface - ) - .padding(horizontal = 16.dp, vertical = 16.dp), - horizontalAlignment = Alignment.CenterHorizontally + modifier + .clip(MaterialTheme.shapes.large) + .clickable(onClick = onClick) + .border( + width = 1.dp, + color = MaterialTheme.colors.primarySurface, + ).padding(horizontal = 16.dp, vertical = 16.dp), + horizontalAlignment = Alignment.CenterHorizontally, ) { Text( label, From fa049e3533e294a59e7b1886b2e81d5241b96cbd Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 18 Mar 2025 17:06:11 +0200 Subject: [PATCH 02/10] cleanup --- .../kotlin/com/powersync/SyncIntegrationTest.kt | 10 +++++----- .../kotlin/com/powersync/testutils/TestUtils.kt | 4 ---- .../kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt | 9 +++------ 3 files changed, 8 insertions(+), 15 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 09665783..41caa5bf 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -64,11 +64,11 @@ class SyncIntegrationTest { connector = mock { everySuspend { getCredentialsCached() } returns - PowerSyncCredentials( - token = "test-token", - userId = "test-user", - endpoint = "https://test.com", - ) + PowerSyncCredentials( + token = "test-token", + userId = "test-user", + endpoint = "https://test.com", + ) everySuspend { invalidateCredentials() } returns Unit } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index cb2f6d96..249e7bd8 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -1,11 +1,7 @@ package com.powersync.testutils import com.powersync.DatabaseDriverFactory -import com.powersync.PowerSyncDatabase -import com.powersync.connectors.PowerSyncBackendConnector -import com.powersync.connectors.PowerSyncCredentials expect val factory: DatabaseDriverFactory expect fun cleanup(path: String) - diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 185f8261..d3297f5b 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -283,8 +283,7 @@ internal class PowerSyncDatabaseImpl( } } - override suspend fun getPowerSyncVersion(): String = - internalDb.queries.powerSyncVersion().executeAsOne() + override suspend fun getPowerSyncVersion(): String = internalDb.queries.powerSyncVersion().executeAsOne() override suspend fun get( sql: String, @@ -311,11 +310,9 @@ internal class PowerSyncDatabaseImpl( mapper: (SqlCursor) -> RowType, ): Flow> = internalDb.watch(sql, parameters, throttleMs, mapper) - override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = - internalDb.writeTransaction(callback) + override suspend fun readTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) - override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = - internalDb.writeTransaction(callback) + override suspend fun writeTransaction(callback: ThrowableTransactionCallback): R = internalDb.writeTransaction(callback) override suspend fun execute( sql: String, From 8a763f29337728c38611bd1eb41d896728fd7e55 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 18 Mar 2025 17:32:35 +0200 Subject: [PATCH 03/10] Added connect buttons to demo. Added changelog --- CHANGELOG.md | 4 ++ .../com/powersync/demos/MainActivity.kt | 13 ++++-- .../kotlin/com/powersync/demos/App.kt | 30 ++++++++++++ .../kotlin/com/powersync/demos/PowerSync.kt | 46 +++++++++++-------- 4 files changed, 71 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a65124a8..d7ae23eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-BETA29 + +* Added queing protection and warnings when connecting multiple PowerSync clients to the same database file. + ## 1.0.0-BETA28 * Update PowerSync SQLite core extension to 0.3.12. diff --git a/demos/hello-powersync/composeApp/src/androidMain/kotlin/com/powersync/demos/MainActivity.kt b/demos/hello-powersync/composeApp/src/androidMain/kotlin/com/powersync/demos/MainActivity.kt index 1c66a58a..4209f49f 100644 --- a/demos/hello-powersync/composeApp/src/androidMain/kotlin/com/powersync/demos/MainActivity.kt +++ b/demos/hello-powersync/composeApp/src/androidMain/kotlin/com/powersync/demos/MainActivity.kt @@ -5,7 +5,6 @@ import androidx.activity.ComponentActivity import androidx.activity.compose.setContent import androidx.compose.runtime.Composable import androidx.compose.ui.tooling.preview.Preview -import com.powersync.DatabaseDriverFactory import com.powersync.sync.SyncStatus class MainActivity : ComponentActivity() { @@ -18,11 +17,18 @@ class MainActivity : ComponentActivity() { } } - @Preview @Composable fun ViewContentPreview() { - ViewContent("Preview", listOf(User("1", "John Doe", "john@example.com")), {}, {}, SyncStatus.empty()) + ViewContent( + "Preview", + listOf(User("1", "John Doe", "john@example.com")), + {}, + {}, + SyncStatus.empty(), + {}, + {}, + ) } @Preview @@ -36,4 +42,3 @@ fun ViewContentPreview_ListItem() { fun ViewContentPreview_MyButton() { MyButton(label = "Preview Button", onClick = {}) } - diff --git a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt index 0ed667aa..2e1f444e 100644 --- a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt +++ b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/App.kt @@ -58,6 +58,16 @@ fun App() { } }, syncStatus = syncStatus, + onConnect = { + scope.launch { + powerSync.connect() + } + }, + onDisconnect = { + scope.launch { + powerSync.disconnect() + } + }, ) } } @@ -70,6 +80,8 @@ fun ViewContent( onCreate: () -> Unit, onDelete: () -> Unit, syncStatus: SyncStatusData, + onConnect: () -> Unit, + onDisconnect: () -> Unit, ) { val layoutDirection = LocalLayoutDirection.current Scaffold( @@ -123,6 +135,24 @@ fun ViewContent( } } } + + Spacer(Modifier.height(24.dp)) + + Row( + modifier = Modifier.fillMaxWidth(), + horizontalArrangement = Arrangement.SpaceAround, + ) { + Column { + MyButton(label = "Connect") { + onConnect() + } + } + Column { + MyButton(label = "Disconnect") { + onDisconnect() + } + } + } } } // This box should be at the bottom of the screen diff --git a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/PowerSync.kt b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/PowerSync.kt index 9a13eb9e..383a3a6b 100644 --- a/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/PowerSync.kt +++ b/demos/hello-powersync/composeApp/src/commonMain/kotlin/com/powersync/demos/PowerSync.kt @@ -10,12 +10,12 @@ import kotlinx.coroutines.runBlocking class PowerSync( driverFactory: DatabaseDriverFactory, ) { - - private val connector = SupabaseConnector( - supabaseUrl = Config.SUPABASE_URL, - supabaseKey = Config.SUPABASE_ANON_KEY, - powerSyncEndpoint = Config.POWERSYNC_URL - ) + private val connector = + SupabaseConnector( + supabaseUrl = Config.SUPABASE_URL, + supabaseKey = Config.SUPABASE_ANON_KEY, + powerSyncEndpoint = Config.POWERSYNC_URL, + ) private val database = PowerSyncDatabase(driverFactory, AppSchema) val db: PowerSyncDatabase @@ -26,31 +26,33 @@ class PowerSync( try { connector.login(Config.SUPABASE_USER_EMAIL, Config.SUPABASE_USER_PASSWORD) } catch (e: Exception) { - println("Could not connect to Supabase, have you configured an auth user and set `SUPABASE_USER_EMAIL` and `SUPABASE_USER_PASSWORD`?\n Error: $e") + println( + "Could not connect to Supabase, have you configured an auth user and set `SUPABASE_USER_EMAIL` and `SUPABASE_USER_PASSWORD`?\n Error: $e", + ) } database.connect(connector) } } - suspend fun getPowersyncVersion(): String { - return database.getPowerSyncVersion() - } + suspend fun getPowersyncVersion(): String = database.getPowerSyncVersion() - fun watchUsers(): Flow> { - return database.watch("SELECT * FROM customers", mapper = { cursor -> + fun watchUsers(): Flow> = + database.watch("SELECT * FROM customers", mapper = { cursor -> User( id = cursor.getString("id"), name = cursor.getString("name"), - email = cursor.getString("email") + email = cursor.getString("email"), ) }) - } - suspend fun createUser(name: String, email: String) { + suspend fun createUser( + name: String, + email: String, + ) { database.writeTransaction { tx -> tx.execute( "INSERT INTO customers (id, name, email) VALUES (uuid(), ?, ?)", - listOf(name, email) + listOf(name, email), ) } } @@ -60,10 +62,18 @@ class PowerSync( id ?: database.getOptional("SELECT id FROM customers LIMIT 1", mapper = { cursor -> cursor.getString(0)!! }) - ?: return + ?: return database.writeTransaction { tx -> tx.execute("DELETE FROM customers WHERE id = ?", listOf(targetId)) } } -} \ No newline at end of file + + suspend fun connect() { + database.connect(connector) + } + + suspend fun disconnect() { + database.disconnect() + } +} From a4150c1ceaec7d2632d65f036a0a0bba3cf11e40 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 18 Mar 2025 17:34:23 +0200 Subject: [PATCH 04/10] cleanup --- .../kotlin/com/powersync/SyncIntegrationTest.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 41caa5bf..60433866 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -441,7 +441,6 @@ class SyncIntegrationTest { turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) - // Connect the first database database.connect(connector, 1000L) turbine.waitFor { it.connecting } From 4b5f6b39e8a898161a9be355057da32e362348fc Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 18 Mar 2025 17:35:57 +0200 Subject: [PATCH 05/10] cleanup --- .../kotlin/com/powersync/SyncIntegrationTest.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 60433866..b58f741c 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -465,7 +465,6 @@ class SyncIntegrationTest { turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) - // Connect the first database database.connect(connector, 1000L, retryDelayMs = 5000) turbine.waitFor { it.connecting } From 4f08d534d6d6172922aca111236aa7545e3a154a Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 19 Mar 2025 08:24:00 +0200 Subject: [PATCH 06/10] cleanup comment --- .../commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index d3297f5b..0a24278c 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -185,7 +185,6 @@ internal class PowerSyncDatabaseImpl( // Disconnecting will cancel the job and request. // This holds the lock while streaming is in progress. streamMutex.withLock(db) { - // Check if another client is already holding the mutex syncStream!!.streamingSync() } } From 334b18487a08b8647861ac22814e33b9c29d2125 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Mar 2025 13:26:43 +0200 Subject: [PATCH 07/10] improve exclusive methods --- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 99 ++++++++----------- .../utils/ExclusiveMethodProvider.kt | 39 ++++++++ 2 files changed, 81 insertions(+), 57 deletions(-) create mode 100644 core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 0a24278c..dbcbe24a 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -1,7 +1,6 @@ package com.powersync.db import co.touchlab.kermit.Logger -import co.touchlab.stately.concurrency.AtomicBoolean import com.powersync.DatabaseDriverFactory import com.powersync.PowerSyncDatabase import com.powersync.PsSqlDriver @@ -21,6 +20,7 @@ import com.powersync.sync.PriorityStatusEntry import com.powersync.sync.SyncStatus import com.powersync.sync.SyncStatusData import com.powersync.sync.SyncStream +import com.powersync.utils.ExclusiveMethodProvider import com.powersync.utils.JsonParam import com.powersync.utils.JsonUtil import com.powersync.utils.throttle @@ -29,13 +29,12 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Instant import kotlinx.datetime.LocalDateTime import kotlinx.datetime.TimeZone @@ -58,18 +57,9 @@ internal class PowerSyncDatabaseImpl( private val dbFilename: String, val logger: Logger = Logger, driver: PsSqlDriver = factory.createDriver(scope, dbFilename), -) : PowerSyncDatabase { +) : ExclusiveMethodProvider(), + PowerSyncDatabase { companion object { - /** - * We store a map of connection streaming mutexes, but we need to access that safely. - */ - internal val streamingMapMutex = Mutex() - - /** - * Manages streaming operations for the same database between clients - */ - internal val streamMutexes = mutableMapOf() - internal val streamConflictMessage = """ Another PowerSync client is already connected to this database. @@ -80,13 +70,9 @@ internal class PowerSyncDatabaseImpl( """.trimIndent() } - /** - * Manages multiple connection attempts within the same client - */ - private val connectingMutex = Mutex() private val internalDb = InternalDatabaseImpl(driver, scope) internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) - val closed = AtomicBoolean(false) + var closed = false /** * The current sync status. @@ -124,7 +110,7 @@ internal class PowerSyncDatabaseImpl( crudThrottleMs: Long, retryDelayMs: Long, params: Map, - ) = connectingMutex.withLock { + ) = exclusiveMethod("connect") { disconnect() connect( @@ -153,39 +139,37 @@ internal class PowerSyncDatabaseImpl( scope.launch { // Get a global lock for checking mutex maps val streamMutex = - streamingMapMutex.withLock { - val streamMutex = streamMutexes.getOrPut(dbFilename) { Mutex() } - - // Poke the streaming mutex to see if another client is using it - // We check the mutex status in the global lock to prevent race conditions - // between multiple clients. - var obtainedLock = false - try { - // This call will throw if the lock is already held by this db client. - // We should never reach that point since we disconnect before connecting. - obtainedLock = streamMutex.tryLock(db) - if (!obtainedLock) { - // The mutex is held already by another PowerSync instance (owner). - // (The tryLock should throw if this client already holds the lock). - logger.w(streamConflictMessage) - } - } catch (ex: IllegalStateException) { - logger.e { "The streaming sync client did not disconnect before connecting" } - } finally { - if (obtainedLock) { - streamMutex.unlock(db) - } - } - - // It should be safe to use this mutex instance externally - return@withLock streamMutex + globalMutexFor("streaming-$dbFilename") + + // Poke the streaming mutex to see if another client is using it + // We check the mutex status in the global lock to prevent race conditions + // between multiple clients. + var obtainedLock = false + try { + // This call will throw if the lock is already held by this db client. + // We should never reach that point since we disconnect before connecting. + obtainedLock = streamMutex.tryLock(db) + if (!obtainedLock) { + // The mutex is held already by another PowerSync instance (owner). + // (The tryLock should throw if this client already holds the lock). + logger.w(streamConflictMessage) } + } catch (ex: IllegalStateException) { + logger.e { "The streaming sync client did not disconnect before connecting" } + } - // Start the Mutex request in this job. - // Disconnecting will cancel the job and request. - // This holds the lock while streaming is in progress. - streamMutex.withLock(db) { + // This effectively queues operations + if (!obtainedLock) { + // This will throw a CancellationException if the job was cancelled while waiting. + streamMutex.lock(db) + } + + // We have a lock if we reached here + try { + ensureActive() syncStream!!.streamingSync() + } finally { + streamMutex.unlock(db) } } @@ -432,14 +416,15 @@ internal class PowerSyncDatabaseImpl( currentStatus.asFlow().first(predicate) } - override suspend fun close() { - if (closed.value) { - return + override suspend fun close() = + exclusiveMethod("close") { + if (closed) { + return@exclusiveMethod + } + disconnect() + internalDb.close() + closed = true } - closed.value = true - disconnect() - internalDb.close() - } /** * Check that a supported version of the powersync extension is loaded. diff --git a/core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt b/core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt new file mode 100644 index 00000000..f71d99c1 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt @@ -0,0 +1,39 @@ +package com.powersync.utils + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock + +// Helper class for executing methods exclusively +internal open class ExclusiveMethodProvider { + // Class level mutexes + private val mapLock = Mutex() + private val mutexMap = mutableMapOf() + + companion object { + // global level mutexes for global exclusivity + private val staticMapLock = Mutex() + private val staticMutexes = mutableMapOf() + + // Runs the callback exclusively on the global level + internal suspend fun globallyExclusive( + lockName: String, + callback: suspend () -> R, + ) = globalMutexFor(lockName).withLock { callback() } + + internal suspend fun globalMutexFor(lockName: String): Mutex = + staticMapLock.withLock { + staticMutexes.getOrPut(lockName) { Mutex() } + } + } + + // A method for running a callback exclusively on the class instance level + internal suspend fun exclusiveMethod( + lockName: String, + callback: suspend () -> R, + ): R = mutexFor(lockName).withLock { callback() } + + internal suspend fun mutexFor(lockName: String): Mutex = + mapLock.withLock { + mutexMap.getOrPut(lockName) { Mutex() } + } +} From 11efa12546d047c3c4227f4e0c5d9c8af7cc7f62 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Mar 2025 16:25:37 +0200 Subject: [PATCH 08/10] Add store for active instances --- .../kotlin/com/powersync/DatabaseTest.kt | 54 ++++++++++++++++--- .../com/powersync/SyncIntegrationTest.kt | 46 ++++++---------- .../kotlin/com/powersync/PowerSyncDatabase.kt | 6 +++ .../com/powersync/db/ActiveInstanceStore.kt | 23 ++++++++ .../com/powersync/db/PowerSyncDatabaseImpl.kt | 19 ++++++- .../powersync/testutils/MockSyncService.kt | 10 ---- .../kotlin/com/powersync/testutils/waitFor.kt | 25 +++++++++ 7 files changed, 137 insertions(+), 46 deletions(-) create mode 100644 core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt create mode 100644 core/src/commonTest/kotlin/com/powersync/testutils/waitFor.kt diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index 3472c880..f3fa85cf 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -1,26 +1,53 @@ package com.powersync import app.cash.turbine.turbineScope +import co.touchlab.kermit.ExperimentalKermitApi +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.TestConfig +import co.touchlab.kermit.TestLogWriter +import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow +import com.powersync.testutils.waitFor import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertNotNull +@OptIn(ExperimentalKermitApi::class) class DatabaseTest { + private val logWriter = + TestLogWriter( + loggable = Severity.Debug, + ) + + private val logger = + Logger( + TestConfig( + minSeverity = Severity.Debug, + logWriterList = listOf(logWriter), + ), + ) + private lateinit var database: PowerSyncDatabase + private fun openDB() = + PowerSyncDatabase( + factory = com.powersync.testutils.factory, + schema = Schema(UserRow.table), + dbFilename = "testdb", + logger = logger, + ) + @BeforeTest fun setupDatabase() { - database = - PowerSyncDatabase( - factory = com.powersync.testutils.factory, - schema = Schema(UserRow.table), - dbFilename = "testdb", - ) + logWriter.reset() + + database = openDB() runBlocking { database.disconnectAndClear(true) @@ -86,4 +113,19 @@ class DatabaseTest { query.cancel() } } + + @Test + fun warnsMultipleInstances() = + runTest { + // Opens a second DB with the same database filename + val db2 = openDB() + waitFor { + assertNotNull( + logWriter.logs.find { + it.message == PowerSyncDatabaseImpl.multipleInstancesMessage + }, + ) + } + db2.close() + } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index b58f741c..043a8495 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -2,10 +2,10 @@ package com.powersync import app.cash.turbine.turbineScope import co.touchlab.kermit.ExperimentalKermitApi -import co.touchlab.kermit.LogWriter import co.touchlab.kermit.Logger import co.touchlab.kermit.Severity import co.touchlab.kermit.TestConfig +import co.touchlab.kermit.TestLogWriter import com.powersync.bucket.BucketChecksum import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint @@ -22,15 +22,12 @@ import com.powersync.testutils.UserRow import com.powersync.testutils.cleanup import com.powersync.testutils.factory import com.powersync.testutils.waitFor -import com.powersync.testutils.waitForString import com.powersync.utils.JsonUtil import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest @@ -41,16 +38,22 @@ import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFalse +import kotlin.test.assertNotNull import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds @OptIn(ExperimentalKermitApi::class) class SyncIntegrationTest { + private val logWriter = + TestLogWriter( + loggable = Severity.Debug, + ) + private val logger = Logger( TestConfig( minSeverity = Severity.Debug, - logWriterList = listOf(), + logWriterList = listOf(logWriter), ), ) private lateinit var database: PowerSyncDatabaseImpl @@ -60,6 +63,7 @@ class SyncIntegrationTest { @BeforeTest fun setup() { cleanup("testdb") + logWriter.reset() database = openDb() connector = mock { @@ -348,45 +352,29 @@ class SyncIntegrationTest { @Test fun warnsMultipleConnectionAttempts() = runTest { - val logMessages = MutableSharedFlow(extraBufferCapacity = 10) - - Logger.setLogWriters( - object : LogWriter() { - override fun log( - severity: Severity, - message: String, - tag: String, - throwable: Throwable?, - ) { - logMessages.tryEmit(message) - } - }, - ) - - Logger.setMinSeverity(Severity.Verbose) - val db2 = PowerSyncDatabase( factory = factory, schema = Schema(UserRow.table), dbFilename = "testdb", - logger = Logger, + logger = logger, ) as PowerSyncDatabaseImpl turbineScope(timeout = 10.0.seconds) { - val logTurbine = logMessages.asSharedFlow().testIn(this) - // Connect the first database database.connect(connector, 1000L) db2.connect(connector) - logTurbine.waitForString { it == PowerSyncDatabaseImpl.streamConflictMessage } + waitFor { + assertNotNull( + logWriter.logs.find { + it.message == PowerSyncDatabaseImpl.streamConflictMessage + }, + ) + } db2.disconnect() - database.disconnect() - - logTurbine.cancel() } db2.close() diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt index 9e0a9564..71d2de99 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt @@ -19,6 +19,12 @@ import kotlin.coroutines.cancellation.CancellationException * All changes to local tables are automatically recorded, whether connected or not. Once connected, the changes are uploaded. */ public interface PowerSyncDatabase : Queries { + /** + * Identifies the database client. + * This is typically the database name. + */ + public val identifier: String + /** * The current sync status. */ diff --git a/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt b/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt new file mode 100644 index 00000000..a70e6661 --- /dev/null +++ b/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt @@ -0,0 +1,23 @@ +package com.powersync.db + +import com.powersync.PowerSyncDatabase +import com.powersync.utils.ExclusiveMethodProvider + +internal class ActiveInstanceStore : ExclusiveMethodProvider() { + private val instances = mutableListOf() + + /** + * Registers an instance. Returns true if multiple instances with the same identifier are + * present. + */ + suspend fun registerAndCheckInstance(db: PowerSyncDatabase) = + exclusiveMethod("instances") { + instances.add(db) + return@exclusiveMethod instances.filter { it.identifier == db.identifier }.size > 1 + } + + suspend fun removeInstance(db: PowerSyncDatabase) = + exclusiveMethod("instances") { + instances.remove(db) + } +} diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index dbcbe24a..a285ad43 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -68,8 +68,19 @@ internal class PowerSyncDatabaseImpl( This connection attempt will be queued and will only be executed after currently connecting clients are disconnected. """.trimIndent() + + internal val multipleInstancesMessage = + """ + Multiple PowerSync instances for the same database have been detected. + This can cause unexpected results. + Please check your PowerSync client instantiation logic if this is not intentional. + """.trimIndent() + + internal val instanceStore = ActiveInstanceStore() } + override val identifier = dbFilename + private val internalDb = InternalDatabaseImpl(driver, scope) internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) var closed = false @@ -86,7 +97,12 @@ internal class PowerSyncDatabaseImpl( private var uploadJob: Job? = null init { + val db = this runBlocking { + val isMultiple = instanceStore.registerAndCheckInstance(db) + if (isMultiple) { + logger.w { multipleInstancesMessage } + } val sqliteVersion = internalDb.queries.sqliteVersion().executeAsOne() logger.d { "SQLiteVersion: $sqliteVersion" } checkVersion() @@ -139,7 +155,7 @@ internal class PowerSyncDatabaseImpl( scope.launch { // Get a global lock for checking mutex maps val streamMutex = - globalMutexFor("streaming-$dbFilename") + globalMutexFor("streaming-$identifier") // Poke the streaming mutex to see if another client is using it // We check the mutex status in the global lock to prevent race conditions @@ -423,6 +439,7 @@ internal class PowerSyncDatabaseImpl( } disconnect() internalDb.close() + instanceStore.removeInstance(this) closed = true } diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index 1f655820..eb57a232 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -65,13 +65,3 @@ suspend inline fun ReceiveTurbine.waitFor(matcher: (SyncStatusDa } } } - -// JVM build fails if this is also called waitFor -suspend inline fun ReceiveTurbine.waitForString(matcher: (String) -> Boolean) { - while (true) { - val item = awaitItem() - if (matcher(item)) { - break - } - } -} diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/waitFor.kt b/core/src/commonTest/kotlin/com/powersync/testutils/waitFor.kt new file mode 100644 index 00000000..cbc8fdb5 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/testutils/waitFor.kt @@ -0,0 +1,25 @@ +package com.powersync.testutils + +import kotlinx.coroutines.delay +import kotlinx.datetime.Clock +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +internal suspend fun waitFor( + timeout: Duration = 500.milliseconds, + interval: Duration = 100.milliseconds, + test: () -> Unit, +) { + val begin = Clock.System.now().toEpochMilliseconds() + do { + try { + test() + return + } catch (_: Error) { + // Treat exceptions as failed + } + delay(interval.inWholeMilliseconds) + } while ((Clock.System.now().toEpochMilliseconds() - begin) < timeout.inWholeMilliseconds) + + throw Exception("Timeout reached") +} From db5b10f1cd1f43aa9b1b18288896ea68491ad8f5 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Mar 2025 16:33:33 +0200 Subject: [PATCH 09/10] cleanup --- .../commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index a285ad43..edd08017 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -158,8 +158,6 @@ internal class PowerSyncDatabaseImpl( globalMutexFor("streaming-$identifier") // Poke the streaming mutex to see if another client is using it - // We check the mutex status in the global lock to prevent race conditions - // between multiple clients. var obtainedLock = false try { // This call will throw if the lock is already held by this db client. From a6de9a5240aaee786db987fb9252bf192d7a759a Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Mar 2025 16:42:04 +0200 Subject: [PATCH 10/10] rename file for lint --- .../kotlin/com/powersync/testutils/{waitFor.kt => WaitFor.kt} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename core/src/commonTest/kotlin/com/powersync/testutils/{waitFor.kt => WaitFor.kt} (100%) diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/waitFor.kt b/core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt similarity index 100% rename from core/src/commonTest/kotlin/com/powersync/testutils/waitFor.kt rename to core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt