Skip to content

Commit 1c59de4

Browse files
update watched query implementation
1 parent a9d605e commit 1c59de4

File tree

8 files changed

+124
-51
lines changed

8 files changed

+124
-51
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
# 1.0.0-Beta.14
4+
5+
- Removed references to the PowerSync Kotlin SDK from all public API protocols.
6+
- Improved the stability of watched queries. Watched queries were previously susceptible to runtime crashes if an exception was thrown in the update stream. Errors are now gracefully handled.
7+
38
# 1.0.0-Beta.13
49

510
- Update `powersync-kotlin` dependency to version `1.0.0-BETA32`, which includes:

Package.resolved

Lines changed: 0 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ let package = Package(
1717
targets: ["PowerSync"]),
1818
],
1919
dependencies: [
20-
.package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA32.0"),
20+
// .package(url: "https://github.com/powersync-ja/powersync-kotlin.git", exact: "1.0.0-BETA32.0"),
21+
.package(path: "/Users/stevenontong/Documents/platform_code/powersync/powersync-kotlin"),
2122
.package(url: "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "0.3.12"..<"0.4.0")
2223
],
2324
targets: [

Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift

Lines changed: 75 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
55
let logger: any LoggerProtocol
66

77
private let kotlinDatabase: PowerSyncKotlin.PowerSyncDatabase
8+
private let encoder = JSONEncoder()
89
let currentStatus: SyncStatus
910

1011
init(
@@ -221,43 +222,30 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
221222
// Create an outer task to monitor cancellation
222223
let task = Task {
223224
do {
224-
var mapperError: Error?
225-
// HACK!
226-
// SKIEE doesn't support custom exceptions in Flows
227-
// Exceptions which occur in the Flow itself cause runtime crashes.
228-
// The most probable crash would be the internal EXPLAIN statement.
229-
// This attempts to EXPLAIN the query before passing it to Kotlin
230-
// We could introduce an onChange API in Kotlin which we use to implement watches here.
231-
// This would prevent most issues with exceptions.
232-
// EXPLAIN statement to prevent crashes in SKIEE
233-
_ = try await self.kotlinDatabase.getAll(
234-
sql: "EXPLAIN \(options.sql)",
235-
parameters: mapParameters(options.parameters),
236-
mapper: { _ in "" }
225+
let watchedTables = try await self.getQuerySourceTables(
226+
sql: options.sql,
227+
parameters: options.parameters
237228
)
238229

239230
// Watching for changes in the database
240-
for try await values in try self.kotlinDatabase.watch(
241-
sql: options.sql,
242-
parameters: mapParameters(options.parameters),
231+
for try await _ in try self.kotlinDatabase.onChange(
232+
tables: Set(watchedTables),
243233
throttleMs: options.throttleMs,
244-
mapper: { cursor in
245-
do {
246-
return try options.mapper(KotlinSqlCursor(base: cursor))
247-
} catch {
248-
mapperError = error
249-
return ()
250-
}
251-
}
234+
triggerImmediately: true // Allows emitting the first result even if there aren't changes
252235
) {
253236
// Check if the outer task is cancelled
254-
try Task.checkCancellation() // This checks if the calling task was cancelled
255-
256-
if mapperError != nil {
257-
throw mapperError!
258-
}
259-
260-
try continuation.yield(safeCast(values, to: [RowType].self))
237+
try Task.checkCancellation()
238+
239+
try continuation.yield(
240+
safeCast(
241+
await self.getAll(
242+
sql: options.sql,
243+
parameters: options.parameters,
244+
mapper: options.mapper
245+
),
246+
to: [RowType].self
247+
)
248+
)
261249
}
262250

263251
continuation.finish()
@@ -352,13 +340,66 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
352340
return try await handler()
353341
} catch {
354342
// Try and parse errors back from the Kotlin side
355-
356343
if let mapperError = SqlCursorError.fromDescription(error.localizedDescription) {
357344
throw mapperError
358345
}
359346

360-
// Throw remaining errors as-is
361-
throw error
347+
throw PowerSyncError.operationFailed(
348+
underlyingError: error
349+
)
350+
}
351+
}
352+
353+
private func getQuerySourceTables(
354+
sql: String,
355+
parameters: [Any?]
356+
) async throws -> Set<String> {
357+
let rows = try await getAll(
358+
sql: "EXPLAIN \(sql)",
359+
parameters: parameters,
360+
mapper: { cursor in
361+
try ExplainQueryResult(
362+
addr: cursor.getString(index: 0),
363+
opcode: cursor.getString(index: 1),
364+
p1: cursor.getInt64(index: 2),
365+
p2: cursor.getInt64(index: 3),
366+
p3: cursor.getInt64(index: 4)
367+
)
368+
}
369+
)
370+
371+
let rootPages = rows.compactMap { r in
372+
if (r.opcode == "OpenRead" || r.opcode == "OpenWrite") &&
373+
r.p3 == 0 && r.p2 != 0
374+
{
375+
return r.p2
376+
}
377+
return nil
378+
}
379+
380+
do {
381+
let pagesData = try encoder.encode(rootPages)
382+
let tableRows = try await getAll(
383+
sql: "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))",
384+
parameters: [
385+
String(data: pagesData, encoding: .utf8)
386+
]
387+
) { try $0.getString(index: 0) }
388+
389+
return Set(tableRows)
390+
} catch {
391+
throw PowerSyncError.operationFailed(
392+
message: "Could not determine watched query tables",
393+
underlyingError: error
394+
)
362395
}
363396
}
364397
}
398+
399+
private struct ExplainQueryResult {
400+
let addr: String
401+
let opcode: String
402+
let p1: Int64
403+
let p2: Int64
404+
let p3: Int64
405+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import Foundation
2+
3+
/// Enum representing errors that can occur in the PowerSync system.
4+
public enum PowerSyncError: Error, LocalizedError {
5+
6+
/// Represents a failure in an operation, potentially with a custom message and an underlying error.
7+
case operationFailed(message: String? = nil, underlyingError: Error? = nil)
8+
9+
/// A localized description of the error, providing details about the failure.
10+
public var errorDescription: String? {
11+
switch self {
12+
case let .operationFailed(message, underlyingError):
13+
// Combine message and underlying error description if both are available
14+
if let message = message, let underlyingError = underlyingError {
15+
return "\(message): \(underlyingError.localizedDescription)"
16+
} else if let message = message {
17+
// Return only the message if no underlying error is available
18+
return message
19+
} else if let underlyingError = underlyingError {
20+
// Return only the underlying error description if no message is provided
21+
return underlyingError.localizedDescription
22+
} else {
23+
// Fallback to a generic error description if neither message nor underlying error is provided
24+
return "An unknown error occurred."
25+
}
26+
}
27+
}
28+
}

Sources/PowerSync/protocol/db/CrudBatch.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import Foundation
44

55
/// A transaction of client-side changes.
66
public protocol CrudBatch {
7-
/// Unique transaction id.
8-
///
9-
/// If nil, this contains a list of changes recorded without an explicit transaction associated.
7+
/// Indicates if there are additional Crud items in the queue which are not included in this batch
108
var hasMore: Bool { get }
119

1210
/// List of client-side changes.

Tests/PowerSyncTests/CrudTests.swift

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,25 @@ final class CrudTests: XCTestCase {
6969
XCTAssert(fullBatch.hasMore == false)
7070
XCTAssert(fullBatch.crud.count == 100)
7171

72-
guard let txBatch = try await database.getNextCrudTransaction() else {
72+
guard let nextTx = try await database.getNextCrudTransaction() else {
7373
return XCTFail("Failed to get transaction crud batch")
7474
}
7575

76-
XCTAssert(txBatch.crud.count == 100)
76+
XCTAssert(nextTx.crud.count == 100)
77+
78+
for r in nextTx.crud {
79+
print(r)
80+
}
7781

7882
// Completing the transaction should clear the items
79-
try await txBatch.complete()
83+
try await nextTx.complete()
8084

8185
let afterCompleteBatch = try await database.getNextCrudTransaction()
86+
87+
for r in afterCompleteBatch?.crud ?? [] {
88+
print(r)
89+
}
90+
8291
XCTAssertNil(afterCompleteBatch)
8392
}
8493
}

Tests/PowerSyncTests/Kotlin/KotlinPowerSyncDatabaseImplTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ final class KotlinPowerSyncDatabaseImplTests: XCTestCase {
9696
sql: "SELECT name FROM users WHERE id = ?",
9797
parameters: ["999"]
9898
) { cursor in
99-
try cursor.getString(name: "")
99+
try cursor.getString(name: "name")
100100
}
101101

102102
XCTAssertNil(nonExistent)

0 commit comments

Comments
 (0)