Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ jobs:
if: runner.os == 'macOS'
uses: maxim-lobanov/setup-xcode@v1
with:
xcode-version: latest-stable
# TODO: Update to latest-stable once GH installs iOS 26 simulators
xcode-version: '^16.4.0'

- name: Build and run tests with Gradle
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class AttachmentsTest {
* immediately be deleted.
*/
archivedCacheLimit = 0,
logger = logger,
)

doOnCleanup {
Expand Down Expand Up @@ -175,7 +176,7 @@ class AttachmentsTest {
val exists = queue.localStorage.fileExists(localUri)
exists shouldBe false

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}

Expand Down Expand Up @@ -204,6 +205,7 @@ class AttachmentsTest {
* immediately be deleted.
*/
archivedCacheLimit = 0,
logger = logger,
)

doOnCleanup {
Expand Down Expand Up @@ -284,7 +286,7 @@ class AttachmentsTest {
// The file should have been deleted from storage
queue.localStorage.fileExists(localUri) shouldBe false

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}

Expand Down Expand Up @@ -314,6 +316,7 @@ class AttachmentsTest {
*/
archivedCacheLimit = 0,
syncThrottleDuration = 0.seconds,
logger = logger,
)

doOnCleanup {
Expand Down Expand Up @@ -382,7 +385,7 @@ class AttachmentsTest {
)
}

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}

Expand Down Expand Up @@ -411,6 +414,7 @@ class AttachmentsTest {
* Keep some items in the cache
*/
archivedCacheLimit = 10,
logger = logger,
)

doOnCleanup {
Expand Down Expand Up @@ -493,7 +497,7 @@ class AttachmentsTest {
attachmentRecord = attachmentQuery.awaitItem().first()
attachmentRecord.state shouldBe AttachmentState.SYNCED

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}

Expand Down Expand Up @@ -537,6 +541,7 @@ class AttachmentsTest {
exception: Exception,
): Boolean = false
},
logger = logger,
)
doOnCleanup {
queue.stopSyncing()
Expand Down Expand Up @@ -574,7 +579,7 @@ class AttachmentsTest {

attachmentRecord.state shouldBe AttachmentState.ARCHIVED

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,4 +913,19 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
query.cancelAndIgnoreRemainingEvents()
}
}

@Test
fun `ends iteration on http close`() =
databaseTest {
turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
database.connect(TestConnector(), options = getOptions())
turbine.waitFor { it.connected }

syncLines.close()
turbine.waitFor { !it.connected }

turbine.cancelAndIgnoreRemainingEvents()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ public open class AttachmentQueue(
* Use a lock here to prevent conflicting state updates.
*/
attachmentsService.withContext { attachmentsContext ->
logger.v { "processWatchedAttachments($items)" }

/**
* Need to get all the attachments which are tracked in the DB.
* We might need to restore an archived attachment.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.powersync.attachments.implementation

import co.touchlab.kermit.Logger
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncDatabase
import com.powersync.attachments.Attachment
import com.powersync.attachments.AttachmentContext
import com.powersync.attachments.AttachmentState
import com.powersync.db.getString
import com.powersync.db.internal.ConnectionContext
import kotlinx.serialization.json.Json
import com.powersync.db.runWrapped
import kotlin.time.Clock

/**
Expand Down Expand Up @@ -131,6 +132,7 @@ public open class AttachmentContextImpl(
* @returns true if all items have been deleted. Returns false if there might be more archived
* items remaining.
*/
@OptIn(ExperimentalPowerSyncAPI::class)
public override suspend fun deleteArchivedAttachments(callback: suspend (attachments: List<Attachment>) -> Unit): Boolean {
// First fetch the attachments in order to allow other cleanup
val limit = 1000
Expand All @@ -154,12 +156,21 @@ public open class AttachmentContextImpl(
),
) { Attachment.fromCursor(it) }
callback(attachments)
db.execute(
"DELETE FROM $table WHERE id IN (SELECT value FROM json_each(?));",
listOf(
Json.encodeToString(attachments.map { it.id }),
),
)

runWrapped {
db.useConnection(readOnly = false) { conn ->
conn.usePrepared("DELETE FROM $table WHERE id = ?") { stmt ->
for (attachment in attachments) {
stmt.bindText(1, attachment.id)
stmt.step()

stmt.reset()
stmt.clearBindings()
}
}
}
}

return attachments.size < limit
}

Expand Down
35 changes: 30 additions & 5 deletions core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.powersync.sync.Instruction
import com.powersync.sync.LegacySyncImplementation
import com.powersync.sync.SyncDataBatch
import com.powersync.sync.SyncLocalDatabaseResult
import com.powersync.utils.JsonUtil
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonObject

Expand Down Expand Up @@ -55,25 +56,49 @@ internal interface BucketStorage {
}

internal sealed interface PowerSyncControlArguments {
/**
* Returns the arguments for the `powersync_control` SQL invocation.
*/
val sqlArguments: Pair<String, Any?>

@Serializable
class Start(
val parameters: JsonObject,
val schema: SerializableSchema,
) : PowerSyncControlArguments
) : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?>
get() = "start" to JsonUtil.json.encodeToString(this)
}

data object Stop : PowerSyncControlArguments
data object Stop : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "stop" to null
}

