Skip to content

Commit 9a28c67

Browse files
committed
New: AsyncValueObservation
1 parent 1fda417 commit 9a28c67

File tree

2 files changed

+245
-0
lines changed

2 files changed

+245
-0
lines changed

GRDB/ValueObservation/ValueObservation.swift

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,109 @@ extension ValueObservation: Refinable {
211211
}
212212
}
213213

214+
#if swift(>=5.5)
215+
extension ValueObservation {
216+
// MARK: - Asynchronous Observation
217+
218+
/// The database observation, as an asynchronous sequence of
219+
/// database changes.
220+
///
221+
/// [**Experimental**](http://github.com/groue/GRDB.swift#what-are-experimental-features)
222+
///
223+
/// - parameter reader: A DatabaseReader.
224+
/// - parameter scheduler: A Scheduler. By default, fresh values are
225+
/// dispatched asynchronously on the main queue.
226+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
227+
public func values(
228+
in reader: DatabaseReader,
229+
scheduling scheduler: ValueObservationScheduler = .async(onQueue: .main))
230+
-> AsyncValueObservation<Reducer.Value>
231+
{
232+
AsyncValueObservation { onError, onChange in
233+
self.start(in: reader, scheduling: scheduler, onError: onError, onChange: onChange)
234+
}
235+
}
236+
}
237+
238+
/// An asynchronous sequence of database changes.
239+
///
240+
/// [**Experimental**](http://github.com/groue/GRDB.swift#what-are-experimental-features)
241+
///
242+
/// Usage:
243+
///
244+
/// let observation = ValueObservation.tracking(Player.fetchAll)
245+
/// let dbQueue: DatabaseQueue: ...
246+
///
247+
/// // Each database change in the player prints "Fresh players: ..."
248+
/// for try await players in observation.values(in: dbQueue) {
249+
/// print("Fresh players: \(players)")
250+
/// }
251+
///
252+
/// See `ValueObservation` for more information.
253+
///
254+
/// - note: This async sequence never ends.
255+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
256+
public struct AsyncValueObservation<Element>: AsyncSequence {
257+
public typealias AsyncIterator = Iterator
258+
typealias StartFunction = (
259+
_ onError: @escaping (Error) -> Void,
260+
_ onChange: @escaping (Element) -> Void)
261+
-> DatabaseCancellable
262+
private var start: StartFunction
263+
264+
init(start: @escaping StartFunction) {
265+
self.start = start
266+
}
267+
268+
public func makeAsyncIterator() -> Iterator {
269+
// This cancellable will be retained by the Iterator, which itself will
270+
// be retained by the Swift async runtime.
271+
//
272+
// We must not retain this cancellable in any other way, in order to
273+
// cancel the observation when the Swift async runtime releases
274+
// the iterator.
275+
var cancellable: AnyDatabaseCancellable?
276+
let stream = AsyncThrowingStream(Element.self, bufferingPolicy: .unbounded) { continuation in
277+
cancellable = AnyDatabaseCancellable(start(
278+
// onError
279+
{ error in
280+
continuation.finish(throwing: error)
281+
},
282+
// onChange
283+
{ [weak cancellable] element in
284+
if case .terminated = continuation.yield(element) {
285+
// TODO: I could never see this code running. Is it needed?
286+
cancellable?.cancel()
287+
}
288+
}))
289+
continuation.onTermination = { @Sendable [weak cancellable] _ in
290+
cancellable?.cancel()
291+
}
292+
}
293+
294+
let iterator = stream.makeAsyncIterator()
295+
if let cancellable = cancellable {
296+
return Iterator(
297+
iterator: iterator,
298+
cancellable: cancellable)
299+
} else {
300+
// GRDB bug: there is no point throwing any error.
301+
fatalError("Expected AsyncThrowingStream to have started the observation already")
302+
}
303+
}
304+
305+
/// An asynchronous iterator that supplies database changes one at a time.
306+
public struct Iterator: AsyncIteratorProtocol {
307+
var iterator: AsyncThrowingStream<Element, Error>.AsyncIterator
308+
let cancellable: AnyDatabaseCancellable
309+
310+
public mutating func next() async throws -> Element? {
311+
try await iterator.next()
312+
}
313+
}
314+
}
315+
#endif
316+
214317
#if canImport(Combine)
215318
extension ValueObservation {
216319
// MARK: - Publishing Observed Values

Tests/GRDBTests/ValueObservationTests.swift

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,4 +518,146 @@ class ValueObservationTests: GRDBTestCase {
518518
try test(makeDatabaseQueue())
519519
try test(makeDatabasePool())
520520
}
521+
522+
#if swift(>=5.5)
523+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
524+
func testAsyncAwait_values_prefix() async throws {
525+
let dbQueue = try makeDatabaseQueue()
526+
527+
// We need something to change
528+
try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }
529+
530+
let cancellationExpectation = expectation(description: "cancelled")
531+
let observation = ValueObservation
532+
.tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
533+
.handleEvents(didCancel: { cancellationExpectation.fulfill() })
534+
535+
let task = Task { () -> [Int] in
536+
var counts: [Int] = []
537+
538+
for try await count in observation.values(in: dbQueue).prefix(3) {
539+
counts.append(count)
540+
try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
541+
}
542+
return counts
543+
}
544+
545+
let counts = try await task.value
546+
547+
// All values were published
548+
XCTAssertEqual(counts, [0, 1, 2])
549+
550+
// Observation was ended
551+
wait(for: [cancellationExpectation], timeout: 2)
552+
}
553+
554+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
555+
func testAsyncAwait_values_prefix_immediate_scheduling() async throws {
556+
let dbQueue = try makeDatabaseQueue()
557+
558+
// We need something to change
559+
try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }
560+
561+
let cancellationExpectation = expectation(description: "cancelled")
562+
let observation = ValueObservation
563+
.tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
564+
.handleEvents(didCancel: { cancellationExpectation.fulfill() })
565+
566+
let task = Task { @MainActor () -> [Int] in
567+
var counts: [Int] = []
568+
569+
for try await count in observation.values(in: dbQueue, scheduling: .immediate).prefix(3) {
570+
counts.append(count)
571+
try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
572+
}
573+
return counts
574+
}
575+
576+
let counts = try await task.value
577+
578+
// All values were published
579+
XCTAssertEqual(counts, [0, 1, 2])
580+
581+
// Observation was ended
582+
wait(for: [cancellationExpectation], timeout: 2)
583+
}
584+
585+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
586+
func testAsyncAwait_values_break() async throws {
587+
let dbQueue = try makeDatabaseQueue()
588+
589+
// We need something to change
590+
try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }
591+
592+
let cancellationExpectation = expectation(description: "cancelled")
593+
let observation = ValueObservation
594+
.tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
595+
.handleEvents(didCancel: { cancellationExpectation.fulfill() })
596+
597+
let task = Task { () -> [Int] in
598+
var counts: [Int] = []
599+
600+
for try await count in observation.values(in: dbQueue) {
601+
counts.append(count)
602+
if count == 2 {
603+
break
604+
} else {
605+
try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
606+
}
607+
}
608+
return counts
609+
}
610+
611+
let counts = try await task.value
612+
613+
// All values were published
614+
XCTAssertEqual(counts, [0, 1, 2])
615+
616+
// Observation was ended
617+
wait(for: [cancellationExpectation], timeout: 2)
618+
}
619+
620+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
621+
func testAsyncAwait_values_cancelled() async throws {
622+
let dbQueue = try makeDatabaseQueue()
623+
624+
// We need something to change
625+
try await dbQueue.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") }
626+
627+
let cancellationExpectation = expectation(description: "cancelled")
628+
let valueExpectation = expectation(description: "value")
629+
valueExpectation.assertForOverFulfill = false
630+
let observation = ValueObservation
631+
.tracking { try Int.fetchOne($0, sql: "SELECT COUNT(*) FROM t")! }
632+
.handleEvents(
633+
didReceiveValue: { _ in valueExpectation.fulfill() },
634+
didCancel: { cancellationExpectation.fulfill() })
635+
636+
struct TestError: Error { }
637+
do {
638+
try await withThrowingTaskGroup(of: Void.self) { group in
639+
group.addTask {
640+
// Infinite loop
641+
for try await _ in observation.values(in: dbQueue) {
642+
try await dbQueue.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") }
643+
}
644+
}
645+
group.addTask {
646+
// Throw after a delay
647+
try await Task.sleep(nanoseconds: 1_000_000)
648+
throw TestError()
649+
}
650+
651+
for try await _ in group { }
652+
}
653+
XCTFail("Expected error")
654+
} catch is TestError {
655+
} catch {
656+
XCTFail("Unexpected error \(error)")
657+
}
658+
659+
// A value was observed, and observation was ended
660+
wait(for: [valueExpectation, cancellationExpectation], timeout: 2)
661+
}
662+
#endif
521663
}

0 commit comments

Comments
 (0)