diff --git a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt index a6a90eff..182c7560 100644 --- a/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt +++ b/core-tests-android/src/androidTest/java/com/powersync/AndroidDatabaseTest.kt @@ -34,7 +34,10 @@ class AndroidDatabaseTest { @After fun tearDown() { - runBlocking { database.disconnectAndClear(true) } + runBlocking { + database.disconnectAndClear(true) + database.close() + } } @Test diff --git a/core/build.gradle.kts b/core/build.gradle.kts index be0733c3..648a1444 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -211,6 +211,11 @@ kotlin { dependsOn(commonTest.get()) } + val commonJava by creating { + kotlin.srcDir("commonJava") + dependsOn(commonMain.get()) + } + commonMain.dependencies { implementation(libs.uuid) implementation(libs.kotlin.stdlib) @@ -226,13 +231,18 @@ kotlin { api(libs.kermit) } - androidMain.dependencies { - implementation(libs.ktor.client.okhttp) + androidMain { + dependsOn(commonJava) + dependencies.implementation(libs.ktor.client.okhttp) } - jvmMain.dependencies { - implementation(libs.ktor.client.okhttp) - implementation(libs.sqlite.jdbc) + jvmMain { + dependsOn(commonJava) + + dependencies { + implementation(libs.ktor.client.okhttp) + implementation(libs.sqlite.jdbc) + } } iosMain.dependencies { diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index f3fa85cf..6a72e27c 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -6,7 +6,7 @@ import co.touchlab.kermit.Logger import co.touchlab.kermit.Severity import co.touchlab.kermit.TestConfig import co.touchlab.kermit.TestLogWriter -import com.powersync.db.PowerSyncDatabaseImpl +import com.powersync.db.ActiveDatabaseGroup import com.powersync.db.schema.Schema import com.powersync.testutils.UserRow import com.powersync.testutils.waitFor @@ -122,7 +122,7 @@ class DatabaseTest { waitFor { assertNotNull( logWriter.logs.find { - it.message == PowerSyncDatabaseImpl.multipleInstancesMessage + it.message == ActiveDatabaseGroup.multipleInstancesMessage }, ) } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 043a8495..ad911963 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -120,7 +120,7 @@ class SyncIntegrationTest { fun testPartialSync() = runTest { val syncStream = syncStream() - database.connect(syncStream, 1000L) + database.connectInternal(syncStream, 1000L) val checksums = buildList { @@ -214,7 +214,7 @@ class SyncIntegrationTest { fun testRemembersLastPartialSync() = runTest { val syncStream = syncStream() - database.connect(syncStream, 1000L) + database.connectInternal(syncStream, 1000L) syncLines.send( SyncLine.FullCheckpoint( @@ -253,7 +253,7 @@ class SyncIntegrationTest { fun setsDownloadingState() = runTest { val syncStream = syncStream() - database.connect(syncStream, 1000L) + database.connectInternal(syncStream, 1000L) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -291,7 +291,7 @@ class SyncIntegrationTest { val syncStream = syncStream() val turbine = database.currentStatus.asFlow().testIn(this) - database.connect(syncStream, 1000L) + database.connectInternal(syncStream, 1000L) turbine.waitFor { it.connecting } database.disconnect() @@ -308,7 +308,7 @@ class SyncIntegrationTest { fun testMultipleSyncsDoNotCreateMultipleStatusEntries() = runTest { val syncStream = syncStream() - database.connect(syncStream, 1000L) + database.connectInternal(syncStream, 1000L) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) diff --git a/core/src/commonJava/kotlin/com/powersync/db/ActiveInstanceStore.commonJava.kt b/core/src/commonJava/kotlin/com/powersync/db/ActiveInstanceStore.commonJava.kt new file mode 100644 index 00000000..7db8439d --- /dev/null +++ b/core/src/commonJava/kotlin/com/powersync/db/ActiveInstanceStore.commonJava.kt @@ -0,0 +1,27 @@ +package com.powersync.db + +internal actual fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any { + // We can't do this on Java 8 :( + return object {} +} + +// This would require Java 9+ + +/* +import java.lang.ref.Cleaner + +internal actual fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any { + // Note: It's important that the returned object does not reference the resource directly + val wrapper = CleanableWrapper() + CleanableWrapper.cleaner.register(wrapper, resource::dispose) + return wrapper +} + +private class CleanableWrapper { + var cleanable: Cleaner.Cleanable? = null + + companion object { + val cleaner: Cleaner = Cleaner.create() + } +} +*/ diff --git a/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt b/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt index a70e6661..fba11bd1 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt @@ -1,23 +1,94 @@ package com.powersync.db -import com.powersync.PowerSyncDatabase -import com.powersync.utils.ExclusiveMethodProvider - -internal class ActiveInstanceStore : ExclusiveMethodProvider() { - private val instances = mutableListOf() - - /** - * Registers an instance. Returns true if multiple instances with the same identifier are - * present. - */ - suspend fun registerAndCheckInstance(db: PowerSyncDatabase) = - exclusiveMethod("instances") { - instances.add(db) - return@exclusiveMethod instances.filter { it.identifier == db.identifier }.size > 1 +import co.touchlab.kermit.Logger +import co.touchlab.stately.concurrency.AtomicBoolean +import co.touchlab.stately.concurrency.Synchronizable +import co.touchlab.stately.concurrency.synchronize +import kotlinx.coroutines.sync.Mutex + +/** + * Returns an object that, when deallocated, calls [ActiveDatabaseResource.dispose]. + */ +internal expect fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any + +/** + * An collection of PowerSync databases with the same path / identifier. + * + * We expect that each group will only ever have one database because we encourage users to write their databases as + * singletons. We print a warning when two databases are part of the same group. + * Additionally, we want to avoid two databases in the same group having a sync stream open at the same time to avoid + * duplicate resources being used. For this reason, each active database group has a coroutine mutex guarding the + * sync job. + */ +internal class ActiveDatabaseGroup( + val identifier: String, + private val collection: GroupsCollection, +) { + internal var refCount = 0 // Guarded by companion object + internal val syncMutex = Mutex() + + fun removeUsage() { + collection.synchronize { + if (--refCount == 0) { + collection.allGroups.remove(this) + } } + } + + internal open class GroupsCollection : Synchronizable() { + internal val allGroups = mutableListOf() + + private fun findGroup( + warnOnDuplicate: Logger, + identifier: String, + ): ActiveDatabaseGroup = + synchronize { + val existing = allGroups.asSequence().firstOrNull { it.identifier == identifier } + val resolvedGroup = + if (existing == null) { + val added = ActiveDatabaseGroup(identifier, this) + allGroups.add(added) + added + } else { + existing + } + + if (resolvedGroup.refCount++ != 0) { + warnOnDuplicate.w { multipleInstancesMessage } + } + + resolvedGroup + } + + internal fun referenceDatabase( + warnOnDuplicate: Logger, + identifier: String, + ): Pair { + val group = findGroup(warnOnDuplicate, identifier) + val resource = ActiveDatabaseResource(group) + + return resource to disposeWhenDeallocated(resource) + } + } + + companion object : GroupsCollection() { + internal val multipleInstancesMessage = + """ + Multiple PowerSync instances for the same database have been detected. + This can cause unexpected results. + Please check your PowerSync client instantiation logic if this is not intentional. + """.trimIndent() + } +} + +internal class ActiveDatabaseResource( + val group: ActiveDatabaseGroup, +) { + val disposed = AtomicBoolean(false) - suspend fun removeInstance(db: PowerSyncDatabase) = - exclusiveMethod("instances") { - instances.remove(db) + fun dispose() { + if (disposed.compareAndSet(false, true)) { + group.removeUsage() } + } } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index edd08017..a0e06bf1 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -20,7 +20,6 @@ import com.powersync.sync.PriorityStatusEntry import com.powersync.sync.SyncStatus import com.powersync.sync.SyncStatusData import com.powersync.sync.SyncStream -import com.powersync.utils.ExclusiveMethodProvider import com.powersync.utils.JsonParam import com.powersync.utils.JsonUtil import com.powersync.utils.throttle @@ -35,6 +34,8 @@ import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Instant import kotlinx.datetime.LocalDateTime import kotlinx.datetime.TimeZone @@ -57,8 +58,7 @@ internal class PowerSyncDatabaseImpl( private val dbFilename: String, val logger: Logger = Logger, driver: PsSqlDriver = factory.createDriver(scope, dbFilename), -) : ExclusiveMethodProvider(), - PowerSyncDatabase { +) : PowerSyncDatabase { companion object { internal val streamConflictMessage = """ @@ -68,21 +68,15 @@ internal class PowerSyncDatabaseImpl( This connection attempt will be queued and will only be executed after currently connecting clients are disconnected. """.trimIndent() - - internal val multipleInstancesMessage = - """ - Multiple PowerSync instances for the same database have been detected. - This can cause unexpected results. - Please check your PowerSync client instantiation logic if this is not intentional. - """.trimIndent() - - internal val instanceStore = ActiveInstanceStore() } override val identifier = dbFilename private val internalDb = InternalDatabaseImpl(driver, scope) internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) + private val resource: ActiveDatabaseResource + private val clearResourceWhenDisposed: Any + var closed = false /** @@ -90,19 +84,17 @@ internal class PowerSyncDatabaseImpl( */ override val currentStatus: SyncStatus = SyncStatus() + private val mutex = Mutex() private var syncStream: SyncStream? = null - private var syncJob: Job? = null - private var uploadJob: Job? = null init { - val db = this + val res = ActiveDatabaseGroup.referenceDatabase(logger, identifier) + resource = res.first + clearResourceWhenDisposed = res.second + runBlocking { - val isMultiple = instanceStore.registerAndCheckInstance(db) - if (isMultiple) { - logger.w { multipleInstancesMessage } - } val sqliteVersion = internalDb.queries.sqliteVersion().executeAsOne() logger.d { "SQLiteVersion: $sqliteVersion" } checkVersion() @@ -126,10 +118,10 @@ internal class PowerSyncDatabaseImpl( crudThrottleMs: Long, retryDelayMs: Long, params: Map, - ) = exclusiveMethod("connect") { - disconnect() + ) = mutex.withLock { + disconnectInternal() - connect( + connectInternal( SyncStream( bucketStorage = bucketStorage, connector = connector, @@ -143,7 +135,7 @@ internal class PowerSyncDatabaseImpl( } @OptIn(FlowPreview::class) - internal fun connect( + internal fun connectInternal( stream: SyncStream, crudThrottleMs: Long, ) { @@ -154,8 +146,7 @@ internal class PowerSyncDatabaseImpl( syncJob = scope.launch { // Get a global lock for checking mutex maps - val streamMutex = - globalMutexFor("streaming-$identifier") + val streamMutex = resource.group.syncMutex // Poke the streaming mutex to see if another client is using it var obtainedLock = false @@ -337,7 +328,9 @@ internal class PowerSyncDatabaseImpl( } } - override suspend fun disconnect() { + override suspend fun disconnect() = mutex.withLock { disconnectInternal() } + + private suspend fun disconnectInternal() { if (syncJob != null && syncJob!!.isActive) { syncJob?.cancelAndJoin() } @@ -431,13 +424,13 @@ internal class PowerSyncDatabaseImpl( } override suspend fun close() = - exclusiveMethod("close") { + mutex.withLock { if (closed) { - return@exclusiveMethod + return@withLock } - disconnect() + disconnectInternal() internalDb.close() - instanceStore.removeInstance(this) + resource.dispose() closed = true } diff --git a/core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt b/core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt deleted file mode 100644 index f71d99c1..00000000 --- a/core/src/commonMain/kotlin/com/powersync/utils/ExclusiveMethodProvider.kt +++ /dev/null @@ -1,39 +0,0 @@ -package com.powersync.utils - -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock - -// Helper class for executing methods exclusively -internal open class ExclusiveMethodProvider { - // Class level mutexes - private val mapLock = Mutex() - private val mutexMap = mutableMapOf() - - companion object { - // global level mutexes for global exclusivity - private val staticMapLock = Mutex() - private val staticMutexes = mutableMapOf() - - // Runs the callback exclusively on the global level - internal suspend fun globallyExclusive( - lockName: String, - callback: suspend () -> R, - ) = globalMutexFor(lockName).withLock { callback() } - - internal suspend fun globalMutexFor(lockName: String): Mutex = - staticMapLock.withLock { - staticMutexes.getOrPut(lockName) { Mutex() } - } - } - - // A method for running a callback exclusively on the class instance level - internal suspend fun exclusiveMethod( - lockName: String, - callback: suspend () -> R, - ): R = mutexFor(lockName).withLock { callback() } - - internal suspend fun mutexFor(lockName: String): Mutex = - mapLock.withLock { - mutexMap.getOrPut(lockName) { Mutex() } - } -} diff --git a/core/src/commonTest/kotlin/com/powersync/db/ActiveDatabaseGroupTest.kt b/core/src/commonTest/kotlin/com/powersync/db/ActiveDatabaseGroupTest.kt new file mode 100644 index 00000000..87c84124 --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/db/ActiveDatabaseGroupTest.kt @@ -0,0 +1,81 @@ +package com.powersync.db + +import co.touchlab.kermit.ExperimentalKermitApi +import co.touchlab.kermit.Logger +import co.touchlab.kermit.Severity +import co.touchlab.kermit.TestConfig +import co.touchlab.kermit.TestLogWriter +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertNotNull + +@OptIn(ExperimentalKermitApi::class) +class ActiveDatabaseGroupTest { + private val logWriter = + TestLogWriter( + loggable = Severity.Debug, + ) + + private val logger = + Logger( + TestConfig( + minSeverity = Severity.Debug, + logWriterList = listOf(logWriter), + ), + ) + + private lateinit var collection: ActiveDatabaseGroup.GroupsCollection + + @BeforeTest + fun setup() { + collection = ActiveDatabaseGroup.GroupsCollection() + logWriter.reset() + } + + @Test + fun testTrackDatabase() { + val usage = collection.referenceDatabase(logger, "test") + assertEquals(1, collection.allGroups.size) + + usage.first.dispose() + assertEquals(0, collection.allGroups.size) + } + + @Test + fun testWarnsOnDuplicate() { + val usage = collection.referenceDatabase(logger, "test") + assertEquals(1, collection.allGroups.size) + + assertEquals(0, logWriter.logs.size) + + val another = collection.referenceDatabase(logger, "test") + assertNotNull( + logWriter.logs.find { + it.message == ActiveDatabaseGroup.multipleInstancesMessage + }, + ) + + assertEquals(usage.first.group, another.first.group) + + usage.first.dispose() + assertEquals(1, collection.allGroups.size) + another.first.dispose() + assertEquals(0, collection.allGroups.size) + } + + @Test + fun testDoesNotWarnForDifferentIdentifiers() { + val usage = collection.referenceDatabase(logger, "test") + assertEquals(1, collection.allGroups.size) + val another = collection.referenceDatabase(logger, "test2") + assertEquals(2, collection.allGroups.size) + + assertEquals(0, logWriter.logs.size) + + usage.first.dispose() + assertEquals(1, collection.allGroups.size) + another.first.dispose() + assertEquals(0, collection.allGroups.size) + } +} diff --git a/core/src/nativeMain/kotlin/com/powersync/db/ActiveInstanceStore.native.kt b/core/src/nativeMain/kotlin/com/powersync/db/ActiveInstanceStore.native.kt new file mode 100644 index 00000000..29181ba1 --- /dev/null +++ b/core/src/nativeMain/kotlin/com/powersync/db/ActiveInstanceStore.native.kt @@ -0,0 +1,7 @@ +package com.powersync.db + +import kotlin.experimental.ExperimentalNativeApi +import kotlin.native.ref.createCleaner + +@OptIn(ExperimentalNativeApi::class) +internal actual fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any = createCleaner(resource) { it.dispose() }