data class TextLine(
val line: String,
) : PowerSyncControlArguments
) : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "line_text" to line
}

class BinaryLine(
val line: ByteArray,
line: ByteArray,
) : PowerSyncControlArguments {
override fun toString(): String = "BinaryLine"

override val sqlArguments: Pair<String, Any?> = "line_binary" to line
}

data object CompletedUpload : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "completed_upload" to null
}

data object CompletedUpload : PowerSyncControlArguments
data object ConnectionEstablished : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "connection" to "established"
}

data object ResponseStreamEnd : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "connection" to "end"
}
}

@Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,17 +358,7 @@ internal class BucketStorageImpl(
db.writeTransaction { tx ->
logger.v { "powersync_control: $args" }

val (op: String, data: Any?) =
when (args) {
is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args)
PowerSyncControlArguments.Stop -> "stop" to null

PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null

is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line
is PowerSyncControlArguments.TextLine -> "line_text" to args.line
}

val (op: String, data: Any?) = args.sqlArguments
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult)
}
}
5 changes: 4 additions & 1 deletion core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,14 @@ internal class SyncStream(
throw RuntimeException("Received error when connecting to sync stream: ${httpResponse.bodyAsText()}")
}

status.update { copy(connected = true, connecting = false) }
block(isBson, httpResponse)
}
}

private fun receiveTextLines(req: JsonElement): Flow<String> =
flow {
connectToSyncEndpoint(req, supportBson = false) { isBson, response ->
status.update { copy(connected = true, connecting = false) }
check(!isBson)

emitAll(response.body<ByteReadChannel>().lines())
Expand All @@ -316,13 +316,16 @@ internal class SyncStream(
private fun receiveTextOrBinaryLines(req: JsonElement): Flow<PowerSyncControlArguments> =
flow {
connectToSyncEndpoint(req, supportBson = false) { isBson, response ->
emit(PowerSyncControlArguments.ConnectionEstablished)
val body = response.body<ByteReadChannel>()

if (isBson) {
emitAll(body.bsonObjects().map { PowerSyncControlArguments.BinaryLine(it) })
} else {
emitAll(body.lines().map { PowerSyncControlArguments.TextLine(it) })
}

emit(PowerSyncControlArguments.ResponseStreamEnd)
}
}

Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ kotlinx-datetime = "0.7.1"
kotlinx-io = "0.8.0"
ktor = "3.2.3"
uuid = "0.8.4"
powersync-core = "0.4.5"
powersync-core = "0.4.6"
turbine = "1.2.1"
kotest = "5.9.1" # we can't upgrade to 6.x because that requires Java 11 or above (we need Java 8 support)

Expand Down
Loading