Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 12 additions & 11 deletions Demo/PowerSyncExample/PowerSync/SystemManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,17 @@ class SystemManager {
func watchLists(_ callback: @escaping (_ lists: [ListContent]) -> Void ) async {
do {
for try await lists in try self.db.watch<ListContent>(
sql: "SELECT * FROM \(LISTS_TABLE)",
parameters: [],
mapper: { cursor in
try ListContent(
id: cursor.getString(name: "id"),
name: cursor.getString(name: "name"),
createdAt: cursor.getString(name: "created_at"),
ownerId: cursor.getString(name: "owner_id")
)
}
options: WatchOptions(
sql: "SELECT * FROM \(LISTS_TABLE)",
mapper: { cursor in
try ListContent(
id: cursor.getString(name: "id"),
name: cursor.getString(name: "name"),
createdAt: cursor.getString(name: "created_at"),
ownerId: cursor.getString(name: "owner_id")
)
}
)
) {
callback(lists)
}
Expand All @@ -55,7 +56,7 @@ class SystemManager {
}

func insertList(_ list: NewListContent) async throws {
_ = try await self.db.execute(
let result = try await self.db.execute(
sql: "INSERT INTO \(LISTS_TABLE) (id, created_at, name, owner_id) VALUES (uuid(), datetime(), ?, ?)",
parameters: [list.name, connector.currentUserID]
)
Expand Down
4 changes: 2 additions & 2 deletions Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ let package = Package(
targets: ["PowerSync"]),
],
dependencies: [
.package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA26.0"),
.package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA27.0"),
.package(url: "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "0.3.11"..<"0.4.0")
],
targets: [
Expand Down
49 changes: 26 additions & 23 deletions Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
func waitForFirstSync() async throws {
try await kotlinDatabase.waitForFirstSync()
}

func waitForFirstSync(priority: Int32) async throws {
try await kotlinDatabase.waitForFirstSync(priority: priority)
}
Expand Down Expand Up @@ -165,38 +165,42 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
parameters: [Any]?,
mapper: @escaping (SqlCursor) -> RowType
) throws -> AsyncThrowingStream<[RowType], Error> {
AsyncThrowingStream { continuation in
Task {
do {
for await values in try self.kotlinDatabase.watch(
sql: sql,
parameters: parameters,
mapper: mapper
) {
try continuation.yield(safeCast(values, to: [RowType].self))
}
continuation.finish()
} catch {
continuation.finish(throwing: error)
}
}
}
try watch(options: WatchOptions(sql: sql, parameters: parameters, mapper: mapper))
}

func watch<RowType>(
sql: String,
parameters: [Any]?,
mapper: @escaping (SqlCursor) throws -> RowType
) throws -> AsyncThrowingStream<[RowType], Error> {
try watch(options: WatchOptions(sql: sql, parameters: parameters, mapper: mapper))
}

func watch<RowType>(
options: WatchOptions<RowType>
) throws -> AsyncThrowingStream<[RowType], Error> {
AsyncThrowingStream { continuation in
Task {
do {
var mapperError: Error?
// HACK!
// SKIEE doesn't support custom exceptions in Flows
// Exceptions which occur in the Flow itself cause runtime crashes.
// The most probable crash would be the internal EXPLAIN statement.
// This attempts to EXPLAIN the query before passing it to Kotlin
// We could introduce an onChange API in Kotlin which we use to implement watches here.
// This would prevent most issues with exceptions.
_ = try await self.kotlinDatabase.getAll(
sql: "EXPLAIN \(options.sql)",
parameters: options.parameters,
mapper: { _ in "" }
)
for try await values in try self.kotlinDatabase.watch(
sql: sql,
parameters: parameters,
sql: options.sql,
parameters: options.parameters,
throttleMs: KotlinLong(value: options.throttleMs),
mapper: { cursor in do {
return try mapper(cursor)
return try options.mapper(cursor)
} catch {
mapperError = error
// The value here does not matter. We will throw the exception later
Expand All @@ -217,12 +221,11 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
}
}

public func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
return try safeCast(await kotlinDatabase.writeTransaction(callback: TransactionCallback(callback: callback)), to: R.self)
}

public func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R {
return try safeCast(await kotlinDatabase.readTransaction(callback: TransactionCallback(callback: callback)), to: R.self)
}
}

34 changes: 27 additions & 7 deletions Sources/PowerSync/QueriesProtocol.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
import Foundation
import Combine
import Foundation
import PowerSyncKotlin

public let DEFAULT_WATCH_THROTTLE_MS = Int64(30)

public struct WatchOptions<RowType> {
public var sql: String
public var parameters: [Any]
public var throttleMs: Int64
public var mapper: (SqlCursor) throws -> RowType

public init(sql: String, parameters: [Any]? = [], throttleMs: Int64? = DEFAULT_WATCH_THROTTLE_MS, mapper: @escaping (SqlCursor) throws -> RowType) {
self.sql = sql
self.parameters = parameters ?? [] // Default to empty array if nil
self.throttleMs = throttleMs ?? DEFAULT_WATCH_THROTTLE_MS // Default to the constant if nil
self.mapper = mapper
}
}

public protocol Queries {
/// Execute a write query (INSERT, UPDATE, DELETE)
/// Using `RETURNING *` will result in an error.
Expand Down Expand Up @@ -69,40 +85,44 @@ public protocol Queries {
mapper: @escaping (SqlCursor) throws -> RowType
) throws -> AsyncThrowingStream<[RowType], Error>

func watch<RowType>(
options: WatchOptions<RowType>
) throws -> AsyncThrowingStream<[RowType], Error>

/// Execute a write transaction with the given callback
func writeTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R

/// Execute a read transaction with the given callback
func readTransaction<R>(callback: @escaping (any PowerSyncTransaction) throws -> R) async throws -> R
}

extension Queries {
public func execute(_ sql: String) async throws -> Int64 {
public extension Queries {
func execute(_ sql: String) async throws -> Int64 {
return try await execute(sql: sql, parameters: [])
}

public func get<RowType>(
func get<RowType>(
_ sql: String,
mapper: @escaping (SqlCursor) -> RowType
) async throws -> RowType {
return try await get(sql: sql, parameters: [], mapper: mapper)
}

public func getAll<RowType>(
func getAll<RowType>(
_ sql: String,
mapper: @escaping (SqlCursor) -> RowType
) async throws -> [RowType] {
return try await getAll(sql: sql, parameters: [], mapper: mapper)
}

public func getOptional<RowType>(
func getOptional<RowType>(
_ sql: String,
mapper: @escaping (SqlCursor) -> RowType
) async throws -> RowType? {
return try await getOptional(sql: sql, parameters: [], mapper: mapper)
}

public func watch<RowType>(
func watch<RowType>(
_ sql: String,
mapper: @escaping (SqlCursor) -> RowType
) throws -> AsyncThrowingStream<[RowType], Error> {
Expand Down
27 changes: 17 additions & 10 deletions Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {

// Create an actor to handle concurrent mutations
actor ResultsStore {
private var results: [[String]] = []
private var results: Set<String> = []

func append(_ names: [String]) {
results.append(names)
results.formUnion(names)
}

func getResults() -> [[String]] {
func getResults() -> Set<String> {
results
}

Expand All @@ -213,14 +213,16 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
}
}


let resultsStore = ResultsStore()

let stream = try database.watch(
sql: "SELECT name FROM users ORDER BY id",
parameters: nil
) { cursor in
cursor.getString(index: 0)!
}
options: WatchOptions(
sql: "SELECT name FROM users ORDER BY id",
mapper: { cursor in
cursor.getString(index: 0)!
}
))

let watchTask = Task {
for try await names in stream {
Expand All @@ -240,13 +242,18 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
sql: "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
parameters: ["2", "User 2", "[email protected]"]
)


await fulfillment(of: [expectation], timeout: 5)
watchTask.cancel()

let finalResults = await resultsStore.getResults()
XCTAssertEqual(finalResults.count, 2)
XCTAssertEqual(finalResults[1], ["User 1", "User 2"])
// The count of invocations here can vary a lot depending on the order of execution
// In some cases the creation of the users can fire before the initial watched query
// has emitted a result.
// However the watched query should always emit the latest result set.
XCTAssertLessThanOrEqual(finalResults.count, 3)
XCTAssertEqual(finalResults, ["User 1", "User 2"])
}

func testWatchError() async throws {
Expand Down