Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ class AndroidDatabaseTest {

@After
fun tearDown() {
runBlocking { database.disconnectAndClear(true) }
runBlocking {
database.disconnectAndClear(true)
database.close()
}
}

@Test
Expand Down
20 changes: 15 additions & 5 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ kotlin {
dependsOn(commonTest.get())
}

val commonJava by creating {
kotlin.srcDir("commonJava")
dependsOn(commonMain.get())
}

commonMain.dependencies {
implementation(libs.uuid)
implementation(libs.kotlin.stdlib)
Expand All @@ -226,13 +231,18 @@ kotlin {
api(libs.kermit)
}

androidMain.dependencies {
implementation(libs.ktor.client.okhttp)
androidMain {
dependsOn(commonJava)
dependencies.implementation(libs.ktor.client.okhttp)
}

jvmMain.dependencies {
implementation(libs.ktor.client.okhttp)
implementation(libs.sqlite.jdbc)
jvmMain {
dependsOn(commonJava)

dependencies {
implementation(libs.ktor.client.okhttp)
implementation(libs.sqlite.jdbc)
}
}

iosMain.dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import co.touchlab.kermit.Logger
import co.touchlab.kermit.Severity
import co.touchlab.kermit.TestConfig
import co.touchlab.kermit.TestLogWriter
import com.powersync.db.PowerSyncDatabaseImpl
import com.powersync.db.ActiveDatabaseGroup
import com.powersync.db.schema.Schema
import com.powersync.testutils.UserRow
import com.powersync.testutils.waitFor
Expand Down Expand Up @@ -122,7 +122,7 @@ class DatabaseTest {
waitFor {
assertNotNull(
logWriter.logs.find {
it.message == PowerSyncDatabaseImpl.multipleInstancesMessage
it.message == ActiveDatabaseGroup.multipleInstancesMessage
},
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class SyncIntegrationTest {
fun testPartialSync() =
runTest {
val syncStream = syncStream()
database.connect(syncStream, 1000L)
database.connectInternal(syncStream, 1000L)

val checksums =
buildList {
Expand Down Expand Up @@ -214,7 +214,7 @@ class SyncIntegrationTest {
fun testRemembersLastPartialSync() =
runTest {
val syncStream = syncStream()
database.connect(syncStream, 1000L)
database.connectInternal(syncStream, 1000L)

syncLines.send(
SyncLine.FullCheckpoint(
Expand Down Expand Up @@ -253,7 +253,7 @@ class SyncIntegrationTest {
fun setsDownloadingState() =
runTest {
val syncStream = syncStream()
database.connect(syncStream, 1000L)
database.connectInternal(syncStream, 1000L)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down Expand Up @@ -291,7 +291,7 @@ class SyncIntegrationTest {
val syncStream = syncStream()
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(syncStream, 1000L)
database.connectInternal(syncStream, 1000L)
turbine.waitFor { it.connecting }

database.disconnect()
Expand All @@ -308,7 +308,7 @@ class SyncIntegrationTest {
fun testMultipleSyncsDoNotCreateMultipleStatusEntries() =
runTest {
val syncStream = syncStream()
database.connect(syncStream, 1000L)
database.connectInternal(syncStream, 1000L)

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.powersync.db

internal actual fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any {
// We can't do this on Java 8 :(
return object {}
}

// This would require Java 9+

/*
import java.lang.ref.Cleaner

internal actual fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any {
// Note: It's important that the returned object does not reference the resource directly
val wrapper = CleanableWrapper()
CleanableWrapper.cleaner.register(wrapper, resource::dispose)
return wrapper
}

private class CleanableWrapper {
var cleanable: Cleaner.Cleanable? = null

companion object {
val cleaner: Cleaner = Cleaner.create()
}
}
*/
105 changes: 88 additions & 17 deletions core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt
Original file line number Diff line number Diff line change
@@ -1,23 +1,94 @@
package com.powersync.db

import com.powersync.PowerSyncDatabase
import com.powersync.utils.ExclusiveMethodProvider

internal class ActiveInstanceStore : ExclusiveMethodProvider() {
private val instances = mutableListOf<PowerSyncDatabase>()

/**
* Registers an instance. Returns true if multiple instances with the same identifier are
* present.
*/
suspend fun registerAndCheckInstance(db: PowerSyncDatabase) =
exclusiveMethod("instances") {
instances.add(db)
return@exclusiveMethod instances.filter { it.identifier == db.identifier }.size > 1
import co.touchlab.kermit.Logger
import co.touchlab.stately.concurrency.AtomicBoolean
import co.touchlab.stately.concurrency.Synchronizable
import co.touchlab.stately.concurrency.synchronize
import kotlinx.coroutines.sync.Mutex

/**
* Returns an object that, when deallocated, calls [ActiveDatabaseResource.dispose].
*/
internal expect fun disposeWhenDeallocated(resource: ActiveDatabaseResource): Any

/**
* An collection of PowerSync databases with the same path / identifier.
*
* We expect that each group will only ever have one database because we encourage users to write their databases as
* singletons. We print a warning when two databases are part of the same group.
* Additionally, we want to avoid two databases in the same group having a sync stream open at the same time to avoid
* duplicate resources being used. For this reason, each active database group has a coroutine mutex guarding the
* sync job.
*/
internal class ActiveDatabaseGroup(
val identifier: String,
private val collection: GroupsCollection,
) {
internal var refCount = 0 // Guarded by companion object
internal val syncMutex = Mutex()

fun removeUsage() {
collection.synchronize {
if (--refCount == 0) {
collection.allGroups.remove(this)
}
}
}

internal open class GroupsCollection : Synchronizable() {
internal val allGroups = mutableListOf<ActiveDatabaseGroup>()

private fun findGroup(
warnOnDuplicate: Logger,
identifier: String,
): ActiveDatabaseGroup =
synchronize {
val existing = allGroups.asSequence().firstOrNull { it.identifier == identifier }
val resolvedGroup =
if (existing == null) {
val added = ActiveDatabaseGroup(identifier, this)
allGroups.add(added)
added
} else {
existing
}

if (resolvedGroup.refCount++ != 0) {
warnOnDuplicate.w { multipleInstancesMessage }
}

resolvedGroup
}

internal fun referenceDatabase(
warnOnDuplicate: Logger,
identifier: String,
): Pair<ActiveDatabaseResource, Any> {
val group = findGroup(warnOnDuplicate, identifier)
val resource = ActiveDatabaseResource(group)

return resource to disposeWhenDeallocated(resource)
}
}

companion object : GroupsCollection() {
internal val multipleInstancesMessage =
"""
Multiple PowerSync instances for the same database have been detected.
This can cause unexpected results.
Please check your PowerSync client instantiation logic if this is not intentional.
""".trimIndent()
}
}

internal class ActiveDatabaseResource(
val group: ActiveDatabaseGroup,
) {
val disposed = AtomicBoolean(false)

suspend fun removeInstance(db: PowerSyncDatabase) =
exclusiveMethod("instances") {
instances.remove(db)
fun dispose() {
if (disposed.compareAndSet(false, true)) {
group.removeUsage()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.powersync.sync.PriorityStatusEntry
import com.powersync.sync.SyncStatus
import com.powersync.sync.SyncStatusData
import com.powersync.sync.SyncStream
import com.powersync.utils.ExclusiveMethodProvider
import com.powersync.utils.JsonParam
import com.powersync.utils.JsonUtil
import com.powersync.utils.throttle
Expand All @@ -35,6 +34,8 @@ import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlinx.datetime.LocalDateTime
import kotlinx.datetime.TimeZone
Expand All @@ -57,8 +58,7 @@ internal class PowerSyncDatabaseImpl(
private val dbFilename: String,
val logger: Logger = Logger,
driver: PsSqlDriver = factory.createDriver(scope, dbFilename),
) : ExclusiveMethodProvider(),
PowerSyncDatabase {
) : PowerSyncDatabase {
companion object {
internal val streamConflictMessage =
"""
Expand All @@ -68,41 +68,33 @@ internal class PowerSyncDatabaseImpl(
This connection attempt will be queued and will only be executed after
currently connecting clients are disconnected.
""".trimIndent()

internal val multipleInstancesMessage =
"""
Multiple PowerSync instances for the same database have been detected.
This can cause unexpected results.
Please check your PowerSync client instantiation logic if this is not intentional.
""".trimIndent()

internal val instanceStore = ActiveInstanceStore()
}

override val identifier = dbFilename

private val internalDb = InternalDatabaseImpl(driver, scope)
internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger)
private val resource: ActiveDatabaseResource
private val clearResourceWhenDisposed: Any

var closed = false

/**
* The current sync status.
*/
override val currentStatus: SyncStatus = SyncStatus()

private val mutex = Mutex()
private var syncStream: SyncStream? = null

private var syncJob: Job? = null

private var uploadJob: Job? = null

init {
val db = this
val res = ActiveDatabaseGroup.referenceDatabase(logger, identifier)
resource = res.first
clearResourceWhenDisposed = res.second

runBlocking {
val isMultiple = instanceStore.registerAndCheckInstance(db)
if (isMultiple) {
logger.w { multipleInstancesMessage }
}
val sqliteVersion = internalDb.queries.sqliteVersion().executeAsOne()
logger.d { "SQLiteVersion: $sqliteVersion" }
checkVersion()
Expand All @@ -126,10 +118,10 @@ internal class PowerSyncDatabaseImpl(
crudThrottleMs: Long,
retryDelayMs: Long,
params: Map<String, JsonParam?>,
) = exclusiveMethod("connect") {
disconnect()
) = mutex.withLock {
disconnectInternal()

connect(
connectInternal(
SyncStream(
bucketStorage = bucketStorage,
connector = connector,
Expand All @@ -143,7 +135,7 @@ internal class PowerSyncDatabaseImpl(
}

@OptIn(FlowPreview::class)
internal fun connect(
internal fun connectInternal(
stream: SyncStream,
crudThrottleMs: Long,
) {
Expand All @@ -154,8 +146,7 @@ internal class PowerSyncDatabaseImpl(
syncJob =
scope.launch {
// Get a global lock for checking mutex maps
val streamMutex =
globalMutexFor("streaming-$identifier")
val streamMutex = resource.group.syncMutex

// Poke the streaming mutex to see if another client is using it
var obtainedLock = false
Expand Down Expand Up @@ -337,7 +328,9 @@ internal class PowerSyncDatabaseImpl(
}
}

override suspend fun disconnect() {
override suspend fun disconnect() = mutex.withLock { disconnectInternal() }

private suspend fun disconnectInternal() {
if (syncJob != null && syncJob!!.isActive) {
syncJob?.cancelAndJoin()
}
Expand Down Expand Up @@ -431,13 +424,13 @@ internal class PowerSyncDatabaseImpl(
}

override suspend fun close() =
exclusiveMethod("close") {
mutex.withLock {
if (closed) {
return@exclusiveMethod
return@withLock
}
disconnect()
disconnectInternal()
internalDb.close()
instanceStore.removeInstance(this)
resource.dispose()
closed = true
}

Expand Down
Loading
Loading