1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
* Remove internal SQLDelight and SQLiter dependencies.
* Add `rawConnection` getter to `ConnectionContext`, which is a `SQLiteConnection` instance from
`androidx.sqlite` that can be used to step through statements in a custom way.
* Fix an issue where `watch()` would run queries more often than intended.

## 1.5.1

@@ -17,7 +17,6 @@ import com.powersync.testutils.MockedRemoteStorage
import com.powersync.testutils.UserRow
import com.powersync.testutils.databaseTest
import com.powersync.testutils.getTempDir
import com.powersync.testutils.waitFor
import dev.mokkery.answering.throws
import dev.mokkery.everySuspend
import dev.mokkery.matcher.ArgMatchersScope
@@ -29,6 +28,7 @@ import dev.mokkery.verifySuspend
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.io.files.Path
@@ -60,7 +60,17 @@ class AttachmentsTest {
database
.watch("SELECT * FROM attachments") {
Attachment.fromCursor(it)
}.onEach { logger.i { "attachments table results: $it" } }
}
// Because tests run on slow machines, an interleaving like the following is possible:
// 1. the attachment is initially saved with QUEUED_DOWNLOAD, triggering a query.
// 2. the attachment is downloaded quickly, but the query flow is paused.
// 3. only now does the query scheduled in step 1 actually run, reporting SYNCED.
// 4. we delete the attachment.
// 5. due to the update from step 2, the query runs again, again reporting SYNCED.
// 6. our test now fails because the second event should be ARCHIVED.
.distinctUntilChanged()
.onEach { logger.i { "attachments table results: $it" } }
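This works because `distinctUntilChanged()` drops consecutive equal emissions. A minimal, self-contained sketch of the schedule described above, with plain strings standing in for the real attachment rows:

```kotlin
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // The problematic schedule: the watch query reports SYNCED twice in a row
    // because the second query ran after the attachment was already downloaded.
    val queryResults = flowOf("QUEUED_DOWNLOAD", "SYNCED", "SYNCED", "ARCHIVED")

    // distinctUntilChanged drops the repeated SYNCED, so a test awaiting the
    // event after SYNCED sees ARCHIVED as expected.
    println(queryResults.distinctUntilChanged().toList())
    // [QUEUED_DOWNLOAD, SYNCED, ARCHIVED]
}
```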

suspend fun updateSchema(db: PowerSyncDatabase) {
db.updateSchema(
@@ -278,10 +288,13 @@ class AttachmentsTest {
""",
)

waitFor {
var nextRecord: Attachment? = attachmentQuery.awaitItem().firstOrNull()
// The record should have been deleted
nextRecord shouldBe null
while (true) {
val item = attachmentQuery.awaitItem()
if (item.isEmpty()) {
break
}

logger.v { "Waiting for attachment record to be deleted (current $item)" }
}
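These loops replace the removed `waitFor` helper: instead of re-running an assertion on a timer, the test consumes every Turbine emission until one matches. A hedged sketch of the pattern as a reusable extension (`awaitUntil` is hypothetical, not part of Turbine's API):

```kotlin
import app.cash.turbine.ReceiveTurbine

// Hypothetical helper: consume emissions until one satisfies the predicate.
// Turbine's awaitItem() timeout still bounds the wait, so a state that never
// arrives fails the test instead of spinning forever.
suspend fun <T> ReceiveTurbine<T>.awaitUntil(predicate: (T) -> Boolean): T {
    while (true) {
        val item = awaitItem()
        if (predicate(item)) return item
    }
}
```

Unlike polling, this consumes intermediate emissions instead of skipping them, so the turbine never falls behind the query flow.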

// The file should have been deleted from storage
@@ -346,10 +359,13 @@ class AttachmentsTest {
database.get("SELECT photo_id FROM users") { it.getString("photo_id") }

// Wait for the record to be synced (mocked backend will allow it)
waitFor {
val record = attachmentQuery.awaitItem().first()
record shouldNotBe null
record.state shouldBe AttachmentState.SYNCED
while (true) {
val item = attachmentQuery.awaitItem().firstOrNull()
if (item != null && item.state == AttachmentState.SYNCED) {
break
}

logger.v { "Waiting for attachment record to be synced (current $item)" }
}

queue.deleteFile(
@@ -369,10 +385,14 @@
)
}

waitFor {
// Record should be deleted
val record = attachmentQuery.awaitItem().firstOrNull()
record shouldBe null
// Record should be deleted
while (true) {
val item = attachmentQuery.awaitItem()
if (item.isEmpty()) {
break
}

logger.v { "Waiting for attachment record to be deleted (current $item)" }
}

// A delete should have been attempted for this file
@@ -559,14 +579,16 @@ class AttachmentsTest {
""",
)

// Depending on when the query updates, we'll see the attachment as queued for
// download or archived.
var attachmentRecord = attachmentQuery.awaitItem().first()
attachmentRecord shouldNotBe null

attachmentRecord.state shouldBe AttachmentState.QUEUED_DOWNLOAD
if (attachmentRecord.state == AttachmentState.QUEUED_DOWNLOAD) {
attachmentRecord = attachmentQuery.awaitItem().first()
}

// The download should fail. We don't specify a retry. The record should be archived.
attachmentRecord = attachmentQuery.awaitItem().first()

attachmentRecord.state shouldBe AttachmentState.ARCHIVED

attachmentQuery.cancelAndIgnoreRemainingEvents()
@@ -211,11 +211,10 @@ class DatabaseTest {

var changeSet = query.awaitItem()
// The initial result
changeSet.count() shouldBe 0
changeSet shouldHaveSize 0

changeSet = query.awaitItem()
changeSet.count() shouldBe 1
changeSet.contains("users") shouldBe true
changeSet shouldBe setOf("users")

query.cancel()
}
@@ -105,10 +105,11 @@ abstract class BaseSyncIntegrationTest(

database.close()
turbine.waitFor { !it.connected }
turbine.cancel()
turbine.cancelAndIgnoreRemainingEvents()
}

// Closing the database should have closed the channel.
logger.v { "Database is closed, waiting to close HTTP stream" }
waitFor { syncLines.isClosedForSend shouldBe true }
}

@@ -96,6 +96,8 @@ public open class SyncingService(
*/
try {
val attachments = context.getActiveAttachments()
logger.v { "Processing active attachments: $attachments" }

// Performs pending operations and updates attachment states
handleSync(attachments, context)

@@ -298,6 +300,8 @@
*/
public suspend fun deleteArchivedAttachments(context: AttachmentContext): Boolean =
context.deleteArchivedAttachments { pendingDelete ->
logger.v { "Deleting archived attachments: $pendingDelete" }

for (attachment in pendingDelete) {
if (attachment.localUri == null) {
continue
@@ -81,7 +81,7 @@ internal class PowerSyncDatabaseImpl(

private val resource = activeDatabaseGroup.first

private val internalDb = InternalDatabaseImpl(pool)
private val internalDb = InternalDatabaseImpl(pool, logger)

internal val bucketStorage: BucketStorage = BucketStorageImpl(internalDb, logger)

@@ -1,7 +1,7 @@
package com.powersync.db.internal

import co.touchlab.kermit.Logger
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncException
import com.powersync.db.SqlCursor
import com.powersync.db.ThrowableLockCallback
import com.powersync.db.ThrowableTransactionCallback
@@ -15,8 +15,9 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.withContext
@@ -25,6 +26,7 @@ import kotlin.time.Duration.Companion.milliseconds
@OptIn(ExperimentalPowerSyncAPI::class)
internal class InternalDatabaseImpl(
private val pool: SQLiteConnectionPool,
private val logger: Logger,
) : InternalDatabase {
// Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss.
private val dbContext = Dispatchers.IO
@@ -79,74 +81,74 @@ internal class InternalDatabaseImpl(
tables: Set<String>,
throttleMs: Long,
triggerImmediately: Boolean,
): Flow<Set<String>> =
channelFlow {
// Match all possible internal table combinations
val watchedTables =
tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet()

// Accumulate updates between throttles
val batchedUpdates = AtomicMutableSet<String>()
): Flow<Set<String>> {
// Match all possible internal table combinations
val watchedTables =
tables.flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }.toSet()

updatesOnTables()
.onSubscription {
if (triggerImmediately) {
// Emit an initial event (if requested). No changes would be detected at this point
send(setOf())
}
}.transform { updates ->
val intersection = updates.intersect(watchedTables)
if (intersection.isNotEmpty()) {
// Transform table names using friendlyTableName
val friendlyTableNames = intersection.map { friendlyTableName(it) }.toSet()
batchedUpdates.addAll(friendlyTableNames)
emit(Unit)
}
}
// Throttling here is a feature which prevents watch queries from spamming updates.
// Throttling by design discards and delays events within the throttle window. Discarded events
// still trigger a trailing edge update.
// Backpressure is avoided on the throttling and consumer level by buffering the last upstream value.
.throttle(throttleMs.milliseconds)
.collect {
// Emit the transformed tables which have changed
val copy = batchedUpdates.toSetAndClear()
send(copy)
}
return rawChangedTables(watchedTables, throttleMs, triggerImmediately).map {
it.mapTo(mutableSetOf(), ::friendlyTableName)
}
}

override fun <RowType : Any> watch(
sql: String,
parameters: List<Any?>?,
throttleMs: Long,
mapper: (SqlCursor) -> RowType,
): Flow<List<RowType>> =
// Use a channel flow here since we throttle (buffer used under the hood)
// This causes some emissions to be from different scopes.
channelFlow {
flow {
// Fetch the tables asynchronously with getAll
val tables =
getSourceTables(sql, parameters)
.filter { it.isNotBlank() }
.toSet()

val queries =
rawChangedTables(tables, throttleMs, triggerImmediately = true).map {
logger.v { "Fetching watch() query: $sql" }
val rows = getAll(sql, parameters = parameters, mapper = mapper)
logger.v { "watch query $sql done, emitting downstream" }
rows
}
emitAll(queries)
}

private fun rawChangedTables(
tableNames: Set<String>,
throttleMs: Long,
triggerImmediately: Boolean,
): Flow<Set<String>> =
flow {
val batchedUpdates = AtomicMutableSet<String>()

updatesOnTables()
// onSubscription here is very important.
// This ensures that the initial result and all updates are emitted.
.onSubscription {
send(getAll(sql, parameters = parameters, mapper = mapper))
}.filter {
// Only trigger updates on relevant tables
it.intersect(tables).isNotEmpty()
if (triggerImmediately) {
// Emit an initial event (if requested). No changes would be detected at this point
emit(initialUpdateSentinel)
}
}.transform { updates ->
if (updates === initialUpdateSentinel) {
// This should always be emitted despite being empty and not intersecting with
// the tables we care about.
emit(Unit)
} else {
val intersection = updates.intersect(tableNames)
if (intersection.isNotEmpty()) {
batchedUpdates.addAll(intersection)
emit(Unit)
}
}
}
// Throttling here is a feature which prevents watch queries from spamming updates.
// Throttling by design discards and delays events within the throttle window. Discarded events
// still trigger a trailing edge update.
// Backpressure is avoided on the throttling and consumer level by buffering the last upstream value.
// Note that the buffered upstream "value" only serves to trigger the getAll query. We don't buffer watch results.
.throttle(throttleMs.milliseconds)
.collect {
send(getAll(sql, parameters = parameters, mapper = mapper))
val entries = batchedUpdates.toSetAndClear()
emit(entries)
}
}
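For context on the name mapping above: `updatesOnTables()` reports internal table names, which is why `watchedTables` expands each user table to its `ps_data__` and `ps_data_local__` variants, and why `rawChangedTables` results are mapped back through `friendlyTableName`. A rough sketch of that round trip, assuming a simple prefix-stripping `friendlyTableName` (its real implementation is not part of this diff):

```kotlin
// Assumed (hypothetical) prefix-stripping behaviour of friendlyTableName.
fun friendlyTableNameSketch(table: String): String =
    when {
        table.startsWith("ps_data_local__") -> table.removePrefix("ps_data_local__")
        table.startsWith("ps_data__") -> table.removePrefix("ps_data__")
        else -> table
    }

fun main() {
    // The expansion performed for watchedTables above:
    val watched = setOf("users")
        .flatMap { listOf(it, "ps_data__$it", "ps_data_local__$it") }
        .toSet()
    println(watched) // [users, ps_data__users, ps_data_local__users]

    // Internal names emitted by rawChangedTables map back to friendly ones:
    println(watched.mapTo(mutableSetOf(), ::friendlyTableNameSketch)) // [users]
}
```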

@@ -259,6 +261,10 @@ internal class InternalDatabaseImpl(
val p2: Long,
val p3: Long,
)

private companion object {
val initialUpdateSentinel = emptySet<String>()
}
}
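The sentinel relies on referential identity: the `===` check in `transform` distinguishes the deliberate initial emission from a genuine update that happens to intersect nothing. One stdlib subtlety worth noting: `emptySet()` returns a shared singleton, so this only stays correct while genuine updates never alias it. A minimal sketch of the technique:

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.transform

// A marker compared by reference, not by equality.
val sentinel = emptySet<String>()

fun updates() = flow<Set<String>> {
    emit(sentinel)        // forced initial emission
    emit(HashSet())       // equal-but-distinct empty update: not the sentinel
    emit(setOf("users"))  // a genuine update
}

suspend fun main() {
    updates()
        .transform { value ->
            // Pass the sentinel through; drop other non-matching updates.
            if (value === sentinel || value.isNotEmpty()) emit(value)
        }.collect { println(it) } // prints [] and [users]
}
```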

/**
@@ -11,7 +11,7 @@ public class AtomicMutableSet<T> : SynchronizedObject() {
return set.add(element)
}

public fun addAll(elements: Set<T>): Boolean =
public fun addAll(elements: Collection<T>): Boolean =
synchronized(this) {
return set.addAll(elements)
}
25 changes: 17 additions & 8 deletions core/src/commonMain/kotlin/com/powersync/utils/ThrottleFlow.kt
@@ -6,25 +6,34 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlin.time.Duration
import kotlin.time.TimeSource

/**
* Throttles a flow with emissions on the leading and trailing edge.
* Events, from the incoming flow, during the throttle window are discarded.
* Events are discarded by using a conflated buffer.
* This throttle method acts as a slow consumer, but backpressure is not a concern
* due to the conflated buffer dropping events during the throttle window.
* Throttles an upstream flow.
*
* When a new event is emitted on this (upstream) flow, it is passed on downstream. For each value
* passed downstream, the resulting flow will pause for at least [window] (or longer if emitting
* the value downstream takes longer).
*
* While this flow is paused, no further events are passed downstream. The latest upstream event
* emitted during the pause state is buffered and handled once the pause is over.
*
* In other words, this flow will _drop events_, so it should only be used when the upstream flow
* serves as a notification marker (meaning that something downstream needs to run in response to
* events, but the actual event does not matter).
*/
internal fun <T> Flow<T>.throttle(window: Duration): Flow<T> =
flow {
// Use a buffer before throttle (ensure only the latest event is kept)
val bufferedFlow = [email protected](Channel.CONFLATED)

bufferedFlow.collect { value ->
// Emit the event immediately (leading edge)
// Pause for the downstream emit or the delay window, whichever is longer
val pauseUntil = TimeSource.Monotonic.markNow() + window
emit(value)

// Delay for the throttle window to avoid emitting too frequently
delay(window)
// elapsedNow() is negative while pauseUntil is still in the future, so negating it yields the remaining pause time.
delay(-pauseUntil.elapsedNow())

// The next incoming event will be provided from the buffer.
// The next collect will emit the trailing edge
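A hedged usage sketch of the new semantics, assuming a fast collector and a 100 ms window (`throttle` is internal to this module, so this would live in module code or a test): the leading edge is emitted immediately, intermediate events are conflated, and the latest one arrives on the trailing edge.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration.Companion.milliseconds

fun main() = runBlocking {
    val events = flow {
        emit(1)            // leading edge, emitted immediately
        delay(10); emit(2) // conflated away by the next event
        delay(10); emit(3) // trailing edge, emitted once the window elapses
    }

    events.throttle(100.milliseconds).collect { println(it) } // prints 1, then 3
}
```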
5 changes: 3 additions & 2 deletions core/src/commonTest/kotlin/com/powersync/testutils/WaitFor.kt
@@ -15,11 +15,12 @@ internal suspend inline fun waitFor(
try {
test()
return
} catch (_: Error) {
} catch (e: Error) {
// Treat assertion errors as a failed attempt; retry after the interval
println("waitFor: failed with $e")
}
delay(interval)
} while (begin.elapsedNow() < timeout)

throw Exception("Timeout reached")
throw Exception("waitFor() timeout reached")
}
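For reference, the typical call site (taken from the sync integration test earlier in this diff): the block simply re-runs until its assertion stops throwing or the timeout elapses.

```kotlin
waitFor { syncLines.isClosedForSend shouldBe true }
```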