diff --git a/CHANGELOG.md b/CHANGELOG.md index fb66b71..4e9ac01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 1.0.0-Beta.8 + +* Improved watch query internals. Added the ability to throttle watched queries. +* Added support for sync bucket priorities. + ## 1.0.0-Beta.7 * Fixed an issue where throwing exceptions in the query `mapper` could cause a runtime crash. diff --git a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index 03c7ebd..f819483 100644 --- a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "25b8cd5d97789d7e497d6a5e0b04419a426018d83f0e80ab6817b213aa976748", + "originHash" : "33297127250b66812faa920958a24bae46bf9e9d1c38ea6b84ca413efaf16afd", "pins" : [ { "identity" : "anycodable", @@ -15,8 +15,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/powersync-ja/powersync-kotlin.git", "state" : { - "revision" : "203db74889df8a20e3c6ac38aede6b0186d2e3b5", - "version" : "1.0.0-BETA23.0" + "revision" : "cb1a7d186144290b5ad706f5c2c9b67ff707356e", + "version" : "1.0.0-BETA27.0" } }, { @@ -24,8 +24,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "state" : { - "revision" : "5de629f7ddc649a1e89c64fde6113fe113fe14de", - "version" : "0.3.9" + "revision" : "fb313c473b17457d79bf3847905f5a288901d493", + "version" : "0.3.11" } }, { diff --git a/Demo/PowerSyncExample/PowerSync/SystemManager.swift b/Demo/PowerSyncExample/PowerSync/SystemManager.swift index 8f88dec..cd0aa0b 100644 --- a/Demo/PowerSyncExample/PowerSync/SystemManager.swift +++ b/Demo/PowerSyncExample/PowerSync/SystemManager.swift @@ -36,16 +36,17 @@ class SystemManager { func watchLists(_ callback: @escaping (_ lists: [ListContent]) -> Void ) async { do { for try await lists in try self.db.watch( - sql: "SELECT * FROM \(LISTS_TABLE)", - parameters: [], - mapper: { cursor in - try ListContent( - id: cursor.getString(name: "id"), - name: cursor.getString(name: "name"), - createdAt: cursor.getString(name: "created_at"), - ownerId: cursor.getString(name: "owner_id") - ) - } + options: WatchOptions( + sql: "SELECT * FROM \(LISTS_TABLE)", + mapper: { cursor in + try ListContent( + id: cursor.getString(name: "id"), + name: cursor.getString(name: "name"), + createdAt: cursor.getString(name: "created_at"), + ownerId: cursor.getString(name: "owner_id") + ) + } + ) ) { callback(lists) } @@ -55,7 +56,7 @@ class SystemManager { } func insertList(_ list: NewListContent) async throws { - _ = try await self.db.execute( + let result = try await self.db.execute( sql: "INSERT INTO \(LISTS_TABLE) (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)", parameters: [list.name, connector.currentUserID] ) diff --git a/Package.resolved b/Package.resolved index 621389f..7d54c7b 100644 --- a/Package.resolved +++ b/Package.resolved @@ -5,8 +5,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/powersync-ja/powersync-kotlin.git", "state" : { - "revision" : "0541a4744088ea24084c47c158ab116db35f9345", - "version" : "1.0.0-BETA26.0" + "revision" : "cb1a7d186144290b5ad706f5c2c9b67ff707356e", + "version" : "1.0.0-BETA27.0" } }, { diff --git a/Package.swift b/Package.swift index 072869f..c1d32e7 100644 --- a/Package.swift +++ b/Package.swift @@ -16,7 +16,7 @@ let package = Package( targets: ["PowerSync"]), ], dependencies: [ - .package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA26.0"), + .package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA27.0"), .package(url: "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "0.3.11"..<"0.4.0") ], targets: [ diff --git a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift index 020d202..d7f6139 100644 --- a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift +++ b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift @@ -25,7 +25,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { func waitForFirstSync() async throws { try await kotlinDatabase.waitForFirstSync() } - + func waitForFirstSync(priority: Int32) async throws { try await kotlinDatabase.waitForFirstSync(priority: priority) } @@ -165,38 +165,42 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { parameters: [Any]?, mapper: @escaping (SqlCursor) -> RowType ) throws -> AsyncThrowingStream<[RowType], Error> { - AsyncThrowingStream { continuation in - Task { - do { - for await values in try self.kotlinDatabase.watch( - sql: sql, - parameters: parameters, - mapper: mapper - ) { - try continuation.yield(safeCast(values, to: [RowType].self)) - } - continuation.finish() - } catch { - continuation.finish(throwing: error) - } - } - } + try watch(options: WatchOptions(sql: sql, parameters: parameters, mapper: mapper)) } func watch( sql: String, parameters: [Any]?, mapper: @escaping (SqlCursor) throws -> RowType + ) throws -> AsyncThrowingStream<[RowType], Error> { + try watch(options: WatchOptions(sql: sql, parameters: parameters, mapper: mapper)) + } + + func watch( + options: WatchOptions ) throws -> AsyncThrowingStream<[RowType], Error> { AsyncThrowingStream { continuation in Task { do { var mapperError: Error? + // HACK! + // SKIEE doesn't support custom exceptions in Flows + // Exceptions which occur in the Flow itself cause runtime crashes. + // The most probable crash would be the internal EXPLAIN statement. + // This attempts to EXPLAIN the query before passing it to Kotlin + // We could introduce an onChange API in Kotlin which we use to implement watches here. + // This would prevent most issues with exceptions. + _ = try await self.kotlinDatabase.getAll( + sql: "EXPLAIN \(options.sql)", + parameters: options.parameters, + mapper: { _ in "" } + ) for try await values in try self.kotlinDatabase.watch( - sql: sql, - parameters: parameters, + sql: options.sql, + parameters: options.parameters, + throttleMs: KotlinLong(value: options.throttleMs), mapper: { cursor in do { - return try mapper(cursor) + return try options.mapper(cursor) } catch { mapperError = error // The value here does not matter. We will throw the exception later @@ -217,12 +221,11 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - public func writeTransaction(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R { + func writeTransaction(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R { return try safeCast(await kotlinDatabase.writeTransaction(callback: TransactionCallback(callback: callback)), to: R.self) } - public func readTransaction(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R { + func readTransaction(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R { return try safeCast(await kotlinDatabase.readTransaction(callback: TransactionCallback(callback: callback)), to: R.self) } } - diff --git a/Sources/PowerSync/QueriesProtocol.swift b/Sources/PowerSync/QueriesProtocol.swift index fd76b39..0f1bbab 100644 --- a/Sources/PowerSync/QueriesProtocol.swift +++ b/Sources/PowerSync/QueriesProtocol.swift @@ -1,7 +1,23 @@ -import Foundation import Combine +import Foundation import PowerSyncKotlin +public let DEFAULT_WATCH_THROTTLE_MS = Int64(30) + +public struct WatchOptions { + public var sql: String + public var parameters: [Any] + public var throttleMs: Int64 + public var mapper: (SqlCursor) throws -> RowType + + public init(sql: String, parameters: [Any]? = [], throttleMs: Int64? = DEFAULT_WATCH_THROTTLE_MS, mapper: @escaping (SqlCursor) throws -> RowType) { + self.sql = sql + self.parameters = parameters ?? [] // Default to empty array if nil + self.throttleMs = throttleMs ?? DEFAULT_WATCH_THROTTLE_MS // Default to the constant if nil + self.mapper = mapper + } +} + public protocol Queries { /// Execute a write query (INSERT, UPDATE, DELETE) /// Using `RETURNING *` will result in an error. @@ -69,6 +85,10 @@ public protocol Queries { mapper: @escaping (SqlCursor) throws -> RowType ) throws -> AsyncThrowingStream<[RowType], Error> + func watch( + options: WatchOptions + ) throws -> AsyncThrowingStream<[RowType], Error> + /// Execute a write transaction with the given callback func writeTransaction(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R @@ -76,33 +96,33 @@ public protocol Queries { func readTransaction(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R } -extension Queries { - public func execute(_ sql: String) async throws -> Int64 { +public extension Queries { + func execute(_ sql: String) async throws -> Int64 { return try await execute(sql: sql, parameters: []) } - public func get( + func get( _ sql: String, mapper: @escaping (SqlCursor) -> RowType ) async throws -> RowType { return try await get(sql: sql, parameters: [], mapper: mapper) } - public func getAll( + func getAll( _ sql: String, mapper: @escaping (SqlCursor) -> RowType ) async throws -> [RowType] { return try await getAll(sql: sql, parameters: [], mapper: mapper) } - public func getOptional( + func getOptional( _ sql: String, mapper: @escaping (SqlCursor) -> RowType ) async throws -> RowType? { return try await getOptional(sql: sql, parameters: [], mapper: mapper) } - public func watch( + func watch( _ sql: String, mapper: @escaping (SqlCursor) -> RowType ) throws -> AsyncThrowingStream<[RowType], Error> { diff --git a/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift b/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift index ef2eaf1..872c570 100644 --- a/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift +++ b/Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift @@ -198,13 +198,13 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase { // Create an actor to handle concurrent mutations actor ResultsStore { - private var results: [[String]] = [] + private var results: Set = [] func append(_ names: [String]) { - results.append(names) + results.formUnion(names) } - func getResults() -> [[String]] { + func getResults() -> Set { results } @@ -213,14 +213,16 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase { } } + let resultsStore = ResultsStore() let stream = try database.watch( - sql: "SELECT name FROM users ORDER BY id", - parameters: nil - ) { cursor in - cursor.getString(index: 0)! - } + options: WatchOptions( + sql: "SELECT name FROM users ORDER BY id", + mapper: { cursor in + cursor.getString(index: 0)! + } + )) let watchTask = Task { for try await names in stream { @@ -240,13 +242,18 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase { sql: "INSERT INTO users (id, name, email) VALUES (?, ?, ?)", parameters: ["2", "User 2", "user2@example.com"] ) + await fulfillment(of: [expectation], timeout: 5) watchTask.cancel() let finalResults = await resultsStore.getResults() - XCTAssertEqual(finalResults.count, 2) - XCTAssertEqual(finalResults[1], ["User 1", "User 2"]) + // The count of invocations here can vary a lot depending on the order of execution + // In some cases the creation of the users can fire before the initial watched query + // has emitted a result. + // However the watched query should always emit the latest result set. + XCTAssertLessThanOrEqual(finalResults.count, 3) + XCTAssertEqual(finalResults, ["User 1", "User 2"]) } func testWatchError() async throws {