Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.0.0-BETA29

* Added queing protection and warnings when connecting multiple PowerSync clients to the same database file.

## 1.0.0-BETA28

* Update PowerSync SQLite core extension to 0.3.12.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.powersync

import app.cash.turbine.turbineScope
import co.touchlab.kermit.ExperimentalKermitApi
import co.touchlab.kermit.LogWriter
import co.touchlab.kermit.Logger
import co.touchlab.kermit.Severity
import co.touchlab.kermit.TestConfig
Expand All @@ -18,13 +20,17 @@ import com.powersync.sync.SyncStream
import com.powersync.testutils.MockSyncService
import com.powersync.testutils.UserRow
import com.powersync.testutils.cleanup
import com.powersync.testutils.factory
import com.powersync.testutils.waitFor
import com.powersync.testutils.waitForString
import com.powersync.utils.JsonUtil
import dev.mokkery.answering.returns
import dev.mokkery.everySuspend
import dev.mokkery.mock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
Expand All @@ -38,7 +44,7 @@ import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds

@OptIn(co.touchlab.kermit.ExperimentalKermitApi::class)
@OptIn(ExperimentalKermitApi::class)
class SyncIntegrationTest {
private val logger =
Logger(
Expand Down Expand Up @@ -75,12 +81,15 @@ class SyncIntegrationTest {

@AfterTest
fun teardown() {
runBlocking {
database.close()
}
cleanup("testdb")
}

private fun openDb() =
PowerSyncDatabase(
factory = com.powersync.testutils.factory,
factory = factory,
schema = Schema(UserRow.table),
dbFilename = "testdb",
) as PowerSyncDatabaseImpl
Expand Down Expand Up @@ -271,6 +280,26 @@ class SyncIntegrationTest {
syncLines.close()
}

@Test
fun setsConnectingState() =
runTest {
turbineScope(timeout = 10.0.seconds) {
val syncStream = syncStream()
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(syncStream, 1000L)
turbine.waitFor { it.connecting }

database.disconnect()

turbine.waitFor { !it.connecting && !it.connected }
turbine.cancel()
}

database.close()
syncLines.close()
}

@Test
fun testMultipleSyncsDoNotCreateMultipleStatusEntries() =
runTest {
Expand Down Expand Up @@ -312,6 +341,139 @@ class SyncIntegrationTest {
turbine.cancel()
}

database.close()
syncLines.close()
}

@Test
fun warnsMultipleConnectionAttempts() =
runTest {
val logMessages = MutableSharedFlow<String>(extraBufferCapacity = 10)

Logger.setLogWriters(
object : LogWriter() {
override fun log(
severity: Severity,
message: String,
tag: String,
throwable: Throwable?,
) {
logMessages.tryEmit(message)
}
},
)

Logger.setMinSeverity(Severity.Verbose)

val db2 =
PowerSyncDatabase(
factory = factory,
schema = Schema(UserRow.table),
dbFilename = "testdb",
logger = Logger,
) as PowerSyncDatabaseImpl

turbineScope(timeout = 10.0.seconds) {
val logTurbine = logMessages.asSharedFlow().testIn(this)

// Connect the first database
database.connect(connector, 1000L)
db2.connect(connector)

logTurbine.waitForString { it == PowerSyncDatabaseImpl.streamConflictMessage }

db2.disconnect()

database.disconnect()

logTurbine.cancel()
}

db2.close()
database.close()
syncLines.close()
}

@Test
fun queuesMultipleConnectionAttempts() =
runTest {
val db2 =
PowerSyncDatabase(
factory = factory,
schema = Schema(UserRow.table),
dbFilename = "testdb",
logger = Logger,
) as PowerSyncDatabaseImpl

turbineScope(timeout = 10.0.seconds) {
val turbine1 = database.currentStatus.asFlow().testIn(this)
val turbine2 = db2.currentStatus.asFlow().testIn(this)

// Connect the first database
database.connect(connector, 1000L)

turbine1.waitFor { it.connecting }
db2.connect(connector)

// Should not be connecting yet
assertEquals(false, db2.currentStatus.connecting)

database.disconnect()
turbine1.waitFor { !it.connecting }

// Should start connecting after the other database disconnected
turbine2.waitFor { it.connecting }
db2.disconnect()
turbine2.waitFor { !it.connecting }

turbine1.cancel()
turbine2.cancel()
}

db2.close()
database.close()
syncLines.close()
}

@Test
fun reconnectsAfterDisconnecting() =
runTest {
turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(connector, 1000L)
turbine.waitFor { it.connecting }

database.disconnect()
turbine.waitFor { !it.connecting }

database.connect(connector, 1000L)
turbine.waitFor { it.connecting }
database.disconnect()
turbine.waitFor { !it.connecting }

turbine.cancel()
}

database.close()
syncLines.close()
}

@Test
fun reconnects() =
runTest {
turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(connector, 1000L, retryDelayMs = 5000)
turbine.waitFor { it.connecting }

database.connect(connector, 1000L, retryDelayMs = 5000)
turbine.waitFor { it.connecting }

turbine.cancel()
}

database.close()
syncLines.close()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.powersync.db

import co.touchlab.kermit.Logger
import co.touchlab.stately.concurrency.AtomicBoolean
import com.powersync.DatabaseDriverFactory
import com.powersync.PowerSyncDatabase
import com.powersync.PsSqlDriver
Expand Down Expand Up @@ -33,6 +34,8 @@ import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Instant
import kotlinx.datetime.LocalDateTime
import kotlinx.datetime.TimeZone
Expand All @@ -56,8 +59,34 @@ internal class PowerSyncDatabaseImpl(
val logger: Logger = Logger,
driver: PsSqlDriver = factory.createDriver(scope, dbFilename),
) : PowerSyncDatabase {
companion object {
/**
* We store a map of connection streaming mutexes, but we need to access that safely.
*/
internal val streamingMapMutex = Mutex()

/**
* Manages streaming operations for the same database between clients
*/
internal val streamMutexes = mutableMapOf<String, Mutex>()

internal val streamConflictMessage =
"""
Another PowerSync client is already connected to this database.
Multiple connections to the same database should be avoided.
Please check your PowerSync client instantiation logic.
This connection attempt will be queued and will only be executed after
currently connecting clients are disconnected.
""".trimIndent()
}

/**
* Manages multiple connection attempts within the same client
*/
private val connectingMutex = Mutex()
private val internalDb = InternalDatabaseImpl(driver, scope)
internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger)
val closed = AtomicBoolean(false)

/**
* The current sync status.
Expand Down Expand Up @@ -95,8 +124,7 @@ internal class PowerSyncDatabaseImpl(
crudThrottleMs: Long,
retryDelayMs: Long,
params: Map<String, JsonParam?>,
) {
// close connection if one is open
) = connectingMutex.withLock {
disconnect()

connect(
Expand All @@ -118,9 +146,47 @@ internal class PowerSyncDatabaseImpl(
crudThrottleMs: Long,
) {
this.syncStream = stream

val db = this

syncJob =
scope.launch {
syncStream!!.streamingSync()
// Get a global lock for checking mutex maps
val streamMutex =
streamingMapMutex.withLock {
val streamMutex = streamMutexes.getOrPut(dbFilename) { Mutex() }

// Poke the streaming mutex to see if another client is using it
// We check the mutex status in the global lock to prevent race conditions
// between multiple clients.
var obtainedLock = false
try {
// This call will throw if the lock is already held by this db client.
// We should never reach that point since we disconnect before connecting.
obtainedLock = streamMutex.tryLock(db)
if (!obtainedLock) {
// The mutex is held already by another PowerSync instance (owner).
// (The tryLock should throw if this client already holds the lock).
logger.w(streamConflictMessage)
}
} catch (ex: IllegalStateException) {
logger.e { "The streaming sync client did not disconnect before connecting" }
} finally {
if (obtainedLock) {
streamMutex.unlock(db)
}
}

// It should be safe to use this mutex instance externally
return@withLock streamMutex
}

// Start the Mutex request in this job.
// Disconnecting will cancel the job and request.
// This holds the lock while streaming is in progress.
streamMutex.withLock(db) {
syncStream!!.streamingSync()
}
}

scope.launch {
Expand Down Expand Up @@ -316,7 +382,10 @@ internal class PowerSyncDatabaseImpl(

SyncedAt(
priority = BucketPriority(it.getLong(0)!!.toInt()),
syncedAt = LocalDateTime.parse(rawTime.replace(" ", "T")).toInstant(TimeZone.UTC),
syncedAt =
LocalDateTime
.parse(rawTime.replace(" ", "T"))
.toInstant(TimeZone.UTC),
)
}

Expand Down Expand Up @@ -364,6 +433,10 @@ internal class PowerSyncDatabaseImpl(
}

override suspend fun close() {
if (closed.value) {
return
}
closed.value = true
disconnect()
internalDb.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,13 @@ suspend inline fun ReceiveTurbine<SyncStatusData>.waitFor(matcher: (SyncStatusDa
}
}
}

// JVM build fails if this is also called waitFor
suspend inline fun ReceiveTurbine<String>.waitForString(matcher: (String) -> Boolean) {
while (true) {
val item = awaitItem()
if (matcher(item)) {
break
}
}
}
Loading
Loading