Skip to content

Commit 0a9bae6

Browse files
add tests for connection pool close
1 parent 824e559 commit 0a9bae6

File tree

12 files changed

+105
-61
lines changed

12 files changed

+105
-61
lines changed

core/build.gradle.kts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,6 @@ kotlin {
271271
// tests.
272272
jvmTest.get().dependsOn(commonIntegrationTest)
273273

274-
androidMain.get().dependsOn(commonJDBC)
275-
jvmMain.get().dependsOn(commonJDBC)
276-
277274
// We're linking the xcframework for the simulator tests, so they can use integration tests too
278275
iosSimulatorArm64Test.orNull?.dependsOn(commonIntegrationTest)
279276
}

core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package com.powersync
22

33
import android.content.Context
4+
import com.powersync.db.JdbcSqliteDriver
5+
import com.powersync.db.buildDefaultWalProperties
46
import com.powersync.db.internal.InternalSchema
7+
import com.powersync.db.migrateDriver
58
import kotlinx.coroutines.CoroutineScope
69
import org.sqlite.SQLiteCommitListener
710

core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@ import com.powersync.db.schema.Schema
1111
import com.powersync.testutils.UserRow
1212
import com.powersync.testutils.waitFor
1313
import kotlinx.coroutines.CompletableDeferred
14+
import kotlinx.coroutines.Dispatchers
1415
import kotlinx.coroutines.async
16+
import kotlinx.coroutines.delay
1517
import kotlinx.coroutines.runBlocking
1618
import kotlinx.coroutines.test.runTest
19+
import kotlinx.coroutines.withContext
1720
import kotlin.test.AfterTest
1821
import kotlin.test.BeforeTest
1922
import kotlin.test.Test
@@ -58,7 +61,11 @@ class DatabaseTest {
5861

5962
@AfterTest
6063
fun tearDown() {
61-
runBlocking { database.disconnectAndClear(true) }
64+
runBlocking {
65+
if (!database.closed) {
66+
database.disconnectAndClear(true)
67+
}
68+
}
6269
com.powersync.testutils.cleanup("testdb")
6370
}
6471

@@ -220,6 +227,47 @@ class DatabaseTest {
220227
}
221228
}
222229

