Skip to content

Commit 824e559

Browse files
update global locks
1 parent 394507b commit 824e559

File tree

3 files changed

+28
-46
lines changed

3 files changed

+28
-46
lines changed

core/src/commonMain/kotlin/com/powersync/db/ActiveInstanceStore.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ internal class ActiveDatabaseGroup(
2626
) {
2727
internal var refCount = 0 // Guarded by companion object
2828
internal val syncMutex = Mutex()
29+
internal val writeLockMutex = Mutex()
2930

3031
fun removeUsage() {
3132
collection.synchronize {

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ internal class PowerSyncDatabaseImpl(
5656
private val dbFilename: String,
5757
private val dbDirectory: String? = null,
5858
val logger: Logger = Logger,
59-
driver: PsSqlDriver = factory.createDriver(scope, dbFilename),
6059
) : PowerSyncDatabase {
6160
companion object {
6261
internal val streamConflictMessage =
@@ -104,9 +103,11 @@ internal class PowerSyncDatabaseImpl(
104103
clearResourceWhenDisposed = res.second
105104

106105
runBlocking {
107-
val sqliteVersion = internalDb.queries.sqliteVersion().executeAsOne()
106+
val sqliteVersion = internalDb.get("SELECT sqlite_version()") { it.getString(0)!! }
108107
logger.d { "SQLiteVersion: $sqliteVersion" }
109-
checkVersion()
108+
powerSyncVersion =
109+
internalDb.get("SELECT powersync_rs_version()") { it.getString(0)!! }
110+
checkVersion(powerSyncVersion!!)
110111
logger.d { "PowerSyncVersion: ${getPowerSyncVersion()}" }
111112

112113
internalDb.writeTransaction { tx ->
@@ -322,14 +323,23 @@ internal class PowerSyncDatabaseImpl(
322323

323324
override suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R = internalDb.writeTransaction(callback)
324325

325-
override suspend fun <R> writeLock(callback: ThrowableLockCallback<R>): R = internalDb.writeLock(callback)
326+
override suspend fun <R> writeLock(callback: ThrowableLockCallback<R>): R =
327+
resource.group.writeLockMutex.withLock {
328+
internalDb.writeLock(callback)
329+
}
326330

327-
override suspend fun <R> writeTransaction(callback: ThrowableTransactionCallback<R>): R = internalDb.writeTransaction(callback)
331+
override suspend fun <R> writeTransaction(callback: ThrowableTransactionCallback<R>): R =
332+
resource.group.writeLockMutex.withLock {
333+
internalDb.writeTransaction(callback)
334+
}
328335

329336
override suspend fun execute(
330337
sql: String,
331338
parameters: List<Any?>?,
332-
): Long = internalDb.execute(sql, parameters)
339+
): Long =
340+
resource.group.writeLockMutex.withLock {
341+
internalDb.execute(sql, parameters)
342+
}
333343

334344
private suspend fun handleWriteCheckpoint(
335345
lastTransactionId: Int,
@@ -464,28 +474,21 @@ internal class PowerSyncDatabaseImpl(
464474
/**
465475
* Check that a supported version of the powersync extension is loaded.
466476
*/
467-
private suspend fun checkVersion() {
468-
val version: String =
469-
try {
470-
getPowerSyncVersion()
471-
} catch (e: Exception) {
472-
throw Exception("The powersync extension is not loaded correctly. Details: $e")
473-
}
474-
477+
private suspend fun checkVersion(powerSyncVersion: String) {
475478
// Parse version
476479
val versionInts: List<Int> =
477480
try {
478-
version
481+
powerSyncVersion
479482
.split(Regex("[./]"))
480483
.take(3)
481484
.map { it.toInt() }
482485
} catch (e: Exception) {
483-
throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $version. Details: $e")
486+
throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $powerSyncVersion. Details: $e")
484487
}
485488

486489
// Validate ^0.2.0
487490
if (versionInts[0] != 0 || versionInts[1] < 2 || versionInts[2] < 0) {
488-
throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $version")
491+
throw Exception("Unsupported powersync extension version. Need ^0.2.0, got: $powerSyncVersion")
489492
}
490493
}
491494
}

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ import kotlinx.coroutines.flow.channelFlow
2020
import kotlinx.coroutines.flow.filter
2121
import kotlinx.coroutines.flow.onSubscription
2222
import kotlinx.coroutines.runBlocking
23-
import kotlinx.coroutines.sync.Mutex
24-
import kotlinx.coroutines.sync.withLock
2523
import kotlinx.coroutines.withContext
2624
import kotlinx.serialization.encodeToString
2725

@@ -57,21 +55,6 @@ internal class InternalDatabaseImpl(
5755

5856
companion object {
5957
const val DEFAULT_WATCH_THROTTLE_MS = 30L
60-
61-
// A meta mutex for protecting mutex map operations
62-
private val globalLock = Mutex()
63-
64-
// Static mutex max which globally shares write locks
65-
private val mutexMap = mutableMapOf<String, Mutex>()
66-
67-
// Run an action inside a global shared mutex
68-
private suspend fun <T> withSharedMutex(
69-
key: String,
70-
action: suspend () -> T,
71-
): T {
72-
val mutex = globalLock.withLock { mutexMap.getOrPut(key) { Mutex() } }
73-
return mutex.withLock { action() }
74-
}
7558
}
7659

7760
override suspend fun execute(
@@ -167,21 +150,16 @@ internal class InternalDatabaseImpl(
167150
}
168151
}
169152

170-
/**
171-
* Creates a read lock while providing an internal transactor for transactions
172-
*/
173153
private suspend fun <R> internalWriteLock(callback: (TransactorDriver) -> R): R =
174154
withContext(dbContext) {
175-
withSharedMutex(dbIdentifier) {
176-
runWrapped {
177-
catchSwiftExceptions {
178-
callback(writeConnection)
179-
}
180-
}.also {
181-
// Trigger watched queries
182-
// Fire updates inside the write lock
183-
writeConnection.driver.fireTableUpdates()
155+
runWrapped {
156+
catchSwiftExceptions {
157+
callback(writeConnection)
184158
}
159+
}.also {
160+
// Trigger watched queries
161+
// Fire updates inside the write lock
162+
writeConnection.driver.fireTableUpdates()
185163
}
186164
}
187165

0 commit comments

Comments
 (0)