11import Foundation
22import PowerSyncKotlin
33
4- internal final class KotlinPowerSyncDatabaseImpl : PowerSyncDatabaseProtocol {
4+ final class KotlinPowerSyncDatabaseImpl : PowerSyncDatabaseProtocol {
55 let logger : any LoggerProtocol
6-
6+
77 private let kotlinDatabase : PowerSyncKotlin . PowerSyncDatabase
88
99 var currentStatus : SyncStatus { kotlinDatabase. currentStatus }
10-
1110
1211 init (
1312 schema: Schema ,
@@ -189,44 +188,52 @@ internal final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
189188 options: WatchOptions < RowType >
190189 ) throws -> AsyncThrowingStream < [ RowType ] , Error > {
191190 AsyncThrowingStream { continuation in
192- Task {
191+ // Create an outer task to monitor cancellation
192+ let task = Task {
193193 do {
194194 var mapperError : Error ?
195- // HACK!
196- // SKIEE doesn't support custom exceptions in Flows
197- // Exceptions which occur in the Flow itself cause runtime crashes.
198- // The most probable crash would be the internal EXPLAIN statement.
199- // This attempts to EXPLAIN the query before passing it to Kotlin
200- // We could introduce an onChange API in Kotlin which we use to implement watches here.
201- // This would prevent most issues with exceptions.
195+
196+ // EXPLAIN statement to prevent crashes in SKIEE
202197 _ = try await self . kotlinDatabase. getAll (
203198 sql: " EXPLAIN \( options. sql) " ,
204199 parameters: options. parameters,
205200 mapper: { _ in " " }
206201 )
202+
203+ // Watching for changes in the database
207204 for try await values in try self . kotlinDatabase. watch (
208205 sql: options. sql,
209206 parameters: options. parameters,
210207 throttleMs: KotlinLong ( value: options. throttleMs) ,
211- mapper: { cursor in do {
212- return try options . mapper ( cursor )
213- } catch {
214- mapperError = error
215- // The value here does not matter. We will throw the exception later
216- // This is not ideal, this is only a workaround until we expose fine grained access to Kotlin SDK internals.
217- return nil as RowType ?
218- } }
208+ mapper: { cursor in
209+ do {
210+ return try options . mapper ( cursor )
211+ } catch {
212+ mapperError = error
213+ return nil as RowType ?
214+ }
215+ }
219216 ) {
217+ // Check if the outer task is cancelled
218+ try Task . checkCancellation ( ) // This checks if the calling task was cancelled
219+
220220 if mapperError != nil {
221221 throw mapperError!
222222 }
223+
223224 try continuation. yield ( safeCast ( values, to: [ RowType ] . self) )
224225 }
226+
225227 continuation. finish ( )
226228 } catch {
227229 continuation. finish ( throwing: error)
228230 }
229231 }
232+
233+ // Propagate cancellation from the outer task to the inner task
234+ continuation. onTermination = { @Sendable _ in
235+ task. cancel ( ) // This cancels the inner task when the stream is terminated
236+ }
230237 }
231238 }
232239
@@ -237,8 +244,8 @@ internal final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol {
237244 func readTransaction< R> ( callback: @escaping ( any PowerSyncTransaction ) throws -> R ) async throws -> R {
238245 return try safeCast ( await kotlinDatabase. readTransaction ( callback: TransactionCallback ( callback: callback) ) , to: R . self)
239246 }
240-
241- func close( ) async throws {
247+
248+ func close( ) async throws {
242249 try await kotlinDatabase. close ( )
243250 }
244251}
0 commit comments