Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Support tables created outside of PowerSync with the `RawTable` API.
For more information, see [the documentation](https://docs.powersync.com/usage/use-case-examples/raw-tables).
* Fix `runWrapped` catching cancellation exceptions.
* Fix errors in `PowerSyncBackendConnector.fetchCredentials()` crashing Android apps.

## 1.2.2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import com.powersync.bucket.OpType
import com.powersync.bucket.OplogEntry
import com.powersync.bucket.WriteCheckpointData
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.db.PowerSyncDatabaseImpl
import com.powersync.db.schema.PendingStatement
import com.powersync.db.schema.PendingStatementParameter
Expand All @@ -23,24 +25,20 @@ import com.powersync.testutils.databaseTest
import com.powersync.testutils.waitFor
import com.powersync.utils.JsonParam
import com.powersync.utils.JsonUtil
import dev.mokkery.answering.returns
import dev.mokkery.every
import dev.mokkery.verify
import dev.mokkery.verifyNoMoreCalls
import dev.mokkery.verifySuspend
import io.kotest.matchers.collections.shouldHaveSingleElement
import io.kotest.matchers.collections.shouldHaveSize
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.string.shouldContain
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.fail
import kotlin.time.Duration.Companion.seconds

@OptIn(LegacySyncImplementation::class)
Expand Down Expand Up @@ -116,6 +114,7 @@ abstract class BaseSyncIntegrationTest(
turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
turbine.waitFor { it.connected }
connector.cachedCredentials shouldNotBe null

database.disconnect()
turbine.waitFor { !it.connected }
Expand All @@ -126,7 +125,7 @@ abstract class BaseSyncIntegrationTest(
waitFor { syncLines.isClosedForSend shouldBe true }

// And called invalidateCredentials on the connector
verify { connector.invalidateCredentials() }
connector.cachedCredentials shouldBe null
}

@Test
Expand Down Expand Up @@ -630,6 +629,12 @@ abstract class BaseSyncIntegrationTest(
@Test
fun testTokenExpired() =
databaseTest {
var fetchCredentialsCalls = 0
connector.fetchCredentialsCallback = {
fetchCredentialsCalls++
TestConnector.testCredentials
}

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)

Expand All @@ -638,13 +643,48 @@ abstract class BaseSyncIntegrationTest(

syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000))
turbine.waitFor { it.connected }
verifySuspend { connector.getCredentialsCached() }
verifyNoMoreCalls(connector)
fetchCredentialsCalls shouldBe 1

// Should invalidate credentials when token expires
syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 0))
turbine.waitFor { !it.connected }
verify { connector.invalidateCredentials() }
connector.cachedCredentials shouldBe null

turbine.cancel()
}
}

@Test
fun testTokenThrows() =
databaseTest {
// Regression test for https://github.com/powersync-ja/powersync-kotlin/issues/219
var attempt = 0
val connector =
object : PowerSyncBackendConnector() {
override suspend fun fetchCredentials(): PowerSyncCredentials? {
attempt++
if (attempt == 1) {
fail("Expected exception from fetchCredentials")
}

return TestConnector.testCredentials
}

override suspend fun uploadData(database: PowerSyncDatabase) {
fail("Not implemented: uploadData")
}
}

turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)

database.connect(connector, 1000L, retryDelayMs = 5000, options = options)
turbine.waitFor { it.downloadError != null }

database.currentStatus.downloadError?.toString() shouldContain "Expected exception from fetchCredentials"

// Should retry, and the second fetchCredentials call will work
turbine.waitFor { it.connected }

