diff --git a/CHANGELOG.md b/CHANGELOG.md index 285f7703..bfac0821 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.0.0-BETA30 + +* Fix a deadlock when calling `connect()` immediately after opening a database. + The issue has been introduced in version `1.0.0-BETA29`. + ## 1.0.0-BETA29 * Fix potential race condition between jobs in `connect()` and `disconnect()`. diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 3ce4a791..fd91ff77 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -34,12 +34,26 @@ class SyncIntegrationTest { users shouldHaveSize amount } + @Test + @OptIn(DelicateCoroutinesApi::class) + fun connectImmediately() = + databaseTest(createInitialDatabase = false) { + // Regression test for https://github.com/powersync-ja/powersync-kotlin/issues/169 + val database = openDatabase() + database.connect(connector) + + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected } + turbine.cancel() + } + } + @Test @OptIn(DelicateCoroutinesApi::class) fun closesResponseStreamOnDatabaseClose() = databaseTest { - val syncStream = database.syncStream() - database.connectInternal(syncStream, 1000L) + database.connect(connector) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -58,8 +72,7 @@ class SyncIntegrationTest { @OptIn(DelicateCoroutinesApi::class) fun cleansResourcesOnDisconnect() = databaseTest { - val syncStream = database.syncStream() - database.connectInternal(syncStream, 1000L) + database.connect(connector) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -80,8 +93,7 @@ class SyncIntegrationTest { @Test fun cannotUpdateSchemaWhileConnected() = databaseTest { - val syncStream = database.syncStream() - database.connectInternal(syncStream, 1000L) + database.connect(connector) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -99,8 +111,7 @@ class SyncIntegrationTest { @Test fun testPartialSync() = databaseTest { - val syncStream = database.syncStream() - database.connectInternal(syncStream, 1000L) + database.connect(connector) val checksums = buildList { @@ -146,7 +157,7 @@ class SyncIntegrationTest { } turbineScope(timeout = 10.0.seconds) { - val turbine = syncStream.status.asFlow().testIn(this) + val turbine = database.currentStatus.asFlow().testIn(this) turbine.waitFor { it.connected } database.expectUserCount(0) @@ -191,8 +202,7 @@ class SyncIntegrationTest { @Test fun testRemembersLastPartialSync() = databaseTest { - val syncStream = database.syncStream() - database.connectInternal(syncStream, 1000L) + database.connect(connector) syncLines.send( SyncLine.FullCheckpoint( @@ -228,8 +238,7 @@ class SyncIntegrationTest { @Test fun setsDownloadingState() = databaseTest { - val syncStream = database.syncStream() - database.connectInternal(syncStream, 1000L) + database.connect(connector) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -261,10 +270,9 @@ class SyncIntegrationTest { fun setsConnectingState() = databaseTest { turbineScope(timeout = 10.0.seconds) { - val syncStream = database.syncStream() val turbine = database.currentStatus.asFlow().testIn(this) - database.connectInternal(syncStream, 1000L) + database.connect(connector) turbine.waitFor { it.connecting } database.disconnect() @@ -277,8 +285,7 @@ class SyncIntegrationTest { @Test fun testMultipleSyncsDoNotCreateMultipleStatusEntries() = databaseTest { - val syncStream = database.syncStream() - database.connectInternal(syncStream, 1000L) + database.connect(connector) turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -324,7 +331,7 @@ class SyncIntegrationTest { turbineScope(timeout = 10.0.seconds) { // Connect the first database - database.connect(connector, 1000L) + database.connect(connector) db2.connect(connector) waitFor { @@ -350,7 +357,7 @@ class SyncIntegrationTest { val turbine2 = db2.currentStatus.asFlow().testIn(this) // Connect the first database - database.connect(connector, 1000L) + database.connect(connector) turbine1.waitFor { it.connecting } db2.connect(connector) @@ -414,7 +421,7 @@ class SyncIntegrationTest { databaseTest { val testConnector = TestConnector() connector = testConnector - database.connectInternal(database.syncStream(), 1000L) + database.connect(testConnector) suspend fun expectUserRows(amount: Int) { val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index 298f433c..88e2382b 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -7,23 +7,23 @@ import co.touchlab.kermit.Severity import co.touchlab.kermit.TestConfig import co.touchlab.kermit.TestLogWriter import com.powersync.DatabaseDriverFactory -import com.powersync.PowerSyncDatabase import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse import com.powersync.connectors.PowerSyncBackendConnector import com.powersync.connectors.PowerSyncCredentials +import com.powersync.createPowerSyncDatabaseImpl import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema import com.powersync.sync.SyncLine -import com.powersync.sync.SyncStream import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock +import io.ktor.client.HttpClient +import io.ktor.client.HttpClientConfig import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import kotlinx.io.files.Path -import kotlinx.serialization.json.JsonObject expect val factory: DatabaseDriverFactory @@ -43,18 +43,22 @@ fun generatePrintLogWriter() = } } -internal fun databaseTest(testBody: suspend ActiveDatabaseTest.() -> Unit) = - runTest { - val running = ActiveDatabaseTest(this) +internal fun databaseTest( + createInitialDatabase: Boolean = true, + testBody: suspend ActiveDatabaseTest.() -> Unit, +) = runTest { + val running = ActiveDatabaseTest(this) + if (createInitialDatabase) { // Make sure the database is initialized, we're using internal APIs that expect initialization. running.database = running.openDatabaseAndInitialize() + } - try { - running.testBody() - } finally { - running.cleanup() - } + try { + running.testBody() + } finally { + running.cleanup() } +} @OptIn(ExperimentalKermitApi::class) internal class ActiveDatabaseTest( @@ -104,32 +108,27 @@ internal class ActiveDatabaseTest( fun openDatabase(): PowerSyncDatabaseImpl { logger.d { "Opening database $databaseName in directory $testDirectory" } val db = - PowerSyncDatabase( + createPowerSyncDatabaseImpl( factory = factory, schema = Schema(UserRow.table), dbFilename = databaseName, dbDirectory = testDirectory, logger = logger, scope = scope, + createClient = ::createClient, ) doOnCleanup { db.close() } - return db as PowerSyncDatabaseImpl + return db } suspend fun openDatabaseAndInitialize(): PowerSyncDatabaseImpl = openDatabase().also { it.readLock { } } - fun PowerSyncDatabase.syncStream(): SyncStream { - val client = MockSyncService(syncLines) { checkpointResponse() } - return SyncStream( - bucketStorage = database.bucketStorage, - connector = connector, - httpEngine = client, - uploadCrud = { connector.uploadData(this) }, - retryDelayMs = 10, - logger = logger, - params = JsonObject(emptyMap()), - scope = scope, - ) + private fun createClient(config: HttpClientConfig<*>.() -> Unit): HttpClient { + val engine = MockSyncService(syncLines) { checkpointResponse() } + + return HttpClient(engine) { + config() + } } fun doOnCleanup(action: suspend () -> Unit) { diff --git a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabaseFactory.kt b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabaseFactory.kt index 1206bfe2..9ba2ca60 100644 --- a/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabaseFactory.kt +++ b/core/src/commonMain/kotlin/com/powersync/PowerSyncDatabaseFactory.kt @@ -4,7 +4,10 @@ import co.touchlab.kermit.Logger import co.touchlab.skie.configuration.annotations.DefaultArgumentInterop import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema +import com.powersync.sync.SyncStream import com.powersync.utils.generateLogger +import io.ktor.client.HttpClient +import io.ktor.client.HttpClientConfig import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.GlobalScope @@ -30,7 +33,7 @@ public fun PowerSyncDatabase( ): PowerSyncDatabase { val generatedLogger: Logger = generateLogger(logger) - return PowerSyncDatabaseImpl( + return createPowerSyncDatabaseImpl( schema = schema, factory = factory, dbFilename = dbFilename, @@ -39,3 +42,22 @@ public fun PowerSyncDatabase( dbDirectory = dbDirectory, ) } + +internal fun createPowerSyncDatabaseImpl( + factory: DatabaseDriverFactory, + schema: Schema, + dbFilename: String, + scope: CoroutineScope, + logger: Logger, + dbDirectory: String?, + createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient = SyncStream::defaultHttpClient, +): PowerSyncDatabaseImpl = + PowerSyncDatabaseImpl( + schema = schema, + factory = factory, + dbFilename = dbFilename, + scope = scope, + logger = logger, + dbDirectory = dbDirectory, + createClient = createClient, + ) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index bc47feb8..87209f4f 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -24,6 +24,8 @@ import com.powersync.utils.JsonParam import com.powersync.utils.JsonUtil import com.powersync.utils.throttle import com.powersync.utils.toJsonObject +import io.ktor.client.HttpClient +import io.ktor.client.HttpClientConfig import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.FlowPreview @@ -61,6 +63,7 @@ internal class PowerSyncDatabaseImpl( private val dbFilename: String, private val dbDirectory: String? = null, val logger: Logger = Logger, + private val createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient, ) : PowerSyncDatabase { companion object { internal val streamConflictMessage = @@ -148,22 +151,25 @@ internal class PowerSyncDatabaseImpl( crudThrottleMs: Long, retryDelayMs: Long, params: Map, - ) = mutex.withLock { + ) { waitReady() - disconnectInternal() - - connectInternal( - SyncStream( - bucketStorage = bucketStorage, - connector = connector, - uploadCrud = suspend { connector.uploadData(this) }, - retryDelayMs = retryDelayMs, - logger = logger, - params = params.toJsonObject(), - scope = scope, - ), - crudThrottleMs, - ) + mutex.withLock { + disconnectInternal() + + connectInternal( + SyncStream( + bucketStorage = bucketStorage, + connector = connector, + uploadCrud = suspend { connector.uploadData(this) }, + retryDelayMs = retryDelayMs, + logger = logger, + params = params.toJsonObject(), + scope = scope, + createClient = createClient, + ), + crudThrottleMs, + ) + } } @OptIn(FlowPreview::class) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 60225952..3f07e0b5 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -13,7 +13,6 @@ import com.powersync.utils.JsonUtil import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig import io.ktor.client.call.body -import io.ktor.client.engine.HttpClientEngine import io.ktor.client.plugins.HttpTimeout import io.ktor.client.plugins.contentnegotiation.ContentNegotiation import io.ktor.client.plugins.timeout @@ -48,7 +47,7 @@ internal class SyncStream( private val logger: Logger, private val params: JsonObject, private val scope: CoroutineScope, - httpEngine: HttpClientEngine? = null, + createClient: (HttpClientConfig<*>.() -> Unit) -> HttpClient, ) { private var isUploadingCrud = AtomicReference(null) @@ -59,26 +58,12 @@ internal class SyncStream( private var clientId: String? = null - private val httpClient: HttpClient - - init { - fun HttpClientConfig<*>.configureClient() { + private val httpClient: HttpClient = + createClient { install(HttpTimeout) install(ContentNegotiation) } - httpClient = - if (httpEngine == null) { - HttpClient { - configureClient() - } - } else { - HttpClient(httpEngine) { - configureClient() - } - } - } - fun invalidateCredentials() { connector.invalidateCredentials() } @@ -484,6 +469,13 @@ internal class SyncStream( triggerCrudUploadAsync() return state } + + internal companion object { + fun defaultHttpClient(config: HttpClientConfig<*>.() -> Unit) = + HttpClient { + config(this) + } + } } internal data class SyncStreamState( diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 069139bd..d335b4b4 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -29,6 +29,7 @@ import dev.mokkery.verify import dev.mokkery.verify.VerifyMode.Companion.order import dev.mokkery.verifyNoMoreCalls import dev.mokkery.verifySuspend +import io.ktor.client.HttpClient import io.ktor.client.engine.mock.MockEngine import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay @@ -103,7 +104,7 @@ class SyncStreamTest { SyncStream( bucketStorage = bucketStorage, connector = connector, - httpEngine = assertNoHttpEngine, + createClient = { config -> HttpClient(assertNoHttpEngine, config) }, uploadCrud = {}, logger = logger, params = JsonObject(emptyMap()), @@ -139,7 +140,7 @@ class SyncStreamTest { SyncStream( bucketStorage = bucketStorage, connector = connector, - httpEngine = assertNoHttpEngine, + createClient = { config -> HttpClient(assertNoHttpEngine, config) }, uploadCrud = { }, retryDelayMs = 10, logger = logger, @@ -179,7 +180,7 @@ class SyncStreamTest { SyncStream( bucketStorage = bucketStorage, connector = connector, - httpEngine = assertNoHttpEngine, + createClient = { config -> HttpClient(assertNoHttpEngine, config) }, uploadCrud = { }, retryDelayMs = 10, logger = logger, @@ -220,7 +221,7 @@ class SyncStreamTest { SyncStream( bucketStorage = bucketStorage, connector = connector, - httpEngine = client, + createClient = { config -> HttpClient(client, config) }, uploadCrud = { }, retryDelayMs = 10, logger = logger, diff --git a/gradle.properties b/gradle.properties index d71c9775..6cc158f7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -17,7 +17,7 @@ development=true RELEASE_SIGNING_ENABLED=true # Library config GROUP=com.powersync -LIBRARY_VERSION=1.0.0-BETA29 +LIBRARY_VERSION=1.0.0-BETA30 GITHUB_REPO=https://github.com/powersync-ja/powersync-kotlin.git # POM POM_URL=https://github.com/powersync-ja/powersync-kotlin/