diff --git a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt index 013b08ea..ba5ad0a1 100644 --- a/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt +++ b/core/src/androidMain/kotlin/com/powersync/DatabaseDriverFactory.android.kt @@ -2,7 +2,6 @@ package com.powersync import android.content.Context import androidx.sqlite.db.SupportSQLiteDatabase -import app.cash.sqldelight.async.coroutines.synchronous import app.cash.sqldelight.driver.android.AndroidSqliteDriver import com.powersync.db.internal.InternalSchema import io.requery.android.database.sqlite.RequerySQLiteOpenHelperFactory @@ -37,7 +36,7 @@ public actual class DatabaseDriverFactory( scope: CoroutineScope, dbFilename: String, ): PsSqlDriver { - val schema = InternalSchema.synchronous() + val schema = InternalSchema this.driver = PsSqlDriver( diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index e0778a51..5478d0da 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -1,6 +1,7 @@ package com.powersync.bucket import com.powersync.db.crud.CrudEntry +import com.powersync.db.internal.PowerSyncTransaction import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult @@ -11,8 +12,12 @@ internal interface BucketStorage { suspend fun nextCrudItem(): CrudEntry? + fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry? + suspend fun hasCrud(): Boolean + fun hasCrud(transaction: PowerSyncTransaction): Boolean + suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean suspend fun saveSyncData(syncDataBatch: SyncDataBatch) diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index d6fac1d5..4113f12e 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -1,11 +1,13 @@ package com.powersync.bucket +import app.cash.sqldelight.db.SqlCursor import co.touchlab.kermit.Logger import co.touchlab.stately.concurrency.AtomicBoolean import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.CrudRow import com.powersync.db.internal.InternalDatabase import com.powersync.db.internal.InternalTable +import com.powersync.db.internal.PowerSyncTransaction import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult import com.powersync.utils.JsonUtil @@ -51,29 +53,38 @@ internal class BucketStorageImpl( return id ?: throw IllegalStateException("Client ID not found") } - override suspend fun nextCrudItem(): CrudEntry? { - val crudItem = - db.getOptional("SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1") { cursor -> - CrudEntry.fromRow( - CrudRow( - id = cursor.getString(0)!!, - txId = cursor.getString(1)?.toInt(), - data = cursor.getString(2)!!, - ), - ) - } + override suspend fun nextCrudItem(): CrudEntry? = + db.getOptional(sql = nextCrudQuery, mapper = nextCrudMapper) + + override fun nextCrudItem(transaction: PowerSyncTransaction): CrudEntry? = + transaction.getOptional(sql = nextCrudQuery, mapper = nextCrudMapper) - return crudItem + private val nextCrudQuery = "SELECT id, tx_id, data FROM ${InternalTable.CRUD} ORDER BY id ASC LIMIT 1" + private val nextCrudMapper: (SqlCursor) -> CrudEntry = { cursor -> + CrudEntry.fromRow( + CrudRow( + id = cursor.getString(0)!!, + txId = cursor.getString(1)?.toInt(), + data = cursor.getString(2)!!, + ), + ) } override suspend fun hasCrud(): Boolean { - val res = - db.getOptional("SELECT 1 FROM ps_crud LIMIT 1") { - it.getLong(0)!! - } + val res = db.getOptional(sql = hasCrudQuery, mapper = hasCrudMapper) + return res == 1L + } + + override fun hasCrud(transaction: PowerSyncTransaction): Boolean { + val res = transaction.getOptional(sql = hasCrudQuery, mapper = hasCrudMapper) return res == 1L } + private val hasCrudQuery = "SELECT 1 FROM ps_crud LIMIT 1" + private val hasCrudMapper:(SqlCursor) -> Long = { + it.getLong(0)!! + } + override suspend fun updateLocalTarget(checkpointCallback: suspend () -> String): Boolean { db.getOptional( "SELECT target_op FROM ${InternalTable.BUCKETS} WHERE name = '\$local' AND target_op = ?", @@ -94,7 +105,7 @@ internal class BucketStorageImpl( logger.i { "[updateLocalTarget] Updating target to checkpoint $opId" } return db.writeTransaction { tx -> - if (hasCrud()) { + if (hasCrud(tx)) { logger.w { "[updateLocalTarget] ps crud is not empty" } return@writeTransaction false } diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 9db82c21..0a69d49b 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -1,7 +1,5 @@ package com.powersync.db -import app.cash.sqldelight.async.coroutines.awaitAsList -import app.cash.sqldelight.async.coroutines.awaitAsOne import app.cash.sqldelight.db.SqlCursor import co.touchlab.kermit.Logger import com.powersync.DatabaseDriverFactory @@ -69,7 +67,7 @@ internal class PowerSyncDatabaseImpl( init { runBlocking { - val sqliteVersion = internalDb.queries.sqliteVersion().awaitAsOne() + val sqliteVersion = internalDb.queries.sqliteVersion().executeAsOne() logger.d { "SQLiteVersion: $sqliteVersion" } checkVersion() logger.d { "PowerSyncVersion: ${getPowerSyncVersion()}" } @@ -82,8 +80,8 @@ internal class PowerSyncDatabaseImpl( private suspend fun applySchema() { val schemaJson = JsonUtil.json.encodeToString(schema) - this.writeTransaction { tx -> - internalDb.queries.replaceSchema(schemaJson).awaitAsOne() + internalDb.writeTransaction { + internalDb.queries.replaceSchema(schemaJson).executeAsOne() } } @@ -142,7 +140,7 @@ internal class PowerSyncDatabaseImpl( } val entries = - internalDb.queries.getCrudEntries((limit + 1).toLong()).awaitAsList().map { + internalDb.queries.getCrudEntries((limit + 1).toLong()).executeAsList().map { CrudEntry.fromRow( CrudRow( id = it.id.toString(), @@ -167,9 +165,9 @@ internal class PowerSyncDatabaseImpl( } override suspend fun getNextCrudTransaction(): CrudTransaction? { - return this.readTransaction { + return internalDb.readTransaction { val entry = - bucketStorage.nextCrudItem() + bucketStorage.nextCrudItem(this) ?: return@readTransaction null val txId = entry.transactionId @@ -177,7 +175,7 @@ internal class PowerSyncDatabaseImpl( if (txId == null) { listOf(entry) } else { - internalDb.queries.getCrudEntryByTxId(txId.toLong()).awaitAsList().map { + internalDb.queries.getCrudEntryByTxId(txId.toLong()).executeAsList().map { CrudEntry.fromRow( CrudRow( id = it.id.toString(), @@ -199,7 +197,7 @@ internal class PowerSyncDatabaseImpl( } } - override suspend fun getPowerSyncVersion(): String = internalDb.queries.powerSyncVersion().awaitAsOne() + override suspend fun getPowerSyncVersion(): String = internalDb.queries.powerSyncVersion().executeAsOne() override suspend fun get( sql: String, @@ -225,9 +223,11 @@ internal class PowerSyncDatabaseImpl( mapper: (SqlCursor) -> RowType, ): Flow> = internalDb.watch(sql, parameters, mapper) - override suspend fun readTransaction(callback: suspend (tx: PowerSyncTransaction) -> R): R = internalDb.writeTransaction(callback) + override suspend fun readTransaction(callback: (tx: PowerSyncTransaction) -> R): R = + internalDb.writeTransaction(callback) - override suspend fun writeTransaction(callback: suspend (tx: PowerSyncTransaction) -> R): R = internalDb.writeTransaction(callback) + override suspend fun writeTransaction(callback: (tx: PowerSyncTransaction) -> R): R = + internalDb.writeTransaction(callback) override suspend fun execute( sql: String, @@ -238,16 +238,16 @@ internal class PowerSyncDatabaseImpl( lastTransactionId: Int, writeCheckpoint: String?, ) { - writeTransaction { tx -> + internalDb.writeTransaction { internalDb.queries.deleteEntriesWithIdLessThan(lastTransactionId.toLong()) - if (writeCheckpoint != null && !bucketStorage.hasCrud()) { - tx.execute( + if (writeCheckpoint != null && !bucketStorage.hasCrud(this)) { + execute( "UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'", listOf(writeCheckpoint), ) } else { - tx.execute( + execute( "UPDATE ps_buckets SET target_op = CAST(? AS INTEGER) WHERE name='\$local'", listOf(bucketStorage.getMaxOpId()), ) @@ -275,8 +275,8 @@ internal class PowerSyncDatabaseImpl( override suspend fun disconnectAndClear(clearLocal: Boolean) { disconnect() - this.writeTransaction { - internalDb.queries.powersyncClear(if (clearLocal) "1" else "0").awaitAsOne() + internalDb.writeTransaction { + internalDb.queries.powersyncClear(if (clearLocal) "1" else "0").executeAsOne() } currentStatus.update(lastSyncedAt = null, hasSynced = false) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt index 95497436..ba580d6d 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/Queries.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/Queries.kt @@ -51,7 +51,7 @@ public interface Queries { mapper: (SqlCursor) -> RowType, ): Flow> - public suspend fun writeTransaction(callback: suspend (PowerSyncTransaction) -> R): R + public suspend fun writeTransaction(callback: (PowerSyncTransaction) -> R): R - public suspend fun readTransaction(callback: suspend (PowerSyncTransaction) -> R): R + public suspend fun readTransaction(callback: (PowerSyncTransaction) -> R): R } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt index 672dda4a..ff9dda40 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabase.kt @@ -2,13 +2,11 @@ package com.powersync.db.internal import app.cash.sqldelight.db.Closeable import com.persistence.PowersyncQueries -import com.powersync.PsSqlDriver import com.powersync.db.Queries import com.powersync.persistence.PsDatabase import kotlinx.coroutines.flow.Flow internal interface InternalDatabase : Queries, Closeable { - val driver: PsSqlDriver val transactor: PsDatabase val queries: PowersyncQueries diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt index 956dd092..59549f1a 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalDatabaseImpl.kt @@ -2,8 +2,6 @@ package com.powersync.db.internal import app.cash.sqldelight.ExecutableQuery import app.cash.sqldelight.Query -import app.cash.sqldelight.async.coroutines.awaitAsList -import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull import app.cash.sqldelight.coroutines.asFlow import app.cash.sqldelight.coroutines.mapToList import app.cash.sqldelight.db.QueryResult @@ -14,44 +12,50 @@ import com.powersync.PsSqlDriver import com.powersync.persistence.PsDatabase import com.powersync.utils.JsonUtil import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import kotlinx.serialization.encodeToString @OptIn(FlowPreview::class) internal class InternalDatabaseImpl( - override val driver: PsSqlDriver, + private val driver: PsSqlDriver, private val scope: CoroutineScope, ) : InternalDatabase { override val transactor: PsDatabase = PsDatabase(driver) override val queries: PowersyncQueries = transactor.powersyncQueries + + // Could be scope.coroutineContext, but the default is GlobalScope, which seems like a bad idea. To discuss. + private val dbContext = Dispatchers.IO private val transaction = object : PowerSyncTransaction { - override suspend fun execute( + override fun execute( sql: String, parameters: List?, - ): Long = this@InternalDatabaseImpl.execute(sql, parameters ?: emptyList()) + ): Long = this@InternalDatabaseImpl.executeSync(sql, parameters ?: emptyList()) - override suspend fun get( + override fun get( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): RowType = this@InternalDatabaseImpl.get(sql, parameters ?: emptyList(), mapper) + ): RowType = this@InternalDatabaseImpl.getSync(sql, parameters ?: emptyList(), mapper) - override suspend fun getAll( + override fun getAll( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): List = this@InternalDatabaseImpl.getAll(sql, parameters ?: emptyList(), mapper) + ): List = this@InternalDatabaseImpl.getAllSync(sql, parameters ?: emptyList(), mapper) - override suspend fun getOptional( + override fun getOptional( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, - ): RowType? = this@InternalDatabaseImpl.getOptional(sql, parameters ?: emptyList(), mapper) + ): RowType? = this@InternalDatabaseImpl.getOptionalSync(sql, parameters ?: emptyList(), mapper) } companion object { @@ -78,6 +82,11 @@ internal class InternalDatabaseImpl( override suspend fun execute( sql: String, parameters: List?, + ): Long = withContext(dbContext) { executeSync(sql, parameters) } + + private fun executeSync( + sql: String, + parameters: List?, ): Long { val numParams = parameters?.size ?: 0 @@ -87,13 +96,19 @@ internal class InternalDatabaseImpl( sql = sql, parameters = numParams, binders = getBindersFromParams(parameters), - ).await() + ).value } override suspend fun get( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, + ): RowType = withContext(dbContext) { getSync(sql, parameters, mapper) } + + private fun getSync( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType, ): RowType { val result = this @@ -102,7 +117,7 @@ internal class InternalDatabaseImpl( parameters = parameters?.size ?: 0, binders = getBindersFromParams(parameters), mapper = mapper, - ).awaitAsOneOrNull() + ).executeAsOneOrNull() return requireNotNull(result) { "Query returned no result" } } @@ -110,6 +125,12 @@ internal class InternalDatabaseImpl( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, + ): List = withContext(dbContext) { getAllSync(sql, parameters, mapper) } + + private fun getAllSync( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType, ): List = this .createQuery( @@ -117,12 +138,18 @@ internal class InternalDatabaseImpl( parameters = parameters?.size ?: 0, binders = getBindersFromParams(parameters), mapper = mapper, - ).awaitAsList() + ).executeAsList() override suspend fun getOptional( sql: String, parameters: List?, mapper: (SqlCursor) -> RowType, + ): RowType? = withContext(dbContext) { getOptionalSync(sql, parameters, mapper) } + + private fun getOptionalSync( + sql: String, + parameters: List?, + mapper: (SqlCursor) -> RowType, ): RowType? = this .createQuery( @@ -130,7 +157,7 @@ internal class InternalDatabaseImpl( parameters = parameters?.size ?: 0, binders = getBindersFromParams(parameters), mapper = mapper, - ).awaitAsOneOrNull() + ).executeAsOneOrNull() override fun watch( sql: String, @@ -182,14 +209,18 @@ internal class InternalDatabaseImpl( } } - override suspend fun readTransaction(callback: suspend (PowerSyncTransaction) -> R): R = - transactor.transactionWithResult(noEnclosing = true) { - callback(transaction) + override suspend fun readTransaction(callback: PowerSyncTransaction.() -> R): R = + withContext(dbContext) { + transactor.transactionWithResult(noEnclosing = true) { + callback(transaction) + } } - override suspend fun writeTransaction(callback: suspend (PowerSyncTransaction) -> R): R = - transactor.transactionWithResult(noEnclosing = true) { - callback(transaction) + override suspend fun writeTransaction(callback: PowerSyncTransaction.() -> R): R = + withContext(dbContext) { + transactor.transactionWithResult(noEnclosing = true) { + callback(transaction) + } } // Register callback for table updates diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalSchema.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalSchema.kt index 4625246f..69f62be7 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/InternalSchema.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/InternalSchema.kt @@ -5,16 +5,16 @@ import app.cash.sqldelight.db.QueryResult import app.cash.sqldelight.db.SqlDriver import app.cash.sqldelight.db.SqlSchema -internal object InternalSchema : SqlSchema> { +internal object InternalSchema : SqlSchema> { override val version: Long get() = 1 - override fun create(driver: SqlDriver): QueryResult.AsyncValue = QueryResult.AsyncValue {} + override fun create(driver: SqlDriver): QueryResult.Value = QueryResult.Value(Unit) override fun migrate( driver: SqlDriver, oldVersion: Long, newVersion: Long, vararg callbacks: AfterVersion, - ): QueryResult.AsyncValue = QueryResult.AsyncValue {} + ): QueryResult.Value = QueryResult.Value(Unit) } diff --git a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransaction.kt b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransaction.kt index 6168b75d..7d696aef 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransaction.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/internal/PowerSyncTransaction.kt @@ -3,24 +3,24 @@ package com.powersync.db.internal import app.cash.sqldelight.db.SqlCursor public interface PowerSyncTransaction { - public suspend fun execute( + public fun execute( sql: String, parameters: List? = listOf(), ): Long - public suspend fun getOptional( + public fun getOptional( sql: String, parameters: List? = listOf(), mapper: (SqlCursor) -> RowType, ): RowType? - public suspend fun getAll( + public fun getAll( sql: String, parameters: List? = listOf(), mapper: (SqlCursor) -> RowType, ): List - public suspend fun get( + public fun get( sql: String, parameters: List? = listOf(), mapper: (SqlCursor) -> RowType, diff --git a/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt b/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt index adb40057..9c53e43e 100644 --- a/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt +++ b/core/src/iosMain/kotlin/com/powersync/DatabaseDriverFactory.ios.kt @@ -1,6 +1,5 @@ package com.powersync -import app.cash.sqldelight.async.coroutines.synchronous import app.cash.sqldelight.driver.native.NativeSqliteDriver import app.cash.sqldelight.driver.native.wrapConnection import co.touchlab.sqliter.DatabaseConfiguration @@ -51,7 +50,7 @@ public actual class DatabaseDriverFactory { scope: CoroutineScope, dbFilename: String, ): PsSqlDriver { - val schema = InternalSchema.synchronous() + val schema = InternalSchema this.driver = PsSqlDriver( scope = scope, diff --git a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt index 684baf3e..97a674a4 100644 --- a/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt +++ b/core/src/jvmMain/kotlin/com/powersync/DatabaseDriverFactory.jvm.kt @@ -1,6 +1,5 @@ package com.powersync -import app.cash.sqldelight.async.coroutines.synchronous import com.powersync.db.internal.InternalSchema import kotlinx.coroutines.CoroutineScope import java.nio.file.Path @@ -34,7 +33,7 @@ public actual class DatabaseDriverFactory { scope: CoroutineScope, dbFilename: String, ): PsSqlDriver { - val schema = InternalSchema.synchronous() + val schema = InternalSchema val driver = PSJdbcSqliteDriver( diff --git a/persistence/build.gradle.kts b/persistence/build.gradle.kts index 75b1bdfa..4997bd45 100644 --- a/persistence/build.gradle.kts +++ b/persistence/build.gradle.kts @@ -78,7 +78,6 @@ sqldelight { databases { create("PsDatabase") { packageName.set("com.powersync.persistence") - generateAsync.set(true) dialect(project(":dialect")) } }