Skip to content

Commit 011b1da

Browse files
committed
Actually, use callbacks
1 parent c7adbab commit 011b1da

File tree

9 files changed

+83
-89
lines changed

9 files changed

+83
-89
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import io.kotest.matchers.string.shouldContain
2222
import kotlinx.coroutines.CompletableDeferred
2323
import kotlinx.coroutines.Dispatchers
2424
import kotlinx.coroutines.async
25+
import kotlinx.coroutines.awaitCancellation
26+
import kotlinx.coroutines.cancelAndJoin
2527
import kotlinx.coroutines.delay
2628
import kotlinx.coroutines.flow.takeWhile
2729
import kotlinx.coroutines.launch
@@ -503,37 +505,45 @@ class DatabaseTest {
503505

504506
@Test
505507
@OptIn(ExperimentalPowerSyncAPI::class)
506-
fun testLeaseReadOnly() =
508+
fun testUseRawReadOnly() =
507509
databaseTest {
508510
database.execute(
509511
"INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)",
510512
listOf("a", "[email protected]"),
511513
)
512514

513-
514-
val raw = database.leaseConnection(readOnly = true)
515-
raw.usePrepared("SELECT * FROM users") { stmt ->
516-
stmt.step() shouldBe true
517-
stmt.getText(1) shouldBe "a"
518-
stmt.getText(2) shouldBe "[email protected]"
515+
database.useConnection(true) {
516+
it.usePrepared("SELECT * FROM users") { stmt ->
517+
stmt.step() shouldBe true
518+
stmt.getText(1) shouldBe "a"
519+
stmt.getText(2) shouldBe "a@example.org"
520+
}
519521
}
520-
raw.close()
521522
}
522523

523524
@Test
524525
@OptIn(ExperimentalPowerSyncAPI::class)
525-
fun testLeaseWrite() =
526+
fun testUseRawWrite() =
526527
databaseTest {
527-
val raw = database.leaseConnection(readOnly = false)
528-
raw.usePrepared("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)") { stmt ->
529-
stmt.bindText(1, "name")
530-
stmt.bindText(2, "email")
531-
stmt.step() shouldBe false
532-
533-
stmt.reset()
534-
stmt.step() shouldBe false
528+
val didWrite = CompletableDeferred<Unit>()
529+
530+
val job = scope.launch {
531+
database.useConnection(readOnly = false) { raw ->
532+
raw.usePrepared("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)") { stmt ->
533+
stmt.bindText(1, "name")
534+
stmt.bindText(2, "email")
535+
stmt.step() shouldBe false
536+
537+
stmt.reset()
538+
stmt.step() shouldBe false
539+
}
540+
541+
didWrite.complete(Unit)
542+
awaitCancellation()
543+
}
535544
}
536545

546+
didWrite.await()
537547
database.getAll("SELECT * FROM users") { it.getString("name") } shouldHaveSize 2
538548

539549
// Verify that the statement indeed holds a lock on the database.
@@ -548,7 +558,7 @@ class DatabaseTest {
548558

549559
delay(100.milliseconds)
550560
hadOtherWrite.isCompleted shouldBe false
551-
raw.close()
561+
job.cancelAndJoin()
552562
hadOtherWrite.await()
553563
}
554564
}

core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,5 +233,9 @@ public interface PowerSyncDatabase : Queries {
233233
group,
234234
)
235235
}
236+
237+
public fun databaseGroup(logger: Logger, identifier: String): Pair<ActiveDatabaseResource, Any> {
238+
return ActiveDatabaseGroup.referenceDatabase(logger, identifier)
239+
}
236240
}
237241
}

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,11 +323,15 @@ internal class PowerSyncDatabaseImpl(
323323
}
324324

