Skip to content

Commit 62aaa8e

Browse files
committed
Introduce Notification Messages API
1 parent 22d5467 commit 62aaa8e

File tree

9 files changed

+2325
-0
lines changed

9 files changed

+2325
-0
lines changed

Package.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ let package = Package(
110110
"_FoundationCShims",
111111
"FoundationMacros",
112112
.product(name: "_RopeModule", package: "swift-collections"),
113+
.product(name: "DequeModule", package: "swift-collections"),
113114
.product(name: "OrderedCollections", package: "swift-collections"),
114115
],
115116
exclude: [
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#if FOUNDATION_FRAMEWORK
14+
internal class _NotificationCenterActorQueueManagerNSObjectWrapper: NSObject, @unchecked Sendable {}
15+
#else
16+
internal class _NotificationCenterActorQueueManagerNSObjectWrapper: @unchecked Sendable {}
17+
#endif
18+
19+
#if FOUNDATION_FRAMEWORK
20+
@objc(_NotificationCenterActorQueueManager)
21+
#endif
22+
internal final class _NotificationCenterActorQueueManager: _NotificationCenterActorQueueManagerNSObjectWrapper, @unchecked Sendable {
23+
#if !NO_FILESYSTEM
24+
struct State {
25+
var buffer = [@Sendable () async -> Void]()
26+
var continuation: UnsafeContinuation<(@Sendable () async -> Void)?, Never>?
27+
var isCancelled: Bool = false
28+
29+
static func waitForWork(_ state: LockedState<State>) async -> (@Sendable () async -> Void)? {
30+
return await withTaskCancellationHandler {
31+
return await withUnsafeContinuation { continuation in
32+
let (work, resumeContinuation) = state.withLock { state -> ((@Sendable () async -> Void)?, Bool) in
33+
if state.isCancelled {
34+
return (nil, true)
35+
} else {
36+
if state.buffer.isEmpty {
37+
assert(state.continuation == nil)
38+
state.continuation = continuation
39+
return (nil, false)
40+
} else {
41+
return (state.buffer.removeFirst(), true)
42+
}
43+
}
44+
}
45+
if resumeContinuation {
46+
continuation.resume(returning: work)
47+
}
48+
}
49+
} onCancel: {
50+
state.withLock { state in
51+
state.isCancelled = true
52+
defer {
53+
state.continuation = nil
54+
}
55+
return state.continuation
56+
}?.resume(returning: nil)
57+
}
58+
}
59+
}
60+
61+
let state: LockedState<State>
62+
let workerTask: Task<(), Never>
63+
64+
override init() {
65+
state = LockedState(initialState: State())
66+
workerTask = Task.detached { [state] in
67+
await withDiscardingTaskGroup { group in
68+
while let work = await State.waitForWork(state) {
69+
group.addTask(operation: work)
70+
}
71+
}
72+
}
73+
super.init()
74+
}
75+
76+
deinit {
77+
workerTask.cancel()
78+
}
79+
80+
func enqueue(_ work: @escaping @Sendable () async -> Void) {
81+
state.withLock { state in
82+
state.buffer.append(work)
83+
if let continuation = state.continuation {
84+
state.continuation = nil
85+
let item = state.buffer.removeFirst()
86+
continuation.resume(returning: item)
87+
}
88+
}
89+
}
90+
#endif
91+
}
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#if FOUNDATION_FRAMEWORK
14+
internal import _ForSwiftFoundation
15+
internal import CollectionsInternal
16+
#elseif canImport(DequeModule)
17+
internal import DequeModule
18+
#endif
19+
#if canImport(os)
20+
internal import os.log
21+
#endif
22+
23+
@available(FoundationPreview 6.2, *)
24+
extension NotificationCenter {
25+
/// Returns an asynchronous sequence of messages produced by this center for a given subject and identifier.
26+
/// - Parameters:
27+
/// - subject: The subject to observe. Specify a metatype to observe all values for a given type.
28+
/// - identifier: An identifier representing a specific message type.
29+
/// - limit: The maximum number of messages allowed to buffer.
30+
/// - Returns: An asynchronous sequence of messages produced by this center.
31+
public func messages<Identifier: MessageIdentifier, Message: AsyncMessage>(
32+
of subject: Message.Subject,
33+
for identifier: Identifier,
34+
bufferSize limit: Int = 10
35+
) -> some AsyncSequence<Message, Never> where Identifier.MessageType == Message, Message.Subject: AnyObject {
36+
return AsyncMessageSequence<Message>(self, subject, limit)
37+
}
38+
39+
/// Returns an asynchronous sequence of messages produced by this center for a given subject type and identifier.
40+
/// - Parameters:
41+
/// - subject: The metatype to observe all values for a given type.
42+
/// - identifier: An identifier representing a specific message type.
43+
/// - limit: The maximum number of messages allowed to buffer.
44+
/// - Returns: An asynchronous sequence of messages produced by this center.
45+
public func messages<Identifier: MessageIdentifier, Message: AsyncMessage>(
46+
of subject: Message.Subject.Type,
47+
for identifier: Identifier,
48+
bufferSize limit: Int = 10
49+
) -> some AsyncSequence<Message, Never> where Identifier.MessageType == Message {
50+
return AsyncMessageSequence<Message>(self, nil, limit)
51+
}
52+
53+
/// Returns an asynchronous sequence of messages produced by this center for a given subject and message type.
54+
/// - Parameters:
55+
/// - subject: The subject to observe. Specify a metatype to observe all values for a given type.
56+
/// - messageType: The message type to be observed.
57+
/// - limit: The maximum number of messages allowed to buffer.
58+
/// - Returns: An asynchronous sequence of messages produced by this center.
59+
public func messages<Message: AsyncMessage>(
60+
of subject: Message.Subject? = nil,
61+
for messageType: Message.Type,
62+
bufferSize limit: Int = 10
63+
) -> some AsyncSequence<Message, Never> where Message.Subject: AnyObject {
64+
return AsyncMessageSequence<Message>(self, subject, limit)
65+
}
66+
}
67+
68+
extension NotificationCenter {
69+
fileprivate struct AsyncMessageSequence<Message: NotificationCenter.AsyncMessage>: AsyncSequence, Sendable {
70+
let center: NotificationCenter
71+
nonisolated(unsafe) weak var object: AnyObject?
72+
let bufferSize: Int
73+
74+
init(_ center: NotificationCenter, _ object: AnyObject?, _ bufferSize: Int) {
75+
self.center = center
76+
self.object = object
77+
self.bufferSize = bufferSize
78+
}
79+
80+
func makeAsyncIterator() -> AsyncMessageSequenceIterator<Message> {
81+
return AsyncMessageSequenceIterator(center: center, object: object, bufferSize: bufferSize)
82+
}
83+
}
84+
}
85+
86+
extension NotificationCenter {
87+
fileprivate final class AsyncMessageSequenceIterator<Message: NotificationCenter.AsyncMessage>: AsyncIteratorProtocol, Sendable {
88+
typealias Element = Message
89+
typealias Failure = Never
90+
91+
struct State {
92+
var observer: NotificationCenter.ObservationToken?
93+
var continuations: [UnsafeContinuation<Message?, Never>] = []
94+
var buffer = Deque<Message>(minimumCapacity: 1)
95+
let bufferSize: Int
96+
}
97+
98+
struct Resumption {
99+
let message: Message?
100+
let continuations: [UnsafeContinuation<Message?, Never>]
101+
102+
init(message: Message?, continuation: UnsafeContinuation<Message?, Never>) {
103+
self.message = message
104+
self.continuations = [continuation]
105+
}
106+
107+
init(cancelling: [UnsafeContinuation<Message?, Never>]) {
108+
self.message = nil
109+
self.continuations = cancelling
110+
}
111+
112+
func resume() {
113+
for continuation in continuations {
114+
continuation.resume(returning: message)
115+
}
116+
}
117+
}
118+
119+
let state: LockedState<State>
120+
121+
init(center: NotificationCenter, object: AnyObject?, bufferSize: Int) {
122+
self.state = LockedState(initialState: State(bufferSize: bufferSize))
123+
124+
#if FOUNDATION_FRAMEWORK
125+
let observerBlock: @Sendable (Notification) -> Void = { [weak self] notification in
126+
guard let message: Message = NotificationCenter._messageFromNotification(notification) else { return }
127+
128+
self?.observationCallback(message)
129+
}
130+
#else
131+
let observerBlock: @Sendable (Message) -> Void = { [weak self] message in
132+
self?.observationCallback(message)
133+
}
134+
#endif
135+
136+
let token = center._addObserver(Message.name, object: object, using: observerBlock)
137+
138+
self.state.withLock { _state in
139+
_state.observer = ObservationToken(center: center, token: token)
140+
}
141+
}
142+
143+
deinit {
144+
teardown()
145+
}
146+
147+
func teardown() {
148+
let (observer, resumption) = state.withLock { _state -> (NotificationCenter.ObservationToken?, Resumption) in
149+
let observer = _state.observer
150+
_state.observer = nil
151+
_state.buffer.removeAll(keepingCapacity: false)
152+
defer { _state.continuations.removeAll(keepingCapacity: false) }
153+
return (observer, Resumption(cancelling: _state.continuations))
154+
}
155+
156+
resumption.resume()
157+
158+
if let observer {
159+
observer.remove()
160+
}
161+
}
162+
163+
func observationCallback(_ message: Message) {
164+
state.withLock { _state -> Resumption? in
165+
if _state.buffer.count + 1 > _state.bufferSize {
166+
_state.buffer.removeFirst()
167+
#if canImport(os)
168+
NotificationCenter.logger.fault("Notification center message dropped due to buffer limit. Check sequence iterator frequently or increase buffer size. Message: \(String(describing: Message.self))")
169+
#endif
170+
}
171+
_state.buffer.append(message)
172+
173+
if _state.continuations.isEmpty {
174+
return nil
175+
} else {
176+
return Resumption(message: _state.buffer.removeFirst(), continuation: _state.continuations.removeFirst())
177+
}
178+
}?.resume()
179+
}
180+
181+
func next() async -> Message? {
182+
await withTaskCancellationHandler {
183+
return await withUnsafeContinuation { (continuation: UnsafeContinuation<Message?, Never>) in
184+
state.withLock { _state -> Resumption? in
185+
_state.continuations.append(continuation)
186+
if _state.buffer.isEmpty {
187+
return nil
188+
} else {
189+
return Resumption(message: _state.buffer.removeFirst(), continuation: _state.continuations.removeFirst())
190+
}
191+
}?.resume()
192+
}
193+
} onCancel: {
194+
teardown()
195+
}
196+
}
197+
}
198+
}

0 commit comments

Comments
 (0)