Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.0.0-BETA29 (unreleased)

* Fix potential race condition between jobs in `connect()` and `disconnect()`.
* Fix race condition causing data received during uploads not to be applied.

## 1.0.0-BETA28

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import com.powersync.bucket.BucketPriority
import com.powersync.bucket.Checkpoint
import com.powersync.bucket.OpType
import com.powersync.bucket.OplogEntry
import com.powersync.bucket.WriteCheckpointData
import com.powersync.bucket.WriteCheckpointResponse
import com.powersync.connectors.PowerSyncBackendConnector
import com.powersync.connectors.PowerSyncCredentials
import com.powersync.db.PowerSyncDatabaseImpl
Expand All @@ -24,11 +26,15 @@ import com.powersync.testutils.factory
import com.powersync.testutils.generatePrintLogWriter
import com.powersync.testutils.waitFor
import com.powersync.utils.JsonUtil
import dev.mokkery.answering.calls
import dev.mokkery.answering.returns
import dev.mokkery.everySuspend
import dev.mokkery.matcher.any
import dev.mokkery.mock
import dev.mokkery.verify
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
Expand Down Expand Up @@ -61,6 +67,7 @@ class SyncIntegrationTest {
private lateinit var database: PowerSyncDatabaseImpl
private lateinit var connector: PowerSyncBackendConnector
private lateinit var syncLines: Channel<SyncLine>
private lateinit var checkpointResponse: () -> WriteCheckpointResponse

@BeforeTest
fun setup() {
Expand All @@ -79,6 +86,7 @@ class SyncIntegrationTest {
everySuspend { invalidateCredentials() } returns Unit
}
syncLines = Channel()
checkpointResponse = { WriteCheckpointResponse(WriteCheckpointData("1000")) }

runBlocking {
database.disconnectAndClear(true)
Expand All @@ -100,16 +108,18 @@ class SyncIntegrationTest {
dbFilename = "testdb",
) as PowerSyncDatabaseImpl

@OptIn(DelicateCoroutinesApi::class)
private fun syncStream(): SyncStream {
val client = MockSyncService(syncLines)
val client = MockSyncService(syncLines, { checkpointResponse() })
return SyncStream(
bucketStorage = database.bucketStorage,
connector = connector,
httpEngine = client,
uploadCrud = { },
uploadCrud = { connector.uploadData(database) },
retryDelayMs = 10,
logger = logger,
params = JsonObject(emptyMap()),
scope = GlobalScope,
)
}

Expand Down Expand Up @@ -529,4 +539,107 @@ class SyncIntegrationTest {
database.close()
syncLines.close()
}

@Test
fun `handles checkpoints during uploads`() =
runTest {
database.connectInternal(syncStream(), 1000L)

suspend fun expectUserRows(amount: Int) {
val row = database.get("SELECT COUNT(*) FROM users") { it.getLong(0)!! }
assertEquals(amount, row.toInt())
}

val completeUpload = CompletableDeferred<Unit>()
val uploadStarted = CompletableDeferred<Unit>()
everySuspend { connector.uploadData(any()) } calls { (db: PowerSyncDatabase) ->
val batch = db.getCrudBatch()
if (batch == null) return@calls

uploadStarted.complete(Unit)
completeUpload.await()
batch.complete.invoke(null)
}

// Trigger an upload (adding a keep-alive sync line because the execute could start before the database is fully
// connected).
database.execute("INSERT INTO users (id, name, email) VALUES (uuid(), ?, ?)", listOf("local", "[email protected]"))
syncLines.send(SyncLine.KeepAlive(1234))
expectUserRows(1)
uploadStarted.await()

// Pretend that the connector takes forever in uploadData, but the data gets uploaded before the method returns.
syncLines.send(
SyncLine.FullCheckpoint(
Checkpoint(
writeCheckpoint = "1",
lastOpId = "2",
checksums = listOf(BucketChecksum("a", checksum = 0)),
),
),
)
turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
turbine.waitFor { it.downloading }
turbine.cancel()
}

syncLines.send(
SyncLine.SyncDataBucket(
bucket = "a",
data =
listOf(
OplogEntry(
checksum = 0,
opId = "1",
op = OpType.PUT,
rowId = "1",
rowType = "users",
data = """{"id": "test1", "name": "from local", "email": ""}""",
),
OplogEntry(
checksum = 0,
opId = "2",
op = OpType.PUT,
rowId = "2",
rowType = "users",
data = """{"id": "test1", "name": "additional entry", "email": ""}""",
),
),
after = null,
nextAfter = null,
hasMore = false,
),
)
syncLines.send(SyncLine.CheckpointComplete(lastOpId = "2"))

// Despite receiving a valid checkpoint with two rows, it should not be visible because we have local data.
waitFor {
assertNotNull(
logWriter.logs.find {
it.message.contains("Could not apply checkpoint due to local data")
},
)
}
expectUserCount(1)

// Mark the upload as completed, this should trigger a write_checkpoint.json request
val requestedCheckpoint = CompletableDeferred<Unit>()
checkpointResponse = {
requestedCheckpoint.complete(Unit)
WriteCheckpointResponse(WriteCheckpointData(""))
}
completeUpload.complete(Unit)
requestedCheckpoint.await()

// This should apply the checkpoint
turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
turbine.waitFor { !it.downloading }
turbine.cancel()
}

// Meaning that the two rows are now visible
expectUserCount(2)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ internal class PowerSyncDatabaseImpl(
retryDelayMs = retryDelayMs,
logger = logger,
params = params.toJsonObject(),
scope = scope,
),
crudThrottleMs,
)
Expand Down Expand Up @@ -222,7 +223,7 @@ internal class PowerSyncDatabaseImpl(
.filter { it.contains(InternalTable.CRUD.toString()) }
.throttle(crudThrottleMs)
.collect {
stream.triggerCrudUpload()
stream.triggerCrudUploadAsync().join()
}
}
}
Expand Down
101 changes: 68 additions & 33 deletions core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.powersync.sync