turbine.cancel()
}
Expand All @@ -662,10 +702,21 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
databaseTest {
val prefetchCalled = CompletableDeferred<Unit>()
val completePrefetch = CompletableDeferred<Unit>()
every { connector.prefetchCredentials() } returns
scope.launch {
prefetchCalled.complete(Unit)
completePrefetch.await()
var fetchCredentialsCount = 0

val connector =
object : PowerSyncBackendConnector() {
override suspend fun fetchCredentials(): PowerSyncCredentials? {
fetchCredentialsCount++
if (fetchCredentialsCount == 2) {
prefetchCalled.complete(Unit)
completePrefetch.await()
}

return TestConnector.testCredentials
}

override suspend fun uploadData(database: PowerSyncDatabase) {}
}

turbineScope(timeout = 10.0.seconds) {
Expand All @@ -676,8 +727,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {

syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 4000))
turbine.waitFor { it.connected }
verifySuspend { connector.getCredentialsCached() }
verifyNoMoreCalls(connector)
fetchCredentialsCount shouldBe 1

syncLines.send(SyncLine.KeepAlive(tokenExpiresIn = 10))
prefetchCalled.await()
Expand All @@ -689,6 +739,7 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
turbine.waitFor { !it.connected }

turbine.waitFor { it.connected }
fetchCredentialsCount shouldBe 2
turbine.cancel()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,15 @@ import co.touchlab.kermit.Severity
import co.touchlab.kermit.TestConfig
import co.touchlab.kermit.TestLogWriter
import com.powersync.DatabaseDriverFactory
import com.powersync.TestConnector
import com.powersync.bucket.WriteCheckpointData
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.createPowerSyncDatabaseImpl
import com.powersync.db.PowerSyncDatabaseImpl
import com.powersync.db.schema.Schema
import com.powersync.sync.LegacySyncImplementation
import com.powersync.sync.SyncLine
import com.powersync.utils.JsonUtil
import dev.mokkery.answering.returns
import dev.mokkery.everySuspend
import dev.mokkery.mock
import io.ktor.client.HttpClient
import io.ktor.client.HttpClientConfig
import io.ktor.client.engine.mock.toByteArray
Expand Down Expand Up @@ -103,16 +99,7 @@ internal class ActiveDatabaseTest(
"db-$suffix"
}

var connector =
mock<PowerSyncBackendConnector> {
everySuspend { getCredentialsCached() } returns
PowerSyncCredentials(
token = "test-token",
endpoint = "https://test.com",
)

everySuspend { invalidateCredentials() } returns Unit
}
var connector = TestConnector()

fun openDatabase(schema: Schema = Schema(UserRow.table)): PowerSyncDatabaseImpl {
logger.d { "Opening database $databaseName in directory $testDirectory" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.cancellation.CancellationException

/**
Expand All @@ -19,10 +21,17 @@ import kotlin.coroutines.cancellation.CancellationException
*
*/
public abstract class PowerSyncBackendConnector {
private var cachedCredentials: PowerSyncCredentials? = null
internal var cachedCredentials: PowerSyncCredentials? = null
private var fetchingCredentials = Mutex()

private var fetchRequest: Job? = null
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

private suspend fun fetchAndCacheCredentials(): PowerSyncCredentials? =
fetchCredentials().also {
cachedCredentials = it
}

/**
* Get credentials current cached, or fetch new credentials if none are
* available.
Expand All @@ -33,8 +42,15 @@ public abstract class PowerSyncBackendConnector {
public open suspend fun getCredentialsCached(): PowerSyncCredentials? {
return runWrapped {
cachedCredentials?.let { return@runWrapped it }
prefetchCredentials().join()
cachedCredentials

return fetchingCredentials.withLock {
// With concurrent calls, it's possible that credentials have just been fetched.
cachedCredentials?.let { return it }

val credentials = fetchAndCacheCredentials()
check(credentials === cachedCredentials)
credentials
}
}
}

Expand All @@ -55,22 +71,39 @@ public abstract class PowerSyncBackendConnector {
*
* This may be called before the current credentials have expired.
*/
@Throws(PowerSyncException::class, CancellationException::class)
@Deprecated(
"Call updateCredentials, bring your own CoroutineScope if you need it to be asynchronous",
replaceWith = ReplaceWith("updateCredentials"),
)
public open fun prefetchCredentials(): Job {
fetchRequest?.takeIf { it.isActive }?.let { return it }

val request =
scope.launch {
fetchCredentials().also { value ->
cachedCredentials = value
fetchRequest = null
}
fetchAndCacheCredentials().also { fetchRequest = null }
}

fetchRequest = request
return request
}

/**
* If no other task is currently fetching credentials, calls [fetchCredentials] again and caches
* the result internally.
*
* This is used by the sync client if a token is about to expire: By fetching a new token early,
* we can avoid interruptions in the sync process.
*/
public suspend fun updateCredentials() {
if (fetchingCredentials.tryLock()) {
try {
fetchAndCacheCredentials()
} finally {
fetchingCredentials.unlock()
}
}
}

/**
* Get credentials for PowerSync.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ internal class SyncStream(
if (credentialsInvalidation == null) {
val job =
scope.launch {
connector.prefetchCredentials().join()
connector.updateCredentials()
logger.v { "Stopping because new credentials are available" }

// Token has been refreshed, start another iteration
Expand Down
13 changes: 9 additions & 4 deletions core/src/commonTest/kotlin/com/powersync/TestConnector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ import com.powersync.connectors.PowerSyncCredentials

class TestConnector : PowerSyncBackendConnector() {
var fetchCredentialsCallback: suspend () -> PowerSyncCredentials? = {
PowerSyncCredentials(
token = "test-token",
endpoint = "https://test.com",
)
testCredentials
}
var uploadDataCallback: suspend (PowerSyncDatabase) -> Unit = {
val tx = it.getNextCrudTransaction()
Expand All @@ -20,4 +17,12 @@ class TestConnector : PowerSyncBackendConnector() {
override suspend fun uploadData(database: PowerSyncDatabase) {
uploadDataCallback(database)
}

companion object {
val testCredentials =
PowerSyncCredentials(
token = "test-token",
endpoint = "https://powersynctest.example.com",
)
}
}
21 changes: 5 additions & 16 deletions core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import co.touchlab.kermit.Severity
import co.touchlab.kermit.TestConfig
import co.touchlab.kermit.TestLogWriter
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.TestConnector
import com.powersync.bucket.BucketStorage
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.db.crud.CrudEntry
import com.powersync.db.crud.UpdateType
import com.powersync.db.schema.Schema
import dev.mokkery.answering.returns
import dev.mokkery.everySuspend
import dev.mokkery.mock
import dev.mokkery.verify
import io.kotest.matchers.shouldBe
import io.ktor.client.HttpClient
import io.ktor.client.engine.mock.MockEngine
import kotlinx.coroutines.delay
Expand Down Expand Up @@ -55,24 +55,12 @@ class SyncStreamTest {
mock<BucketStorage> {
everySuspend { getClientId() } returns "test-client-id"
}
connector =
mock<PowerSyncBackendConnector> {
everySuspend { getCredentialsCached() } returns
PowerSyncCredentials(
token = "test-token",
endpoint = "https://test.com",
)
}
connector = TestConnector()
}

@Test
fun testInvalidateCredentials() =
runTest {
connector =
mock<PowerSyncBackendConnector> {
everySuspend { invalidateCredentials() } returns Unit
}

syncStream =
SyncStream(
bucketStorage = bucketStorage,
Expand All @@ -86,8 +74,9 @@ class SyncStreamTest {
schema = Schema(),
)

connector.cachedCredentials = TestConnector.testCredentials
syncStream.invalidateCredentials()
verify { connector.invalidateCredentials() }
connector.cachedCredentials shouldBe null
}

// TODO: Work on improving testing this without needing to test the logs are displayed
Expand Down
Loading