230+
@Test
231+
fun testClosingReadPool() =
232+
runTest {
233+
val pausedLock = CompletableDeferred<Unit>()
234+
val inLock = CompletableDeferred<Unit>()
235+
// Request a lock
236+
val lockJob =
237+
async {
238+
database.readLock {
239+
inLock.complete(Unit)
240+
runBlocking {
241+
pausedLock.await()
242+
}
243+
}
244+
}
245+
246+
// Wait for the lock to be active
247+
inLock.await()
248+
249+
// Close the database. This should close the read pool
250+
// The pool should wait for jobs to complete before closing
251+
val closeJob =
252+
async {
253+
database.close()
254+
}
255+
256+
// Wait a little for testing
257+
// Spawns in a different context for the delay to actually take affect
258+
async { withContext(Dispatchers.Default) { delay(500) } }.await()
259+
260+
// The database should not close yet
261+
assertEquals(actual = database.closed, expected = false)
262+
263+
// Release the lock
264+
pausedLock.complete(Unit)
265+
lockJob.await()
266+
closeJob.await()
267+
268+
assertEquals(actual = database.closed, expected = true)
269+
}
270+
223271
@Test
224272
fun warnsMultipleInstances() =
225273
runTest {

core/src/commonJDBC/kotlin/com/powersync/JdbcPreparedStatement.kt renamed to core/src/commonJava/kotlin/com/powersync/db/JdbcPreparedStatement.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.powersync
1+
package com.powersync.db
22

33
import app.cash.sqldelight.db.QueryResult
44
import app.cash.sqldelight.db.SqlCursor

core/src/commonJDBC/kotlin/com/powersync/JdbcSqliteDriver.kt renamed to core/src/commonJava/kotlin/com/powersync/db/JdbcSqliteDriver.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.powersync
1+
package com.powersync.db
22

33
import app.cash.sqldelight.Query
44
import app.cash.sqldelight.Transacter

core/src/commonJDBC/kotlin/com/powersync/WalProperties.kt renamed to core/src/commonJava/kotlin/com/powersync/db/WalProperties.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.powersync
1+
package com.powersync.db
22

33
import java.util.Properties
44

core/src/commonMain/kotlin/com/powersync/PowerSyncDatabase.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ import kotlin.coroutines.cancellation.CancellationException
1919
* All changes to local tables are automatically recorded, whether connected or not. Once connected, the changes are uploaded.
2020
*/
2121
public interface PowerSyncDatabase : Queries {
22+
/**
23+
* Indicates if the PowerSync client has been closed.
24+
* A new client is required after a client has been closed.
25+
*/
26+
public val closed: Boolean
27+
2228
/**
2329
* Identifies the database client.
2430
* This is typically the database name.

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,22 @@ internal class PowerSyncDatabaseImpl(
7070

7171
override val identifier = dbDirectory + dbFilename
7272

73+
private val activeDatabaseGroup = ActiveDatabaseGroup.referenceDatabase(logger, identifier)
74+
private val resource = activeDatabaseGroup.first
75+
private val clearResourceWhenDisposed = activeDatabaseGroup.second
76+
7377
private val internalDb =
7478
InternalDatabaseImpl(
7579
factory = factory,
7680
scope = scope,
7781
dbFilename = dbFilename,
7882
dbDirectory = dbDirectory,
83+
writeLockMutex = resource.group.writeLockMutex,
7984
)
8085

8186
internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger)
82-
private val resource: ActiveDatabaseResource
83-
private val clearResourceWhenDisposed: Any
8487

85-
var closed = false
88+
override var closed = false
8689

8790
/**
8891
* The current sync status.
@@ -98,10 +101,6 @@ internal class PowerSyncDatabaseImpl(
98101
private var powerSyncVersion: String? = null
99102

100103
init {
101-
val res = ActiveDatabaseGroup.referenceDatabase(logger, identifier)
102-
resource = res.first
103-
clearResourceWhenDisposed = res.second
104-
105104
runBlocking {
106105
val sqliteVersion = internalDb.get("SELECT sqlite_version()") { it.getString(0)!! }
107106
logger.d { "SQLiteVersion: $sqliteVersion" }
@@ -323,23 +322,14 @@ internal class PowerSyncDatabaseImpl(
323322

324323
override suspend fun <R> readTransaction(callback: ThrowableTransactionCallback<R>): R = internalDb.writeTransaction(callback)
325324

326-
override suspend fun <R> writeLock(callback: ThrowableLockCallback<R>): R =
327-
resource.group.writeLockMutex.withLock {
328-
internalDb.writeLock(callback)
329-
}
325+
override suspend fun <R> writeLock(callback: ThrowableLockCallback<R>): R = internalDb.writeLock(callback)
330326

331-
override suspend fun <R> writeTransaction(callback: ThrowableTransactionCallback<R>): R =
332-
resource.group.writeLockMutex.withLock {
333-
internalDb.writeTransaction(callback)
334-
}
327+
override suspend fun <R> writeTransaction(callback: ThrowableTransactionCallback<R>): R = internalDb.writeTransaction(callback)
335328

336329
override suspend fun execute(
337330
sql: String,
338331
parameters: List<Any?>?,
339-
): Long =
340-
resource.group.writeLockMutex.withLock {
341-
internalDb.execute(sql, parameters)
342-
}
332+
): Long = internalDb.execute(sql, parameters)
343333

344334
private suspend fun handleWriteCheckpoint(
345335
lastTransactionId: Int,

core/src/commonMain/kotlin/com/powersync/db/internal/ConnectionPool.kt

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,22 @@ internal class ConnectionPool(
1818
) {
1919
private val available = Channel<Pair<TransactorDriver, CompletableDeferred<Unit>>>()
2020
private val connections: List<Job> =
21-
buildList {
22-
repeat(size) {
23-
scope.launch {
24-
val driver = TransactorDriver(factory())
25-
try {
26-
while (true) {
27-
val done = CompletableDeferred<Unit>()
28-
try {
29-
available.send(driver to done)
30-
} catch (_: ClosedSendChannelException) {
31-
break // Pool closed
32-
}
33-
34-
done.await()
21+
List(size) {
22+
scope.launch {
23+
val driver = TransactorDriver(factory())
24+
try {
25+
while (true) {
26+
val done = CompletableDeferred<Unit>()
27+
try {
28+
available.send(driver to done)
29+
} catch (_: ClosedSendChannelException) {
30+
break // Pool closed
3531
}
36-
} finally {
37-
driver.driver.close()
32+
33+
done.await()
3834
}
35+
} finally {
36+
driver.driver.close()
3937
}
4038
}
4139
}
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package com.powersync.db.internal
22

3-
import app.cash.sqldelight.db.Closeable
43
import com.powersync.db.Queries
54
import kotlinx.coroutines.flow.SharedFlow
65

7-
internal interface InternalDatabase :
8-
Queries,
9-
Closeable {
6+
internal interface InternalDatabase : Queries {
107
fun updatesOnTables(): SharedFlow<Set<String>>
8+
9+
suspend fun close(): Unit
1110
}

0 commit comments

Comments
 (0)