diff --git a/CHANGELOG.md b/CHANGELOG.md index dc37f86b..61ab0a93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Support tables created outside of PowerSync with the `RawTable` API. For more information, see [the documentation](https://docs.powersync.com/usage/use-case-examples/raw-tables). * Fix `runWrapped` catching cancellation exceptions. +* Fix errors in `PowerSyncBackendConnector.fetchCredentials()` crashing Android apps. ## 1.2.2 diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index ca744539..3e9136cb 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -13,6 +13,8 @@ import com.powersync.bucket.OpType import com.powersync.bucket.OplogEntry import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse +import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.PendingStatement import com.powersync.db.schema.PendingStatementParameter @@ -23,24 +25,20 @@ import com.powersync.testutils.databaseTest import com.powersync.testutils.waitFor import com.powersync.utils.JsonParam import com.powersync.utils.JsonUtil -import dev.mokkery.answering.returns -import dev.mokkery.every -import dev.mokkery.verify -import dev.mokkery.verifyNoMoreCalls -import dev.mokkery.verifySuspend import io.kotest.matchers.collections.shouldHaveSingleElement import io.kotest.matchers.collections.shouldHaveSize import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import io.kotest.matchers.string.shouldContain import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.launch -import kotlinx.serialization.encodeToString import kotlinx.serialization.json.jsonObject import kotlinx.serialization.json.jsonPrimitive import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertNotNull +import kotlin.test.fail import kotlin.time.Duration.Companion.seconds @OptIn(LegacySyncImplementation::class) @@ -116,6 +114,7 @@ abstract class BaseSyncIntegrationTest( turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) turbine.waitFor { it.connected } + connector.cachedCredentials shouldNotBe null database.disconnect() turbine.waitFor { !it.connected } @@ -126,7 +125,7 @@ abstract class BaseSyncIntegrationTest( waitFor { syncLines.isClosedForSend shouldBe true } // And called invalidateCredentials on the connector - verify { connector.invalidateCredentials() } + connector.cachedCredentials shouldBe null } @Test @@ -630,6 +629,12 @@ abstract class BaseSyncIntegrationTest( @Test fun testTokenExpired() = databaseTest { + var fetchCredentialsCalls = 0 + connector.fetchCredentialsCallback = { + fetchCredentialsCalls++ + TestConnector.testCredentials + } + turbineScope(timeout = 10.0.seconds) { val turbine = database.currentStatus.asFlow().testIn(this) @@ -638,13 +643,48 @@ abstract class BaseSyncIntegrationTest( syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000)) turbine.waitFor { it.connected } - verifySuspend { connector.getCredentialsCached() } - verifyNoMoreCalls(connector) + fetchCredentialsCalls shouldBe 1 // Should invalidate credentials when token expires syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 0)) turbine.waitFor { !it.connected } - verify { connector.invalidateCredentials() } + connector.cachedCredentials shouldBe null + + turbine.cancel() + } + } + + @Test + fun testTokenThrows() = + databaseTest { + // Regression test for https://github.com/powersync-ja/powersync-kotlin/issues/219 + var attempt = 0 + val connector = + object : PowerSyncBackendConnector() { + override suspend fun fetchCredentials(): PowerSyncCredentials? { + attempt++ + if (attempt == 1) { + fail("Expected exception from fetchCredentials") + } + + return TestConnector.testCredentials + } + + override suspend fun uploadData(database: PowerSyncDatabase) { + fail("Not implemented: uploadData") + } + } + + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + + database.connect(connector, 1000L, retryDelayMs = 5000, options = options) + turbine.waitFor { it.downloadError != null } + + database.currentStatus.downloadError?.toString() shouldContain "Expected exception from fetchCredentials" + + // Should retry, and the second fetchCredentials call will work + turbine.waitFor { it.connected } turbine.cancel() } @@ -662,10 +702,21 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { databaseTest { val prefetchCalled = CompletableDeferred() val completePrefetch = CompletableDeferred() - every { connector.prefetchCredentials() } returns - scope.launch { - prefetchCalled.complete(Unit) - completePrefetch.await() + var fetchCredentialsCount = 0 + + val connector = + object : PowerSyncBackendConnector() { + override suspend fun fetchCredentials(): PowerSyncCredentials? { + fetchCredentialsCount++ + if (fetchCredentialsCount == 2) { + prefetchCalled.complete(Unit) + completePrefetch.await() + } + + return TestConnector.testCredentials + } + + override suspend fun uploadData(database: PowerSyncDatabase) {} } turbineScope(timeout = 10.0.seconds) { @@ -676,8 +727,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000)) turbine.waitFor { it.connected } - verifySuspend { connector.getCredentialsCached() } - verifyNoMoreCalls(connector) + fetchCredentialsCount shouldBe 1 syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10)) prefetchCalled.await() @@ -689,6 +739,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { turbine.waitFor { !it.connected } turbine.waitFor { it.connected } + fetchCredentialsCount shouldBe 2 turbine.cancel() } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt index adbf2b37..4e9c49a7 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/testutils/TestUtils.kt @@ -9,19 +9,15 @@ import co.touchlab.kermit.Severity import co.touchlab.kermit.TestConfig import co.touchlab.kermit.TestLogWriter import com.powersync.DatabaseDriverFactory +import com.powersync.TestConnector import com.powersync.bucket.WriteCheckpointData import com.powersync.bucket.WriteCheckpointResponse -import com.powersync.connectors.PowerSyncBackendConnector -import com.powersync.connectors.PowerSyncCredentials import com.powersync.createPowerSyncDatabaseImpl import com.powersync.db.PowerSyncDatabaseImpl import com.powersync.db.schema.Schema import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncLine import com.powersync.utils.JsonUtil -import dev.mokkery.answering.returns -import dev.mokkery.everySuspend -import dev.mokkery.mock import io.ktor.client.HttpClient import io.ktor.client.HttpClientConfig import io.ktor.client.engine.mock.toByteArray @@ -103,16 +99,7 @@ internal class ActiveDatabaseTest( "db-$suffix" } - var connector = - mock { - everySuspend { getCredentialsCached() } returns - PowerSyncCredentials( - token = "test-token", - endpoint = "https://test.com", - ) - - everySuspend { invalidateCredentials() } returns Unit - } + var connector = TestConnector() fun openDatabase(schema: Schema = Schema(UserRow.table)): PowerSyncDatabaseImpl { logger.d { "Opening database $databaseName in directory $testDirectory" } diff --git a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt index 0b30074d..17f23302 100644 --- a/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt +++ b/core/src/commonMain/kotlin/com/powersync/connectors/PowerSyncBackendConnector.kt @@ -8,6 +8,8 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlin.coroutines.cancellation.CancellationException /** @@ -19,10 +21,17 @@ import kotlin.coroutines.cancellation.CancellationException * */ public abstract class PowerSyncBackendConnector { - private var cachedCredentials: PowerSyncCredentials? = null + internal var cachedCredentials: PowerSyncCredentials? = null + private var fetchingCredentials = Mutex() + private var fetchRequest: Job? = null private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) + private suspend fun fetchAndCacheCredentials(): PowerSyncCredentials? = + fetchCredentials().also { + cachedCredentials = it + } + /** * Get credentials current cached, or fetch new credentials if none are * available. @@ -33,8 +42,15 @@ public abstract class PowerSyncBackendConnector { public open suspend fun getCredentialsCached(): PowerSyncCredentials? { return runWrapped { cachedCredentials?.let { return@runWrapped it } - prefetchCredentials().join() - cachedCredentials + + return fetchingCredentials.withLock { + // With concurrent calls, it's possible that credentials have just been fetched. + cachedCredentials?.let { return it } + + val credentials = fetchAndCacheCredentials() + check(credentials === cachedCredentials) + credentials + } } } @@ -55,22 +71,39 @@ public abstract class PowerSyncBackendConnector { * * This may be called before the current credentials have expired. */ - @Throws(PowerSyncException::class, CancellationException::class) + @Deprecated( + "Call updateCredentials, bring your own CoroutineScope if you need it to be asynchronous", + replaceWith = ReplaceWith("updateCredentials"), + ) public open fun prefetchCredentials(): Job { fetchRequest?.takeIf { it.isActive }?.let { return it } val request = scope.launch { - fetchCredentials().also { value -> - cachedCredentials = value - fetchRequest = null - } + fetchAndCacheCredentials().also { fetchRequest = null } } fetchRequest = request return request } + /** + * If no other task is currently fetching credentials, calls [fetchCredentials] again and caches + * the result internally. + * + * This is used by the sync client if a token is about to expire: By fetching a new token early, + * we can avoid interruptions in the sync process. + */ + public suspend fun updateCredentials() { + if (fetchingCredentials.tryLock()) { + try { + fetchAndCacheCredentials() + } finally { + fetchingCredentials.unlock() + } + } + } + /** * Get credentials for PowerSync. * diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 80b8d7ad..0897e51c 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -400,7 +400,7 @@ internal class SyncStream( if (credentialsInvalidation == null) { val job = scope.launch { - connector.prefetchCredentials().join() + connector.updateCredentials() logger.v { "Stopping because new credentials are available" } // Token has been refreshed, start another iteration diff --git a/core/src/commonTest/kotlin/com/powersync/TestConnector.kt b/core/src/commonTest/kotlin/com/powersync/TestConnector.kt index 1c59b889..2f8d81a5 100644 --- a/core/src/commonTest/kotlin/com/powersync/TestConnector.kt +++ b/core/src/commonTest/kotlin/com/powersync/TestConnector.kt @@ -5,10 +5,7 @@ import com.powersync.connectors.PowerSyncCredentials class TestConnector : PowerSyncBackendConnector() { var fetchCredentialsCallback: suspend () -> PowerSyncCredentials? = { - PowerSyncCredentials( - token = "test-token", - endpoint = "https://test.com", - ) + testCredentials } var uploadDataCallback: suspend (PowerSyncDatabase) -> Unit = { val tx = it.getNextCrudTransaction() @@ -20,4 +17,12 @@ class TestConnector : PowerSyncBackendConnector() { override suspend fun uploadData(database: PowerSyncDatabase) { uploadDataCallback(database) } + + companion object { + val testCredentials = + PowerSyncCredentials( + token = "test-token", + endpoint = "https://powersynctest.example.com", + ) + } } diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index e9dedfe0..1e903f8b 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -6,16 +6,16 @@ import co.touchlab.kermit.Severity import co.touchlab.kermit.TestConfig import co.touchlab.kermit.TestLogWriter import com.powersync.ExperimentalPowerSyncAPI +import com.powersync.TestConnector import com.powersync.bucket.BucketStorage import com.powersync.connectors.PowerSyncBackendConnector -import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import com.powersync.db.schema.Schema import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock -import dev.mokkery.verify +import io.kotest.matchers.shouldBe import io.ktor.client.HttpClient import io.ktor.client.engine.mock.MockEngine import kotlinx.coroutines.delay @@ -55,24 +55,12 @@ class SyncStreamTest { mock { everySuspend { getClientId() } returns "test-client-id" } - connector = - mock { - everySuspend { getCredentialsCached() } returns - PowerSyncCredentials( - token = "test-token", - endpoint = "https://test.com", - ) - } + connector = TestConnector() } @Test fun testInvalidateCredentials() = runTest { - connector = - mock { - everySuspend { invalidateCredentials() } returns Unit - } - syncStream = SyncStream( bucketStorage = bucketStorage, @@ -86,8 +74,9 @@ class SyncStreamTest { schema = Schema(), ) + connector.cachedCredentials = TestConnector.testCredentials syncStream.invalidateCredentials() - verify { connector.invalidateCredentials() } + connector.cachedCredentials shouldBe null } // TODO: Work on improving testing this without needing to test the logs are displayed