diff --git a/CHANGELOG.md b/CHANGELOG.md index 32e0d7bd..28a95feb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Remove internal SQLDelight and SQLiter dependencies. * Add `rawConnection` getter to `ConnectionContext`, which is a `SQLiteConnection` instance from `androidx.sqlite` that can be used to step through statements in a custom way. +* Fix an issue where `watch()` would run queries more often than intended. ## 1.5.1 diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt index 4d063a2b..92aafdb7 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt @@ -17,7 +17,6 @@ import com.powersync.testutils.MockedRemoteStorage import com.powersync.testutils.UserRow import com.powersync.testutils.databaseTest import com.powersync.testutils.getTempDir -import com.powersync.testutils.waitFor import dev.mokkery.answering.throws import dev.mokkery.everySuspend import dev.mokkery.matcher.ArgMatchersScope @@ -29,6 +28,7 @@ import dev.mokkery.verifySuspend import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.onEach import kotlinx.io.files.Path @@ -60,7 +60,17 @@ class AttachmentsTest { database .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) - }.onEach { logger.i { "attachments table results: $it" } } + } + // Because tests run on slow machines, it's possible for a schedule like the following + // to happen: + // 1. the attachment is initially saved with QUEUED_DOWNLOAD, triggering a query. + // 2. the attachment is downloaded fast, but the query flow is paused. + // 3. only now is the query scheduled by 1 actually running, reporting SYNCED. + // 4. 
we delete the attachment. + // 5. thanks to 2, the query runs again, again reporting SYNCED. + // 6. Our test now fails because the second event should be ARCHIVED. + .distinctUntilChanged() + .onEach { logger.i { "attachments table results: $it" } } suspend fun updateSchema(db: PowerSyncDatabase) { db.updateSchema( @@ -278,10 +288,13 @@ class AttachmentsTest { """, ) - waitFor { - var nextRecord: Attachment? = attachmentQuery.awaitItem().firstOrNull() - // The record should have been deleted - nextRecord shouldBe null + while (true) { + val item = attachmentQuery.awaitItem() + if (item.isEmpty()) { + break + } + + logger.v { "Waiting for attachment record to be deleted (current $item)" } } // The file should have been deleted from storage @@ -346,10 +359,13 @@ class AttachmentsTest { database.get("SELECT photo_id FROM users") { it.getString("photo_id") } // Wait for the record to be synced (mocked backend will allow it) - waitFor { - val record = attachmentQuery.awaitItem().first() - record shouldNotBe null - record.state shouldBe AttachmentState.SYNCED + while (true) { + val item = attachmentQuery.awaitItem().firstOrNull() + if (item != null && item.state == AttachmentState.SYNCED) { + break + } + + logger.v { "Waiting for attachment record to be synced (current $item)" } } queue.deleteFile( @@ -369,10 +385,14 @@ class AttachmentsTest { ) } - waitFor { - // Record should be deleted - val record = attachmentQuery.awaitItem().firstOrNull() - record shouldBe null + // Record should be deleted + while (true) { + val item = attachmentQuery.awaitItem() + if (item.isEmpty()) { + break + } + + logger.v { "Waiting for attachment record to be deleted (current $item)" } } // A delete should have been attempted for this file @@ -559,14 +579,16 @@ class AttachmentsTest { """, ) + // Depending on when the query updates, we'll see the attachment as queued for + // download or archived. 
var attachmentRecord = attachmentQuery.awaitItem().first() attachmentRecord shouldNotBe null - attachmentRecord.state shouldBe AttachmentState.QUEUED_DOWNLOAD + if (attachmentRecord.state == AttachmentState.QUEUED_DOWNLOAD) { + attachmentRecord = attachmentQuery.awaitItem().first() + } // The download should fail. We don't specify a retry. The record should be archived. - attachmentRecord = attachmentQuery.awaitItem().first() - attachmentRecord.state shouldBe AttachmentState.ARCHIVED attachmentQuery.cancelAndIgnoreRemainingEvents() diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt index ee91a2d5..810d25d9 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/DatabaseTest.kt @@ -211,11 +211,10 @@ class DatabaseTest { var changeSet = query.awaitItem() // The initial result - changeSet.count() shouldBe 0 + changeSet shouldHaveSize 0 changeSet = query.awaitItem() - changeSet.count() shouldBe 1 - changeSet.contains("users") shouldBe true + changeSet shouldBe setOf("users") query.cancel() } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 84e69e21..306db1f9 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -105,10 +105,11 @@ abstract class BaseSyncIntegrationTest( database.close() turbine.waitFor { !it.connected } - turbine.cancel() + turbine.cancelAndIgnoreRemainingEvents() } // Closing the database should have closed the channel. 
+ logger.v { "Database is closed, waiting to close HTTP stream" } waitFor { syncLines.isClosedForSend shouldBe true } } diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt b/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt index cf031027..03888c54 100644 --- a/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt +++ b/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt @@ -96,6 +96,8 @@ public open class SyncingService( */ try { val attachments = context.getActiveAttachments() + logger.v { "Processing active attachments: $attachments" } + // Performs pending operations and updates attachment states handleSync(attachments, context) @@ -298,6 +300,8 @@ public open class SyncingService( */ public suspend fun deleteArchivedAttachments(context: AttachmentContext): Boolean = context.deleteArchivedAttachments { pendingDelete -> + logger.v { "Deleting archived attachments: $pendingDelete" } + for (attachment in pendingDelete) { if (attachment.localUri == null) { continue diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 1b24f999..2a02d86e 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -81,7 +81,7 @@ internal class PowerSyncDatabaseImpl( private val resource = activeDatabaseGroup.first - private val internalDb = InternalDatabaseImpl(pool) + private val internalDb = InternalDatabaseImpl(pool, logger) internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger) diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 0e902533..d1185c8b 100644 --- 
a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -1,7 +1,7 @@ package com.powersync.db.internal +import co.touchlab.kermit.Logger import com.powersync.ExperimentalPowerSyncAPI -import com.powersync.PowerSyncException import com.powersync.db.SqlCursor import com.powersync.db.ThrowableLockCallback import com.powersync.db.ThrowableTransactionCallback @@ -15,8 +15,9 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.channelFlow -import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onSubscription import kotlinx.coroutines.flow.transform import kotlinx.coroutines.withContext @@ -25,6 +26,7 @@ import kotlin.time.Duration.Companion.milliseconds @OptIn(ExperimentalPowerSyncAPI::class) internal class InternalDatabaseImpl( private val pool: SQLiteConnectionPool, + private val logger: Logger, ) : InternalDatabase { // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. private val dbContext = Dispatchers.IO @@ -79,41 +81,15 @@ internal class InternalDatabaseImpl( tables: Set, throttleMs: Long, triggerImmediately: Boolean, - ): Flow> = - channelFlow { - // Match all possible internal table combinations - val watchedTables = - tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet() - - // Accumulate updates between throttles - val batchedUpdates = AtomicMutableSet() + ): Flow> { + // Match all possible internal table combinations + val watchedTables = + tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet() - updatesOnTables() - .onSubscription { - if (triggerImmediately) { - // Emit an initial event (if requested). 
No changes would be detected at this point - send(setOf()) - } - }.transform { updates -> - val intersection = updates.intersect(watchedTables) - if (intersection.isNotEmpty()) { - // Transform table names using friendlyTableName - val friendlyTableNames = intersection.map { friendlyTableName(it) }.toSet() - batchedUpdates.addAll(friendlyTableNames) - emit(Unit) - } - } - // Throttling here is a feature which prevents watch queries from spamming updates. - // Throttling by design discards and delays events within the throttle window. Discarded events - // still trigger a trailing edge update. - // Backpressure is avoided on the throttling and consumer level by buffering the last upstream value. - .throttle(throttleMs.milliseconds) - .collect { - // Emit the transformed tables which have changed - val copy = batchedUpdates.toSetAndClear() - send(copy) - } + return rawChangedTables(watchedTables, throttleMs, triggerImmediately).map { + it.mapTo(mutableSetOf(), ::friendlyTableName) } + } override fun watch( sql: String, @@ -121,32 +97,58 @@ internal class InternalDatabaseImpl( throttleMs: Long, mapper: (SqlCursor) -> RowType, ): Flow> = - // Use a channel flow here since we throttle (buffer used under the hood) - // This causes some emissions to be from different scopes. - channelFlow { + flow { // Fetch the tables asynchronously with getAll val tables = getSourceTables(sql, parameters) .filter { it.isNotBlank() } .toSet() + val queries = + rawChangedTables(tables, throttleMs, triggerImmediately = true).map { + logger.v { "Fetching watch() query: $sql" } + val rows = getAll(sql, parameters = parameters, mapper = mapper) + logger.v { "watch query $sql done, emitting downstream" } + rows + } + emitAll(queries) + } + + private fun rawChangedTables( + tableNames: Set, + throttleMs: Long, + triggerImmediately: Boolean, + ): Flow> = + flow { + val batchedUpdates = AtomicMutableSet() + updatesOnTables() - // onSubscription here is very important. 
- // This ensures that the initial result and all updates are emitted. .onSubscription { - send(getAll(sql, parameters = parameters, mapper = mapper)) - }.filter { - // Only trigger updates on relevant tables - it.intersect(tables).isNotEmpty() + if (triggerImmediately) { + // Emit an initial event (if requested). No changes would be detected at this point + emit(initialUpdateSentinel) + } + }.transform { updates -> + if (updates === initialUpdateSentinel) { + // This should always be emitted despite being empty and not intersecting with + // the tables we care about. + emit(Unit) + } else { + val intersection = updates.intersect(tableNames) + if (intersection.isNotEmpty()) { + batchedUpdates.addAll(intersection) + emit(Unit) + } + } } // Throttling here is a feature which prevents watch queries from spamming updates. // Throttling by design discards and delays events within the throttle window. Discarded events // still trigger a trailing edge update. // Backpressure is avoided on the throttling and consumer level by buffering the last upstream value. - // Note that the buffered upstream "value" only serves to trigger the getAll query. We don't buffer watch results. 
.throttle(throttleMs.milliseconds) .collect { - send(getAll(sql, parameters = parameters, mapper = mapper)) + val entries = batchedUpdates.toSetAndClear() + emit(entries) } } @@ -259,6 +261,10 @@ internal class InternalDatabaseImpl( val p2: Long, val p3: Long, ) + + private companion object { + val initialUpdateSentinel = emptySet() + } } /** diff --git a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt index 05327ef8..b9cdff76 100644 --- a/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt +++ b/core/src/commonMain/kotlin/com/powersync/utils/AtomicMutableSet.kt @@ -11,7 +11,7 @@ public class AtomicMutableSet : SynchronizedObject() { return set.add(element) } - public fun addAll(elements: Set): Boolean = + public fun addAll(elements: Collection): Boolean = synchronized(this) { return set.addAll(elements) } diff --git a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt index 82d5c976..965964f4 100644 --- a/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt +++ b/core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt @@ -6,13 +6,21 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.flow import kotlin.time.Duration +import kotlin.time.TimeSource /** - * Throttles a flow with emissions on the leading and trailing edge. - * Events, from the incoming flow, during the throttle window are discarded. - * Events are discarded by using a conflated buffer. - * This throttle method acts as a slow consumer, but backpressure is not a concern - * due to the conflated buffer dropping events during the throttle window. + * Throttles an upstream flow. + * + * When a new event is emitted on this (upstream) flow, it is passed on downstream. 
For each value + * passed downstream, the resulting flow will pause for at least [window] (or longer if emitting + * the value downstream takes longer). + * + * While this flow is paused, no further events are passed downstream. The latest upstream event + * emitted during the pause state is buffered and handled once the pause is over. + * + * In other words, this flow will _drop events_, so it should only be used when the upstream flow + * serves as a notification marker (meaning that something downstream needs to run in response to + * events, but the actual event does not matter). */ internal fun Flow.throttle(window: Duration): Flow = flow { @@ -20,11 +28,12 @@ internal fun Flow.throttle(window: Duration): Flow = val bufferedFlow = this@throttle.buffer(Channel.CONFLATED) bufferedFlow.collect { value -> - // Emit the event immediately (leading edge) + // Pause for the downstream emit or the delay window, whatever is longer + val pauseUntil = TimeSource.Monotonic.markNow() + window emit(value) - // Delay for the throttle window to avoid emitting too frequently - delay(window) + // Negating the duration because we want to pause until pauseUntil has passed. + delay(-pauseUntil.elapsedNow()) // The next incoming event will be provided from the buffer. 
// The next collect will emit the trailing edge diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt b/core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt index 7be3aaee..13f3e449 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt @@ -15,11 +15,12 @@ internal suspend inline fun waitFor( try { test() return - } catch (_: Error) { + } catch (e: Error) { // Treat exceptions as failed + println("waitFor: failed with $e") } delay(interval) } while (begin.elapsedNow() < timeout) - throw Exception("Timeout reached") + throw Exception("waitFor() Timeout reached") } diff --git a/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt b/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt index 46182096..0157012f 100644 --- a/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/utils/JsonTest.kt @@ -1,10 +1,5 @@ package com.powersync.utils -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.test.runTest import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonNull import kotlinx.serialization.json.JsonObject @@ -23,7 +18,6 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue -import kotlin.time.Duration.Companion.milliseconds class JsonTest { @Test @@ -42,28 +36,6 @@ class JsonTest { assertEquals("test", jsonElement.content) } - @Test - fun testThrottle() { - runTest { - val t = - flow { - emit(1) - delay(10) - emit(2) - delay(20) - emit(3) - delay(100) - emit(4) - }.throttle(100.milliseconds) - .map { - // Adding a delay here to simulate a slow consumer - delay(1000) - it - }.toList() - assertEquals(t, listOf(1, 4)) - } - } - @Test fun testBooleanToJsonElement() { val boolean = 
JsonParam.Boolean(true) diff --git a/core/src/commonTest/kotlin/com/powersync/utils/ThrottleTest.kt b/core/src/commonTest/kotlin/com/powersync/utils/ThrottleTest.kt new file mode 100644 index 00000000..8aa87b7a --- /dev/null +++ b/core/src/commonTest/kotlin/com/powersync/utils/ThrottleTest.kt @@ -0,0 +1,61 @@ +package com.powersync.utils + +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + +class ThrottleTest { + @Test + fun testThrottle() { + runTest { + val t = + flow { + emit(1) + delay(10) + emit(2) + delay(20) + emit(3) + delay(100) + emit(4) + }.throttle(100.milliseconds) + .map { + // Adding a delay here to simulate a slow consumer + delay(1000) + it + }.toList() + assertEquals(t, listOf(1, 4)) + } + } + + @Test + fun testWaitTimeIsNotAdditive() = + runTest { + val upstream = + flow { + repeat(5) { + emit(it) + delay(10.seconds) + } + } + + // If throttle were to start the delay after the downstream emit completed, it would take 11 + // seconds, skipping events. That's not what we want though, the downstream delay is + // long enough to not need additional throttling. + val events = + upstream + .throttle(5.seconds) + .map { + delay(6.seconds) + it + }.toList() + + events shouldBe listOf(0, 1, 2, 3, 4) + } +}