325325
@ExperimentalPowerSyncAPI
326-
override suspend fun leaseConnection(readOnly: Boolean): SQLiteConnectionLease {
326+
override suspend fun <T> useConnection(
327+
readOnly: Boolean,
328+
block: suspend (SQLiteConnectionLease) -> T
329+
): T {
327330
waitReady()
328-
return internalDb.leaseConnection(readOnly)
331+
return internalDb.useConnection(readOnly, block)
329332
}
330333

334+
331335
override suspend fun <RowType : Any> get(
332336
sql: String,
333337
parameters: List<Any?>?,

core/src/commonMain/kotlin/com/powersync/db/Queries.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -192,15 +192,11 @@ public interface Queries {
192192
*
193193
* This is useful when you need full control over the raw statements to use.
194194
*
195-
* The connection needs to be released by calling [SQLiteConnectionLease.close] as soon as
196-
* you're done with it, because the connection will occupy a read resource or the write lock
197-
* while active.
198-
*
199195
* Misusing this API, for instance by not cleaning up transactions started on the underlying
200196
* connection with a `BEGIN` statement or forgetting to close it, can disrupt the rest of the
201197
* PowerSync SDK. For this reason, this method should only be used if absolutely necessary.
202198
*/
203199
@ExperimentalPowerSyncAPI()
204200
@HiddenFromObjC()
205-
public suspend fun leaseConnection(readOnly: Boolean = false): SQLiteConnectionLease
201+
public suspend fun <T> useConnection(readOnly: Boolean = false, block: suspend (SQLiteConnectionLease) -> T): T
206202
}

core/src/commonMain/kotlin/com/powersync/db/driver/InternalConnectionPool.kt

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
1111
import kotlinx.coroutines.flow.SharedFlow
1212
import kotlinx.coroutines.launch
1313
import kotlinx.coroutines.sync.Mutex
14+
import kotlinx.coroutines.sync.withLock
1415

1516
@OptIn(ExperimentalPowerSyncAPI::class)
1617
internal class InternalConnectionPool(
@@ -65,40 +66,37 @@ internal class InternalConnectionPool(
6566
return connection
6667
}
6768

68-
override suspend fun read(): SQLiteConnectionLease {
69-
return readPool.obtainConnection()
69+
override suspend fun <T> read(callback: suspend (SQLiteConnectionLease) -> T): T {
70+
return readPool.read(callback)
7071
}
7172

72-
override suspend fun write(): SQLiteConnectionLease {
73-
writeLockMutex.lock()
74-
return RawConnectionLease(writeConnection) {
75-
// When we've leased a write connection, we may have to update table update flows
76-
// after users ran their custom statements.
77-
writeConnection.prepare("SELECT powersync_update_hooks('get')").use {
78-
check(it.step())
79-
val updatedTables = JsonUtil.json.decodeFromString<Set<String>>(it.getText(0))
80-
if (updatedTables.isNotEmpty()) {
81-
scope.launch {
82-
tableUpdatesFlow.emit(updatedTables)
73+
override suspend fun <T> write(callback: suspend (SQLiteConnectionLease) -> T): T {
74+
return writeLockMutex.withLock {
75+
try {
76+
callback(RawConnectionLease(writeConnection))
77+
} finally {
78+
// When we've leased a write connection, we may have to update table update flows
79+
// after users ran their custom statements.
80+
writeConnection.prepare("SELECT powersync_update_hooks('get')").use {
81+
check(it.step())
82+
val updatedTables = JsonUtil.json.decodeFromString<Set<String>>(it.getText(0))
83+
if (updatedTables.isNotEmpty()) {
84+
scope.launch {
85+
tableUpdatesFlow.emit(updatedTables)
86+
}
8387
}
8488
}
8589
}
86-
87-
writeLockMutex.unlock()
8890
}
8991
}
9092

9193
override suspend fun <R> withAllConnections(action: suspend (SQLiteConnectionLease, List<SQLiteConnectionLease>) -> R) {
9294
// First get a lock on all read connections
9395
readPool.withAllConnections { rawReadConnections ->
94-
val readers = rawReadConnections.map { RawConnectionLease(it) {} }
96+
val readers = rawReadConnections.map { RawConnectionLease(it) }
9597
// Then get access to the write connection
96-
val writer = write()
97-
98-
try {
98+
write { writer ->
9999
action(writer, readers)
100-
} finally {
101-
writer.close()
102100
}
103101
}
104102
}

core/src/commonMain/kotlin/com/powersync/db/driver/RawConnectionLease.kt

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import com.powersync.ExperimentalPowerSyncAPI
1111
@OptIn(ExperimentalPowerSyncAPI::class)
1212
internal class RawConnectionLease(
1313
private val connection: SQLiteConnection,
14-
private val returnConnection: () -> Unit,
1514
) : SQLiteConnectionLease {
1615
private var isCompleted = false
1716

@@ -39,12 +38,4 @@ internal class RawConnectionLease(
3938
checkNotCompleted()
4039
return connection.prepare(sql).use(block)
4140
}
42-
43-
override suspend fun close() {
44-
// Note: This is a lease, don't close the underlying connection.
45-
if (!isCompleted) {
46-
isCompleted = true
47-
returnConnection()
48-
}
49-
}
5041
}

core/src/commonMain/kotlin/com/powersync/db/driver/ReadPool.kt

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,21 @@ internal class ReadPool(
4343
}
4444
}
4545

46-
suspend fun obtainConnection(): RawConnectionLease {
47-
val (connection, done) =
48-
try {
49-
available.receive()
50-
} catch (e: PoolClosedException) {
51-
throw PowerSyncException(
52-
message = "Cannot process connection pool request",
53-
cause = e,
54-
)
55-
}
46+
suspend fun <T> read(block: suspend (SQLiteConnectionLease) -> T): T {
47+
val (connection, done) = try {
48+
available.receive()
49+
} catch (e: PoolClosedException) {
50+
throw PowerSyncException(
51+
message = "Cannot process connection pool request",
52+
cause = e,
53+
)
54+
}
5655

57-
return RawConnectionLease(connection) { done.complete(Unit) }
56+
try {
57+
return block(RawConnectionLease(connection))
58+
} finally {
59+
done.complete(Unit)
60+
}
5861
}
5962

6063
suspend fun <R> withAllConnections(action: suspend (connections: List<SQLiteConnection>) -> R): R {

core/src/commonMain/kotlin/com/powersync/db/driver/SQLiteConnectionPool.kt

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@ import kotlinx.coroutines.runBlocking
77

88
@ExperimentalPowerSyncAPI()
99
public interface SQLiteConnectionPool {
10-
public suspend fun read(): SQLiteConnectionLease
10+
public suspend fun <T> read(callback: suspend (SQLiteConnectionLease) -> T): T
11+
public suspend fun <T> write(callback: suspend (SQLiteConnectionLease) -> T): T
1112

1213
public suspend fun <R> withAllConnections(action: suspend (
1314
writer: SQLiteConnectionLease,
1415
readers: List<SQLiteConnectionLease>
1516
) -> R)
1617

17-
public suspend fun write(): SQLiteConnectionLease
18-
1918
public val updates: SharedFlow<Set<String>>
2019

2120
public suspend fun close()
@@ -50,9 +49,4 @@ public interface SQLiteConnectionLease {
5049
it.step()
5150
}
5251
}
53-
54-
/**
55-
* Returns the leased connection to the pool.
56-
*/
57-
public suspend fun close()
5852
}

core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,14 @@ internal class InternalDatabaseImpl(
152152
}
153153
}
154154

155-
156-
override suspend fun leaseConnection(readOnly: Boolean): SQLiteConnectionLease {
155+
override suspend fun <T> useConnection(
156+
readOnly: Boolean,
157+
block: suspend (SQLiteConnectionLease) -> T
158+
): T {
157159
return if (readOnly) {
158-
pool.read()
160+
pool.read(block)
159161
} else {
160-
pool.write()
162+
pool.write(block)
161163
}
162164
}
163165

@@ -168,14 +170,10 @@ internal class InternalDatabaseImpl(
168170
private suspend fun <R> internalReadLock(callback: suspend (SQLiteConnectionLease) -> R): R =
169171
withContext(dbContext) {
170172
runWrapped {
171-
val connection = leaseConnection(readOnly = true)
172-
try {
173+
useConnection(true) { connection ->
173174
catchSwiftExceptions {
174175
callback(connection)
175176
}
176-
} finally {
177-
// Closing the lease will release the connection back into the pool.
178-
connection.close()
179177
}
180178
}
181179
}
@@ -197,16 +195,12 @@ internal class InternalDatabaseImpl(
197195
@OptIn(ExperimentalPowerSyncAPI::class)
198196
private suspend fun <R> internalWriteLock(callback: suspend (SQLiteConnectionLease) -> R): R =
199197
withContext(dbContext) {
200-
val lease = pool.write()
201-
try {
198+
pool.write { writer ->
202199
runWrapped {
203200
catchSwiftExceptions {
204-
callback(lease)
201+
callback(writer)
205202
}
206203
}
207-
} finally {
208-
// Returning the lease will unlock the writeLockMutex
209-
lease.close()
210204
}
211205
}
212206

0 commit comments

Comments
 (0)