Skip to content

Commit 15729e2

Browse files
update connection pool to use channel implementation
1 parent 50ed399 commit 15729e2

File tree

3 files changed

+55
-134
lines changed

3 files changed

+55
-134
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import com.powersync.db.PowerSyncDatabaseImpl
1010
import com.powersync.db.schema.Schema
1111
import com.powersync.testutils.UserRow
1212
import com.powersync.testutils.waitFor
13+
import kotlinx.coroutines.CompletableDeferred
14+
import kotlinx.coroutines.async
1315
import kotlinx.coroutines.runBlocking
1416
import kotlinx.coroutines.test.runTest
1517
import kotlin.test.AfterTest

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ internal class PowerSyncDatabaseImpl(
5555
private val dbFilename: String,
5656
private val dbDirectory: String? = null,
5757
val logger: Logger = Logger,
58-
driver: PsSqlDriver = factory.createDriver(scope, dbFilename),
5958
) : ExclusiveMethodProvider(),
6059
PowerSyncDatabase {
6160
companion object {
@@ -81,12 +80,12 @@ internal class PowerSyncDatabaseImpl(
8180
override val identifier = dbDirectory + dbFilename
8281

8382
private val internalDb =
84-
InternalDatabaseImpl(
85-
factory = factory,
86-
scope = scope,
87-
dbFilename = dbFilename,
88-
dbDirectory = dbDirectory,
89-
)
83+
InternalDatabaseImpl(
84+
factory = factory,
85+
scope = scope,
86+
dbFilename = dbFilename,
87+
dbDirectory = dbDirectory,
88+
)
9089

9190
internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger)
9291
var closed = false
@@ -112,13 +111,18 @@ internal class PowerSyncDatabaseImpl(
112111
if (isMultiple) {
113112
logger.w { multipleInstancesMessage }
114113
}
115-
114+
116115
powerSyncVersion =
117116
internalDb.get("SELECT powersync_rs_version()") { it.getString(0)!! }
118117
logger.d { "SQLiteVersion: $powerSyncVersion" }
119-
118+
120119
checkVersion()
121120
logger.d { "PowerSyncVersion: ${getPowerSyncVersion()}" }
121+
122+
internalDb.writeTransaction { tx ->
123+
tx.getOptional("SELECT powersync_init()") {}
124+
}
125+
122126
applySchema()
123127
updateHasSynced()
124128
}
Lines changed: 40 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -1,152 +1,67 @@
11
package com.powersync.db.internal
22

3-
import co.touchlab.stately.concurrency.AtomicBoolean
43
import com.powersync.PowerSyncException
54
import com.powersync.PsSqlDriver
5+
import kotlinx.coroutines.CancellationException
66
import kotlinx.coroutines.CompletableDeferred
77
import kotlinx.coroutines.CoroutineScope
8+
import kotlinx.coroutines.Job
9+
import kotlinx.coroutines.channels.Channel
10+
import kotlinx.coroutines.channels.ClosedSendChannelException
811
import kotlinx.coroutines.joinAll
912
import kotlinx.coroutines.launch
10-
import kotlinx.coroutines.sync.Mutex
11-
import kotlinx.coroutines.sync.withLock
12-
13-
internal class WrappedConnection(
14-
val transactorDriver: TransactorDriver,
15-
) {
16-
var busy: Boolean = false
17-
}
18-
19-
internal data class DeferredAction<R>(
20-
val deferred: CompletableDeferred<R>,
21-
val action: suspend (connection: TransactorDriver) -> R,
22-
)
2313

2414
internal class ConnectionPool(
2515
factory: () -> PsSqlDriver,
2616
size: Int = 5,
2717
private val scope: CoroutineScope,
2818
) {
29-
val closed = AtomicBoolean(false)
30-
31-
private val mutex = Mutex()
32-
private val connections = List(size) { WrappedConnection(TransactorDriver(factory())) }
33-
private val queue = mutableListOf<DeferredAction<Any?>>()
34-
private val activeOperations = mutableListOf<CompletableDeferred<Any?>>()
35-
36-
@Suppress("UNCHECKED_CAST")
37-
suspend fun <R> withConnection(action: suspend (connection: TransactorDriver) -> R): R {
38-
if (closed.value) {
39-
throw PowerSyncException(
40-
message = "Cannot process connection pool request",
41-
cause = Exception("Pool is closed"),
42-
)
43-
}
44-
45-
val wrappedDeferred = DeferredAction(CompletableDeferred<Any?>(), action)
46-
47-
val connection =
48-
mutex.withLock {
49-
// check if any connections are free
50-
val freeConnection = connections.find { !it.busy }
51-
if (freeConnection == null) {
52-
queue.add(wrappedDeferred)
53-
return@withLock null
19+
private val available = Channel<Pair<TransactorDriver, CompletableDeferred<Unit>>>()
20+
private val connections: List<Job> =
21+
buildList {
22+
repeat(size) {
23+
scope.launch {
24+
val driver = TransactorDriver(factory())
25+
try {
26+
while (true) {
27+
val done = CompletableDeferred<Unit>()
28+
try {
29+
available.send(driver to done)
30+
} catch (_: ClosedSendChannelException) {
31+
break // Pool closed
32+
}
33+
34+
done.await()
35+
}
36+
} finally {
37+
driver.driver.close()
38+
}
5439
}
55-
activeOperations.add(wrappedDeferred.deferred)
56-
freeConnection.busy = true
57-
return@withLock freeConnection
58-
}
59-
60-
if (connection != null) {
61-
// Can process immediately
62-
scope.launch {
63-
processRequest(wrappedDeferred, connection)
6440
}
6541
}
6642

67-
val result = wrappedDeferred.deferred.await()
68-
if (result != null) {
69-
val casted =
70-
result as? R
71-
?: throw PowerSyncException(
72-
"Could not cast deferred result",
73-
Exception("Casting error"),
74-
)
75-
return casted
76-
}
77-
return null as R
78-
}
43+
suspend fun <R> withConnection(action: suspend (connection: TransactorDriver) -> R): R {
44+
val (connection, done) =
45+
try {
46+
available.receive()
47+
} catch (e: PoolClosedException) {
48+
throw PowerSyncException(
49+
message = "Cannot process connection pool request",
50+
cause = e,
51+
)
52+
}
7953

80-
private suspend fun processRequest(
81-
request: DeferredAction<Any?>,
82-
wrappedConnection: WrappedConnection,
83-
) {
8454
try {
85-
val result = request.action(wrappedConnection.transactorDriver)
86-
request.deferred.complete(result)
87-
} catch (exception: Exception) {
88-
request.deferred.completeExceptionally(exception)
55+
return action(connection)
8956
} finally {
90-
mutex.withLock {
91-
wrappedConnection.busy = false
92-
activeOperations.remove(request.deferred)
93-
}
94-
scope.launch {
95-
processNext()
96-
}
57+
done.complete(Unit)
9758
}
9859
}
9960

100-
private suspend fun processNext() {
101-
val next: Pair<DeferredAction<Any?>, WrappedConnection> =
102-
mutex.withLock {
103-
if (queue.size == 0) {
104-
return
105-
}
106-
// check if a connection is open
107-
val connection = connections.find { !it.busy }
108-
if (connection == null) {
109-
return
110-
}
111-
val next = queue.first()
112-
queue.removeFirst()
113-
114-
connection.busy = true
115-
return@withLock next to connection
116-
}
117-
118-
processRequest(next.first, next.second)
119-
}
120-
12161
suspend fun close() {
122-
if (closed.value) {
123-
return
124-
}
125-
126-
closed.value = true
127-
val activeOperations =
128-
mutex.withLock {
129-
// These should all be pending
130-
for (item in queue) {
131-
item.deferred.completeExceptionally(
132-
PowerSyncException(
133-
message = "Pending requests are aborted",
134-
cause = Exception("Pool has been closed"),
135-
),
136-
)
137-
}
138-
queue.clear()
139-
// Return a copy of active items in order to check them
140-
return@withLock activeOperations.toList()
141-
}
142-
143-
// Wait for all pending operations
144-
// Alternatively we could cancel and ongoing jobs
145-
activeOperations.joinAll()
146-
147-
// Close all connections
148-
for (connection in connections) {
149-
connection.transactorDriver.driver.close()
150-
}
62+
available.cancel(PoolClosedException)
63+
connections.joinAll()
15164
}
15265
}
66+
67+
internal object PoolClosedException : CancellationException("Pool is closed")

0 commit comments

Comments
 (0)