Skip to content

Commit c37addb

Browse files
authored
Introduce Notification Messages API (#1452)
Introduce Notification Messages API
1 parent 85dcd62 commit c37addb

12 files changed

+2359
-2
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ else()
8686
message(STATUS "_SwiftCollections_SourceDIR not provided, checking out local copy of swift-collections")
8787
FetchContent_Declare(SwiftCollections
8888
GIT_REPOSITORY https://github.com/apple/swift-collections.git
89-
GIT_TAG 1.1.2)
89+
GIT_TAG 1.1.6)
9090
endif()
9191
FetchContent_MakeAvailable(SwiftFoundationICU SwiftCollections)
9292

Package.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ let package = Package(
120120
"_FoundationCShims",
121121
"FoundationMacros",
122122
.product(name: "_RopeModule", package: "swift-collections"),
123+
.product(name: "DequeModule", package: "swift-collections"),
123124
.product(name: "OrderedCollections", package: "swift-collections"),
124125
],
125126
exclude: [
@@ -138,7 +139,8 @@ let package = Package(
138139
"CMakeLists.txt",
139140
"ProcessInfo/CMakeLists.txt",
140141
"FileManager/CMakeLists.txt",
141-
"URL/CMakeLists.txt"
142+
"URL/CMakeLists.txt",
143+
"NotificationCenter/CMakeLists.txt"
142144
],
143145
cSettings: [
144146
.define("_GNU_SOURCE", .when(platforms: [.linux]))

Sources/FoundationEssentials/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ add_subdirectory(FileManager)
4242
add_subdirectory(Formatting)
4343
add_subdirectory(JSON)
4444
add_subdirectory(Locale)
45+
add_subdirectory(NotificationCenter)
4546
add_subdirectory(Predicate)
4647
add_subdirectory(ProcessInfo)
4748
add_subdirectory(PropertyList)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#if FOUNDATION_FRAMEWORK
14+
internal class _NotificationCenterActorQueueManagerNSObjectWrapper: NSObject, @unchecked Sendable {}
15+
#else
16+
internal class _NotificationCenterActorQueueManagerNSObjectWrapper: @unchecked Sendable {}
17+
#endif
18+
19+
#if FOUNDATION_FRAMEWORK
20+
@objc(_NotificationCenterActorQueueManager)
21+
#endif
22+
internal final class _NotificationCenterActorQueueManager: _NotificationCenterActorQueueManagerNSObjectWrapper, @unchecked Sendable {
23+
#if !NO_FILESYSTEM
24+
struct State {
25+
var buffer = [@Sendable () async -> Void]()
26+
var continuation: UnsafeContinuation<(@Sendable () async -> Void)?, Never>?
27+
var isCancelled: Bool = false
28+
29+
static func waitForWork(_ state: LockedState<State>) async -> (@Sendable () async -> Void)? {
30+
return await withTaskCancellationHandler {
31+
return await withUnsafeContinuation { continuation in
32+
let (work, resumeContinuation) = state.withLock { state -> ((@Sendable () async -> Void)?, Bool) in
33+
if state.isCancelled {
34+
return (nil, true)
35+
} else {
36+
if state.buffer.isEmpty {
37+
assert(state.continuation == nil)
38+
state.continuation = continuation
39+
return (nil, false)
40+
} else {
41+
return (state.buffer.removeFirst(), true)
42+
}
43+
}
44+
}
45+
if resumeContinuation {
46+
continuation.resume(returning: work)
47+
}
48+
}
49+
} onCancel: {
50+
state.withLock { state in
51+
state.isCancelled = true
52+
defer {
53+
state.continuation = nil
54+
}
55+
return state.continuation
56+
}?.resume(returning: nil)
57+
}
58+
}
59+
}
60+
61+
let state: LockedState<State>
62+
let workerTask: Task<(), Never>
63+
64+
override init() {
65+
state = LockedState(initialState: State())
66+
workerTask = Task.detached { [state] in
67+
await withDiscardingTaskGroup { group in
68+
while let work = await State.waitForWork(state) {
69+
group.addTask(operation: work)
70+
}
71+
}
72+
}
73+
super.init()
74+
}
75+
76+
deinit {
77+
workerTask.cancel()
78+
}
79+
80+
func enqueue(_ work: @escaping @Sendable () async -> Void) {
81+
state.withLock { state in
82+
state.buffer.append(work)
83+
if let continuation = state.continuation {
84+
state.continuation = nil
85+
let item = state.buffer.removeFirst()
86+
continuation.resume(returning: item)
87+
}
88+
}
89+
}
90+
#endif
91+
}
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift.org open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
//
11+
//===----------------------------------------------------------------------===//
12+
13+
#if FOUNDATION_FRAMEWORK
14+
internal import _ForSwiftFoundation
15+
internal import CollectionsInternal
16+
#elseif canImport(DequeModule)
17+
internal import DequeModule
18+
#elseif canImport(_FoundationCollections)
19+
internal import _FoundationCollections
20+
#endif
21+
#if canImport(os)
22+
internal import os.log
23+
#endif
24+
25+
@available(FoundationPreview 6.2, *)
26+
extension NotificationCenter {
27+
/// Returns an asynchronous sequence of messages produced by this center for a given subject and identifier.
28+
/// - Parameters:
29+
/// - subject: The subject to observe. Specify a metatype to observe all values for a given type.
30+
/// - identifier: An identifier representing a specific message type.
31+
/// - limit: The maximum number of messages allowed to buffer.
32+
/// - Returns: An asynchronous sequence of messages produced by this center.
33+
public func messages<Identifier: MessageIdentifier, Message: AsyncMessage>(
34+
of subject: Message.Subject,
35+
for identifier: Identifier,
36+
bufferSize limit: Int = 10
37+
) -> some AsyncSequence<Message, Never> where Identifier.MessageType == Message, Message.Subject: AnyObject {
38+
return AsyncMessageSequence<Message>(self, subject, limit)
39+
}
40+
41+
/// Returns an asynchronous sequence of messages produced by this center for a given subject type and identifier.
42+
/// - Parameters:
43+
/// - subject: The metatype to observe all values for a given type.
44+
/// - identifier: An identifier representing a specific message type.
45+
/// - limit: The maximum number of messages allowed to buffer.
46+
/// - Returns: An asynchronous sequence of messages produced by this center.
47+
public func messages<Identifier: MessageIdentifier, Message: AsyncMessage>(
48+
of subject: Message.Subject.Type,
49+
for identifier: Identifier,
50+
bufferSize limit: Int = 10
51+
) -> some AsyncSequence<Message, Never> where Identifier.MessageType == Message {
52+
return AsyncMessageSequence<Message>(self, nil, limit)
53+
}
54+
55+
/// Returns an asynchronous sequence of messages produced by this center for a given subject and message type.
56+
/// - Parameters:
57+
/// - subject: The subject to observe. Specify a metatype to observe all values for a given type.
58+
/// - messageType: The message type to be observed.
59+
/// - limit: The maximum number of messages allowed to buffer.
60+
/// - Returns: An asynchronous sequence of messages produced by this center.
61+
public func messages<Message: AsyncMessage>(
62+
of subject: Message.Subject? = nil,
63+
for messageType: Message.Type,
64+
bufferSize limit: Int = 10
65+
) -> some AsyncSequence<Message, Never> where Message.Subject: AnyObject {
66+
return AsyncMessageSequence<Message>(self, subject, limit)
67+
}
68+
}
69+
70+
extension NotificationCenter {
71+
fileprivate struct AsyncMessageSequence<Message: NotificationCenter.AsyncMessage>: AsyncSequence, Sendable {
72+
let center: NotificationCenter
73+
nonisolated(unsafe) weak var object: AnyObject?
74+
let bufferSize: Int
75+
76+
init(_ center: NotificationCenter, _ object: AnyObject?, _ bufferSize: Int) {
77+
self.center = center
78+
self.object = object
79+
self.bufferSize = bufferSize
80+
}
81+
82+
func makeAsyncIterator() -> AsyncMessageSequenceIterator<Message> {
83+
return AsyncMessageSequenceIterator(center: center, object: object, bufferSize: bufferSize)
84+
}
85+
}
86+
}
87+
88+
extension NotificationCenter {
89+
fileprivate final class AsyncMessageSequenceIterator<Message: NotificationCenter.AsyncMessage>: AsyncIteratorProtocol, Sendable {
90+
typealias Element = Message
91+
typealias Failure = Never
92+
93+
struct State {
94+
var observer: NotificationCenter.ObservationToken?
95+
var continuations: [UnsafeContinuation<Message?, Never>] = []
96+
var buffer = Deque<Message>(minimumCapacity: 1)
97+
let bufferSize: Int
98+
}
99+
100+
struct Resumption {
101+
let message: Message?
102+
let continuations: [UnsafeContinuation<Message?, Never>]
103+
104+
init(message: Message?, continuation: UnsafeContinuation<Message?, Never>) {
105+
self.message = message
106+
self.continuations = [continuation]
107+
}
108+
109+
init(cancelling: [UnsafeContinuation<Message?, Never>]) {
110+
self.message = nil
111+
self.continuations = cancelling
112+
}
113+
114+
func resume() {
115+
for continuation in continuations {
116+
continuation.resume(returning: message)
117+
}
118+
}
119+
}
120+
121+
let state: LockedState<State>
122+
123+
init(center: NotificationCenter, object: AnyObject?, bufferSize: Int) {
124+
self.state = LockedState(initialState: State(bufferSize: bufferSize))
125+
126+
#if FOUNDATION_FRAMEWORK
127+
let observerBlock: @Sendable (Notification) -> Void = { [weak self] notification in
128+
guard let message: Message = NotificationCenter._messageFromNotification(notification) else { return }
129+
130+
self?.observationCallback(message)
131+
}
132+
#else
133+
let observerBlock: @Sendable (Message) -> Void = { [weak self] message in
134+
self?.observationCallback(message)
135+
}
136+
#endif
137+
138+
let token = center._addObserver(Message.name, object: object, using: observerBlock)
139+
140+
self.state.withLock { _state in
141+
_state.observer = ObservationToken(center: center, token: token)
142+
}
143+
}
144+
145+
deinit {
146+
teardown()
147+
}
148+
149+
func teardown() {
150+
let (observer, resumption) = state.withLock { _state -> (NotificationCenter.ObservationToken?, Resumption) in
151+
let observer = _state.observer
152+
_state.observer = nil
153+
_state.buffer.removeAll(keepingCapacity: false)
154+
defer { _state.continuations.removeAll(keepingCapacity: false) }
155+
return (observer, Resumption(cancelling: _state.continuations))
156+
}
157+
158+
resumption.resume()
159+
160+
if let observer {
161+
observer.remove()
162+
}
163+
}
164+
165+
func observationCallback(_ message: Message) {
166+
state.withLock { _state -> Resumption? in
167+
if _state.buffer.count + 1 > _state.bufferSize {
168+
_state.buffer.removeFirst()
169+
#if canImport(os)
170+
NotificationCenter.logger.fault("Notification center message dropped due to buffer limit. Check sequence iterator frequently or increase buffer size. Message: \(String(describing: Message.self))")
171+
#endif
172+
}
173+
_state.buffer.append(message)
174+
175+
if _state.continuations.isEmpty {
176+
return nil
177+
} else {
178+
return Resumption(message: _state.buffer.removeFirst(), continuation: _state.continuations.removeFirst())
179+
}
180+
}?.resume()
181+
}
182+
183+
func next() async -> Message? {
184+
await withTaskCancellationHandler {
185+
return await withUnsafeContinuation { (continuation: UnsafeContinuation<Message?, Never>) in
186+
state.withLock { _state -> Resumption? in
187+
_state.continuations.append(continuation)
188+
if _state.buffer.isEmpty {
189+
return nil
190+
} else {
191+
return Resumption(message: _state.buffer.removeFirst(), continuation: _state.continuations.removeFirst())
192+
}
193+
}?.resume()
194+
}
195+
} onCancel: {
196+
teardown()
197+
}
198+
}
199+
}
200+
}

0 commit comments

Comments
 (0)