Skip to content

Commit d9c73d3

Browse files
authored
[Observation] Initial implementation of Observed for transactional tracked values over time (#79817)
This is an implementation for the feature swiftlang/swift-evolution#2726
1 parent 53aa827 commit d9c73d3

File tree

4 files changed

+376
-3
lines changed

4 files changed

+376
-3
lines changed

stdlib/public/Observation/Sources/Observation/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ add_swift_target_library(swiftObservation ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} IS
1717
Observable.swift
1818
ObservationRegistrar.swift
1919
ObservationTracking.swift
20+
Observations.swift
2021
ThreadLocal.cpp
2122
ThreadLocal.swift
2223

Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
import _Concurrency
13+
14+
@usableFromInline
15+
@available(SwiftStdlib 5.1, *)
16+
@_silgen_name("swift_task_addCancellationHandler")
17+
func _taskAddCancellationHandler(handler: () -> Void) -> UnsafeRawPointer /*CancellationNotificationStatusRecord*/
18+
19+
@usableFromInline
20+
@available(SwiftStdlib 5.1, *)
21+
@_silgen_name("swift_task_removeCancellationHandler")
22+
func _taskRemoveCancellationHandler(
23+
record: UnsafeRawPointer /*CancellationNotificationStatusRecord*/
24+
)
25+
26+
func withIsolatedTaskCancellationHandler<T: Sendable>(
27+
operation: @isolated(any) () async throws -> T,
28+
onCancel handler: @Sendable () -> Void,
29+
isolation: isolated (any Actor)? = #isolation
30+
) async rethrows -> T {
31+
// unconditionally add the cancellation record to the task.
32+
// if the task was already cancelled, it will be executed right away.
33+
let record = _taskAddCancellationHandler(handler: handler)
34+
defer { _taskRemoveCancellationHandler(record: record) }
35+
36+
return try await operation()
37+
}
38+
39+
/// An asychronous sequence generated from a closure that tracks the transactional changes of `@Observable` types.
40+
///
41+
/// `Observations` conforms to `AsyncSequence`, providing a intutive and safe mechanism to track changes to
42+
/// types that are marked as `@Observable` by using Swift Concurrency to indicate transactional boundaries
43+
/// starting from the willSet of the first mutation to the next suspension point of the safe access.
44+
@available(SwiftStdlib 9999, *)
45+
public struct Observations<Element: Sendable, Failure: Error>: AsyncSequence, Sendable {
46+
public enum Iteration: Sendable {
47+
case next(Element)
48+
case finish
49+
}
50+
51+
struct State {
52+
enum Continuation {
53+
case cancelled
54+
case active(UnsafeContinuation<Void, Never>)
55+
func resume() {
56+
switch self {
57+
case .cancelled: break
58+
case .active(let continuation): continuation.resume()
59+
}
60+
}
61+
}
62+
var id = 0
63+
var continuations: [Int: Continuation] = [:]
64+
var dirty = false
65+
66+
// create a generation id for the unique identification of the continuations
67+
// this allows the shared awaiting of the willSets.
68+
// Most likely, there wont be more than a handful of active iterations
69+
// so this only needs to be unique for those active iterations
70+
// that are in the process of calling next.
71+
static func generation(_ state: _ManagedCriticalState<State>) -> Int {
72+
state.withCriticalRegion { state in
73+
defer { state.id &+= 1 }
74+
return state.id
75+
}
76+
}
77+
78+
// the cancellation of awaiting on willSet only ferries in resuming early
79+
// it is the responsability of the caller to check if the task is actually
80+
// cancelled after awaiting the willSet to act accordingly.
81+
static func cancel(_ state: _ManagedCriticalState<State>, id: Int) {
82+
state.withCriticalRegion { state in
83+
guard let continuation = state.continuations.removeValue(forKey: id) else {
84+
// if there was no continuation yet active (e.g. it was cancelled at
85+
// the start of the invocation, then put a tombstone in to gate that
86+
// resuming later
87+
state.continuations[id] = .cancelled
88+
return nil as Continuation?
89+
}
90+
return continuation
91+
}?.resume()
92+
}
93+
94+
// fire off ALL awaiting willChange continuations such that they are no
95+
// longer pending.
96+
static func emitWillChange(_ state: _ManagedCriticalState<State>) {
97+
let continuations = state.withCriticalRegion { state in
98+
// if there are no continuations present then we have to set the state as dirty
99+
// else if this is uncondiitonally set the state might produce duplicate events
100+
// one for the dirty and one for the continuation.
101+
if state.continuations.count == 0 {
102+
state.dirty = true
103+
}
104+
defer {
105+
state.continuations.removeAll()
106+
}
107+
return state.continuations.values
108+
}
109+
for continuation in continuations {
110+
continuation.resume()
111+
}
112+
}
113+
114+
// install a willChange continuation into the set of continuations
115+
// this must take a locally unique id (to the active calls of next)
116+
static func willChange(isolation iterationIsolation: isolated (any Actor)? = #isolation, state: _ManagedCriticalState<State>, id: Int) async {
117+
return await withUnsafeContinuation(isolation: iterationIsolation) { continuation in
118+
state.withCriticalRegion { state in
119+
defer { state.dirty = false }
120+
switch state.continuations[id] {
121+
case .cancelled:
122+
return continuation as UnsafeContinuation<Void, Never>?
123+
case .active:
124+
// the Iterator itself cannot be shared across isolations so any call to next that may share an id is a misbehavior
125+
// or an internal book-keeping failure
126+
fatalError("Iterator incorrectly shared across task isolations")
127+
case .none:
128+
if state.dirty {
129+
return continuation
130+
} else {
131+
state.continuations[id] = .active(continuation)
132+
return nil
133+
}
134+
}
135+
}?.resume()
136+
}
137+
}
138+
}
139+
140+
// @isolated(any) closures cannot be composed and retain or forward their isolation
141+
// this basically would be replaced with `{ .next(elementProducer()) }` if that
142+
// were to become possible.
143+
enum Emit {
144+
case iteration(@isolated(any) @Sendable () throws(Failure) -> Iteration)
145+
case element(@isolated(any) @Sendable () throws(Failure) -> Element)
146+
147+
var isolation: (any Actor)? {
148+
switch self {
149+
case .iteration(let closure): closure.isolation
150+
case .element(let closure): closure.isolation
151+
}
152+
}
153+
}
154+
155+
let state: _ManagedCriticalState<State>
156+
let emit: Emit
157+
158+
// internal funnel method for initialziation
159+
internal init(emit: Emit) {
160+
self.emit = emit
161+
self.state = _ManagedCriticalState(State())
162+
}
163+
164+
/// Constructs an asynchronous sequence for a given closure by tracking changes of `@Observable` types.
165+
///
166+
/// The emit closure is responsible for extracting a value out of a single or many `@Observable` types.
167+
///
168+
/// - Parameters:
169+
/// - isolation: The concurrency isolation domain of the caller.
170+
/// - emit: A closure to generate an element for the sequence.
171+
public init(
172+
@_inheritActorContext _ emit: @escaping @isolated(any) @Sendable () throws(Failure) -> Element
173+
) {
174+
self.init(emit: .element(emit))
175+
}
176+
177+
/// Constructs an asynchronous sequence for a given closure by tracking changes of `@Observable` types.
178+
///
179+
/// The emit closure is responsible for extracting a value out of a single or many `@Observable` types. This method
180+
/// continues to be invoked until the .finished option is returned or an error is thrown.
181+
///
182+
/// - Parameters:
183+
/// - isolation: The concurrency isolation domain of the caller.
184+
/// - emit: A closure to generate an element for the sequence.
185+
public static func untilFinished(
186+
@_inheritActorContext _ emit: @escaping @isolated(any) @Sendable () throws(Failure) -> Iteration
187+
) -> Observations<Element, Failure> {
188+
.init(emit: .iteration(emit))
189+
}
190+
191+
public struct Iterator: AsyncIteratorProtocol {
192+
// the state ivar serves two purposes:
193+
// 1) to store a critical region of state of the mutations
194+
// 2) to idenitify the termination of _this_ sequence
195+
var state: _ManagedCriticalState<State>?
196+
let emit: Emit
197+
var started = false
198+
199+
// this is the primary implementation of the tracking
200+
// it is bound to be called on the specified isolation of the construction
201+
fileprivate static func trackEmission(isolation trackingIsolation: isolated (any Actor)?, state: _ManagedCriticalState<State>, emit: Emit) throws(Failure) -> Iteration {
202+
// this ferries in an intermediate form with Result to skip over `withObservationTracking` not handling errors being thrown
203+
// particularly this case is that the error is also an iteration state transition data point (it terminates the sequence)
204+
// so we need to hold that to get a chance to catch and clean-up
205+
let result = withObservationTracking {
206+
switch emit {
207+
case .element(let element):
208+
Result(catching: element).map { Iteration.next($0) }
209+
case .iteration(let iteration):
210+
Result(catching: iteration)
211+
}
212+
} onChange: { [state] in
213+
// resume all cases where the awaiting continuations are awaiting a willSet
214+
State.emitWillChange(state)
215+
}
216+
return try result.get()
217+
}
218+
219+
fileprivate mutating func terminate(throwing failure: Failure? = nil, id: Int) throws(Failure) -> Element? {
220+
// this is purely defensive to any leaking out of iteration generation ids
221+
state?.withCriticalRegion { state in
222+
state.continuations.removeValue(forKey: id)
223+
}?.resume()
224+
// flag the sequence as terminal by nil'ing out the state
225+
state = nil
226+
if let failure {
227+
throw failure
228+
} else {
229+
return nil
230+
}
231+
}
232+
233+
fileprivate mutating func trackEmission(isolation iterationIsolation: isolated (any Actor)?, state: _ManagedCriticalState<State>, id: Int) async throws(Failure) -> Element? {
234+
guard !Task.isCancelled else {
235+
// the task was cancelled while awaiting a willChange so ensure a proper termination
236+
return try terminate(id: id)
237+
}
238+
// start by directly tracking the emission via a withObservation tracking on the isolation specified fro mthe init
239+
switch try await Iterator.trackEmission(isolation: emit.isolation, state: state, emit: emit) {
240+
case .finish: return try terminate(id: id)
241+
case .next(let element): return element
242+
}
243+
}
244+
245+
public mutating func next(isolation iterationIsolation: isolated (any Actor)? = #isolation) async throws(Failure) -> Element? {
246+
// early exit if the sequence is terminal already
247+
guard let state else { return nil }
248+
// set up an id for this generation
249+
let id = State.generation(state)
250+
do {
251+
// there are two versions;
252+
// either the tracking has never yet started at all and we need to prime the pump for this specific iterator
253+
// or the tracking has already started and we are going to await a change
254+
if !started {
255+
started = true
256+
return try await trackEmission(isolation: iterationIsolation, state: state, id: id)
257+
} else {
258+
// wait for the willChange (and NOT the value itself)
259+
// since this is going to be on the isolation of the object (e.g. the isolation specified in the initialization)
260+
// this will mean our next await for the emission will ensure the suspension return of the willChange context
261+
// back to the trailing edges of the mutations. In short, this enables the transactionality bounded by the
262+
// isolation of the mutation.
263+
await withIsolatedTaskCancellationHandler(operation: {
264+
await State.willChange(isolation: iterationIsolation, state: state, id: id)
265+
}, onCancel: {
266+
// ensure to clean out our continuation uon cancellation
267+
State.cancel(state, id: id)
268+
}, isolation: iterationIsolation)
269+
return try await trackEmission(isolation: iterationIsolation, state: state, id: id)
270+
}
271+
} catch {
272+
// the user threw a failure in the closure so propigate that outwards and terminate the sequence
273+
return try terminate(throwing: error, id: id)
274+
}
275+
}
276+
}
277+
278+
public func makeAsyncIterator() -> Iterator {
279+
Iterator(state: state, emit: emit)
280+
}
281+
}

test/abi/macOS/arm64/observation.swift

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,48 @@ Added: _$s11Observation0A8TrackingV7changeds10AnyKeyPathCSgvg
4242

4343
// property descriptor for Observation.ObservationTracking.changed : Swift.AnyKeyPath?
4444
Added: _$s11Observation0A8TrackingV7changeds10AnyKeyPathCSgvpMV
45+
46+
// static Observation.Observations.untilFinished(@isolated(any) @Sendable () throws(B) -> Observation.Observations<A, B>.Iteration) -> Observation.Observations<A, B>
47+
Added: _$s11Observation12ObservationsV13untilFinishedyACyxq_GAC9IterationOyxq__GyYbq_YKYAcFZ
48+
49+
// Observation.Observations.makeAsyncIterator() -> Observation.Observations<A, B>.Iterator
50+
Added: _$s11Observation12ObservationsV17makeAsyncIteratorAC0E0Vyxq__GyF
51+
52+
// Observation.Observations.Iterator.next(isolation: isolated Swift.Actor?) async throws(B) -> A?
53+
Added: _$s11Observation12ObservationsV8IteratorV4next9isolationxSgScA_pSgYi_tYaq_YKF
54+
55+
// async function pointer to Observation.Observations.Iterator.next(isolation: isolated Swift.Actor?) async throws(B) -> A?
56+
Added: _$s11Observation12ObservationsV8IteratorV4next9isolationxSgScA_pSgYi_tYaq_YKFTu
57+
58+
// type metadata accessor for Observation.Observations.Iterator
59+
Added: _$s11Observation12ObservationsV8IteratorVMa
60+
61+
// nominal type descriptor for Observation.Observations.Iterator
62+
Added: _$s11Observation12ObservationsV8IteratorVMn
63+
64+
// protocol conformance descriptor for Observation.Observations<A, B>.Iterator : Swift.AsyncIteratorProtocol in Observation
65+
Added: _$s11Observation12ObservationsV8IteratorVyxq__GScIAAMc
66+
67+
// enum case for Observation.Observations.Iteration.next<A, B where A: Swift.Sendable, B: Swift.Error>(Observation.Observations<A, B>.Iteration.Type) -> (A) -> Observation.Observations<A, B>.Iteration
68+
Added: _$s11Observation12ObservationsV9IterationO4nextyAEyxq__GxcAGms8SendableRzs5ErrorR_r0_lFWC
69+
70+
// enum case for Observation.Observations.Iteration.finish<A, B where A: Swift.Sendable, B: Swift.Error>(Observation.Observations<A, B>.Iteration.Type) -> Observation.Observations<A, B>.Iteration
71+
Added: _$s11Observation12ObservationsV9IterationO6finishyAEyxq__GAGms8SendableRzs5ErrorR_r0_lFWC
72+
73+
// type metadata accessor for Observation.Observations.Iteration
74+
Added: _$s11Observation12ObservationsV9IterationOMa
75+
76+
// nominal type descriptor for Observation.Observations.Iteration
77+
Added: _$s11Observation12ObservationsV9IterationOMn
78+
79+
// type metadata accessor for Observation.Observations
80+
Added: _$s11Observation12ObservationsVMa
81+
82+
// nominal type descriptor for Observation.Observations
83+
Added: _$s11Observation12ObservationsVMn
84+
85+
// Observation.Observations.init(@isolated(any) @Sendable () throws(B) -> A) -> Observation.Observations<A, B>
86+
Added: _$s11Observation12ObservationsVyACyxq_GxyYbq_YKYAccfC
87+
88+
// protocol conformance descriptor for Observation.Observations<A, B> : Swift.AsyncSequence in Observation
89+
Added: _$s11Observation12ObservationsVyxq_GSciAAMc

0 commit comments

Comments
 (0)