import co.touchlab.kermit.Logger
import co.touchlab.stately.concurrency.AtomicBoolean
import co.touchlab.stately.concurrency.AtomicReference
import com.powersync.bucket.BucketChecksum
import com.powersync.bucket.BucketRequest
import com.powersync.bucket.BucketStorage
Expand Down Expand Up @@ -29,9 +29,13 @@ import io.ktor.http.contentType
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.readUTF8Line
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.JsonObject
Expand All @@ -43,9 +47,10 @@ internal class SyncStream(
private val retryDelayMs: Long = 5000L,
private val logger: Logger,
private val params: JsonObject,
private val scope: CoroutineScope,
httpEngine: HttpClientEngine? = null,
) {
private var isUploadingCrud = AtomicBoolean(false)
private var isUploadingCrud = AtomicReference<PendingCrudUpload?>(null)

/**
* The current sync status. This instance is updated as changes occur
Expand Down Expand Up @@ -116,14 +121,20 @@ internal class SyncStream(
}
}

suspend fun triggerCrudUpload() {
if (!status.connected || isUploadingCrud.value) {
return
fun triggerCrudUploadAsync(): Job =
scope.launch {
val thisIteration = PendingCrudUpload(CompletableDeferred())
try {
if (!status.connected || !isUploadingCrud.compareAndSet(null, thisIteration)) {
return@launch
}

uploadAllCrud()
} finally {
isUploadingCrud.set(null)
thisIteration.done.complete(Unit)
}
}
isUploadingCrud.value = true
uploadAllCrud()
isUploadingCrud.value = false
}

private suspend fun uploadAllCrud() {
var checkedCrudItem: CrudEntry? = null
Expand Down Expand Up @@ -153,8 +164,13 @@ internal class SyncStream(
break
}
} catch (e: Exception) {
logger.e { "Error uploading crud: ${e.message}" }
status.update(uploading = false, uploadError = e)

if (e is CancellationException) {
throw e
}

logger.e { "Error uploading crud: ${e.message}" }
delay(retryDelayMs)
break
}
Expand Down Expand Up @@ -237,7 +253,6 @@ internal class SyncStream(
validatedCheckpoint = null,
appliedCheckpoint = null,
bucketSet = initialBuckets.keys.toMutableSet(),
retry = false,
)

bucketEntries.forEach { entry ->
Expand All @@ -253,7 +268,12 @@ internal class SyncStream(

streamingSyncRequest(req).collect { value ->
val line = JsonUtil.json.decodeFromString<SyncLine>(value)

state = handleInstruction(line, value, state)

if (state.abortIteration) {
return@collect
}
}

status.update(downloading = false)
Expand Down Expand Up @@ -314,30 +334,40 @@ internal class SyncStream(
}

private suspend fun handleStreamingSyncCheckpointComplete(state: SyncStreamState): SyncStreamState {
val result = bucketStorage.syncLocalDatabase(state.targetCheckpoint!!)
val checkpoint = state.targetCheckpoint!!
var result = bucketStorage.syncLocalDatabase(checkpoint)
val pending = isUploadingCrud.get()

if (!result.checkpointValid) {
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
delay(50)
state.retry = true
state.abortIteration = true
// TODO handle retries
return state
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
// landing here the whole time
} else {
state.appliedCheckpoint = state.targetCheckpoint!!.clone()
logger.i { "validated checkpoint ${state.appliedCheckpoint}" }
} else if (!result.ready && pending != null) {
// We have pending entries in the local upload queue or are waiting to confirm a write checkpoint, which
// prevented this checkpoint from applying. Wait for that to complete and try again.
logger.d { "Could not apply checkpoint due to local data. Waiting for in-progress upload before retrying." }
pending.done.await()

result = bucketStorage.syncLocalDatabase(checkpoint)
}

state.validatedCheckpoint = state.targetCheckpoint
status.update(
lastSyncedAt = Clock.System.now(),
downloading = false,
hasSynced = true,
clearDownloadError = true,
)
if (result.checkpointValid && result.ready) {
state.appliedCheckpoint = checkpoint.clone()
logger.i { "validated checkpoint ${state.appliedCheckpoint}" }

state.validatedCheckpoint = state.targetCheckpoint
status.update(
lastSyncedAt = Clock.System.now(),
downloading = false,
hasSynced = true,
clearDownloadError = true,
)
} else {
logger.d { "Could not apply checkpoint. Waiting for next sync complete line" }
}

return state
}
Expand All @@ -352,12 +382,12 @@ internal class SyncStream(
// This means checksums failed. Start again with a new checkpoint.
// TODO: better back-off
delay(50)
state.retry = true
state.abortIteration = true
// TODO handle retries
return state
} else if (!result.ready) {
// Checksums valid, but need more data for a consistent checkpoint.
// Continue waiting.
// Checkpoint is valid, but we have local data preventing this to be published. We'll try to resolve this
// once we have a complete checkpoint if the problem persists.
} else {
logger.i { "validated partial checkpoint ${state.appliedCheckpoint} up to priority of $priority" }
}
Expand Down Expand Up @@ -441,10 +471,11 @@ internal class SyncStream(
// Connection would be closed automatically right after this
logger.i { "Token expiring reconnect" }
connector.invalidateCredentials()
state.retry = true
state.abortIteration = true
return state
}
triggerCrudUpload()
// Don't await the upload job, we can keep receiving sync lines
triggerCrudUploadAsync()
return state
}
}
Expand All @@ -454,5 +485,9 @@ internal data class SyncStreamState(
var validatedCheckpoint: Checkpoint?,
var appliedCheckpoint: Checkpoint?,
var bucketSet: MutableSet<String>?,
var retry: Boolean,
var abortIteration: Boolean = false,
)

private class PendingCrudUpload(
val done: CompletableDeferred<Unit>,
)
Loading
Loading