Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ else()
message(STATUS "_SwiftCollections_SourceDIR not provided, checking out local copy of swift-collections")
FetchContent_Declare(SwiftCollections
GIT_REPOSITORY https://github.com/apple/swift-collections.git
GIT_TAG 1.1.2)
GIT_TAG 1.2.1)
endif()
FetchContent_MakeAvailable(SwiftFoundationICU SwiftCollections)

Expand Down
6 changes: 4 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var dependencies: [Package.Dependency] {
[
.package(
url: "https://github.com/apple/swift-collections",
from: "1.1.0"),
from: "1.2.1"),
.package(
url: "https://github.com/apple/swift-foundation-icu",
branch: "main"),
Expand Down Expand Up @@ -110,6 +110,7 @@ let package = Package(
"_FoundationCShims",
"FoundationMacros",
.product(name: "_RopeModule", package: "swift-collections"),
.product(name: "DequeModule", package: "swift-collections"),
.product(name: "OrderedCollections", package: "swift-collections"),
],
exclude: [
Expand All @@ -128,7 +129,8 @@ let package = Package(
"CMakeLists.txt",
"ProcessInfo/CMakeLists.txt",
"FileManager/CMakeLists.txt",
"URL/CMakeLists.txt"
"URL/CMakeLists.txt",
"NotificationCenter/CMakeLists.txt"
],
cSettings: [
.define("_GNU_SOURCE", .when(platforms: [.linux]))
Expand Down
1 change: 1 addition & 0 deletions Sources/FoundationEssentials/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ add_subdirectory(FileManager)
add_subdirectory(Formatting)
add_subdirectory(JSON)
add_subdirectory(Locale)
add_subdirectory(NotificationCenter)
add_subdirectory(Predicate)
add_subdirectory(ProcessInfo)
add_subdirectory(PropertyList)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

#if FOUNDATION_FRAMEWORK
internal class _NotificationCenterActorQueueManagerNSObjectWrapper: NSObject, @unchecked Sendable {}
#else
internal class _NotificationCenterActorQueueManagerNSObjectWrapper: @unchecked Sendable {}
#endif

#if FOUNDATION_FRAMEWORK
@objc(_NotificationCenterActorQueueManager)
#endif
internal final class _NotificationCenterActorQueueManager: _NotificationCenterActorQueueManagerNSObjectWrapper, @unchecked Sendable {
#if !NO_FILESYSTEM
struct State {
var buffer = [@Sendable () async -> Void]()
var continuation: UnsafeContinuation<(@Sendable () async -> Void)?, Never>?
var isCancelled: Bool = false

static func waitForWork(_ state: LockedState<State>) async -> (@Sendable () async -> Void)? {
return await withTaskCancellationHandler {
return await withUnsafeContinuation { continuation in
let (work, resumeContinuation) = state.withLock { state -> ((@Sendable () async -> Void)?, Bool) in
if state.isCancelled {
return (nil, true)
} else {
if state.buffer.isEmpty {
assert(state.continuation == nil)
state.continuation = continuation
return (nil, false)
} else {
return (state.buffer.removeFirst(), true)
}
}
}
if resumeContinuation {
continuation.resume(returning: work)
}
}
} onCancel: {
state.withLock { state in
state.isCancelled = true
defer {
state.continuation = nil
}
return state.continuation
}?.resume(returning: nil)
}
}
}

let state: LockedState<State>
let workerTask: Task<(), Never>

override init() {
state = LockedState(initialState: State())
workerTask = Task.detached { [state] in
await withDiscardingTaskGroup { group in
while let work = await State.waitForWork(state) {
group.addTask(operation: work)
}
}
}
super.init()
}

deinit {
workerTask.cancel()
}

func enqueue(_ work: @escaping @Sendable () async -> Void) {
state.withLock { state in
state.buffer.append(work)
if let continuation = state.continuation {
state.continuation = nil
let item = state.buffer.removeFirst()
continuation.resume(returning: item)
}
}
}
#endif
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2025 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

#if FOUNDATION_FRAMEWORK
internal import _ForSwiftFoundation
internal import CollectionsInternal
#elseif canImport(DequeModule)
internal import DequeModule
#elseif canImport(_FoundationCollections)
internal import _FoundationCollections
#endif
#if canImport(os)
internal import os.log
#endif

@available(FoundationPreview 6.2, *)
extension NotificationCenter {
/// Returns an asynchronous sequence of messages produced by this center for a given subject and identifier.
/// - Parameters:
/// - subject: The subject to observe. Specify a metatype to observe all values for a given type.
/// - identifier: An identifier representing a specific message type.
/// - limit: The maximum number of messages allowed to buffer.
/// - Returns: An asynchronous sequence of messages produced by this center.
public func messages<Identifier: MessageIdentifier, Message: AsyncMessage>(
of subject: Message.Subject,
for identifier: Identifier,
bufferSize limit: Int = 10
) -> some AsyncSequence<Message, Never> where Identifier.MessageType == Message, Message.Subject: AnyObject {
return AsyncMessageSequence<Message>(self, subject, limit)
}

/// Returns an asynchronous sequence of messages produced by this center for a given subject type and identifier.
/// - Parameters:
/// - subject: The metatype to observe all values for a given type.
/// - identifier: An identifier representing a specific message type.
/// - limit: The maximum number of messages allowed to buffer.
/// - Returns: An asynchronous sequence of messages produced by this center.
public func messages<Identifier: MessageIdentifier, Message: AsyncMessage>(
of subject: Message.Subject.Type,
for identifier: Identifier,
bufferSize limit: Int = 10
) -> some AsyncSequence<Message, Never> where Identifier.MessageType == Message {
return AsyncMessageSequence<Message>(self, nil, limit)
}

/// Returns an asynchronous sequence of messages produced by this center for a given subject and message type.
/// - Parameters:
/// - subject: The subject to observe. Specify a metatype to observe all values for a given type.
/// - messageType: The message type to be observed.
/// - limit: The maximum number of messages allowed to buffer.
/// - Returns: An asynchronous sequence of messages produced by this center.
public func messages<Message: AsyncMessage>(
of subject: Message.Subject? = nil,
for messageType: Message.Type,
bufferSize limit: Int = 10
) -> some AsyncSequence<Message, Never> where Message.Subject: AnyObject {
return AsyncMessageSequence<Message>(self, subject, limit)
}
}

extension NotificationCenter {
fileprivate struct AsyncMessageSequence<Message: NotificationCenter.AsyncMessage>: AsyncSequence, Sendable {
let center: NotificationCenter
nonisolated(unsafe) weak var object: AnyObject?
let bufferSize: Int

init(_ center: NotificationCenter, _ object: AnyObject?, _ bufferSize: Int) {
self.center = center
self.object = object
self.bufferSize = bufferSize
}

func makeAsyncIterator() -> AsyncMessageSequenceIterator<Message> {
return AsyncMessageSequenceIterator(center: center, object: object, bufferSize: bufferSize)
}
}
}

extension NotificationCenter {
fileprivate final class AsyncMessageSequenceIterator<Message: NotificationCenter.AsyncMessage>: AsyncIteratorProtocol, Sendable {
typealias Element = Message
typealias Failure = Never

struct State {
var observer: NotificationCenter.ObservationToken?
var continuations: [UnsafeContinuation<Message?, Never>] = []
var buffer = Deque<Message>(minimumCapacity: 1)
let bufferSize: Int
}

struct Resumption {
let message: Message?
let continuations: [UnsafeContinuation<Message?, Never>]

init(message: Message?, continuation: UnsafeContinuation<Message?, Never>) {
self.message = message
self.continuations = [continuation]
}

init(cancelling: [UnsafeContinuation<Message?, Never>]) {
self.message = nil
self.continuations = cancelling
}

func resume() {
for continuation in continuations {
continuation.resume(returning: message)
}
}
}

let state: LockedState<State>

init(center: NotificationCenter, object: AnyObject?, bufferSize: Int) {
self.state = LockedState(initialState: State(bufferSize: bufferSize))

#if FOUNDATION_FRAMEWORK
let observerBlock: @Sendable (Notification) -> Void = { [weak self] notification in
guard let message: Message = NotificationCenter._messageFromNotification(notification) else { return }

self?.observationCallback(message)
}
#else
let observerBlock: @Sendable (Message) -> Void = { [weak self] message in
self?.observationCallback(message)
}
#endif

let token = center._addObserver(Message.name, object: object, using: observerBlock)

self.state.withLock { _state in
_state.observer = ObservationToken(center: center, token: token)
}
}

deinit {
teardown()
}

func teardown() {
let (observer, resumption) = state.withLock { _state -> (NotificationCenter.ObservationToken?, Resumption) in
let observer = _state.observer
_state.observer = nil
_state.buffer.removeAll(keepingCapacity: false)
defer { _state.continuations.removeAll(keepingCapacity: false) }
return (observer, Resumption(cancelling: _state.continuations))
}

resumption.resume()

if let observer {
observer.remove()
}
}

func observationCallback(_ message: Message) {
state.withLock { _state -> Resumption? in
if _state.buffer.count + 1 > _state.bufferSize {
_state.buffer.removeFirst()
#if canImport(os)
NotificationCenter.logger.fault("Notification center message dropped due to buffer limit. Check sequence iterator frequently or increase buffer size. Message: \(String(describing: Message.self))")
#endif
}
_state.buffer.append(message)

if _state.continuations.isEmpty {
return nil
} else {
return Resumption(message: _state.buffer.removeFirst(), continuation: _state.continuations.removeFirst())
}
}?.resume()
}

func next() async -> Message? {
await withTaskCancellationHandler {
return await withUnsafeContinuation { (continuation: UnsafeContinuation<Message?, Never>) in
state.withLock { _state -> Resumption? in
_state.continuations.append(continuation)
if _state.buffer.isEmpty {
return nil
} else {
return Resumption(message: _state.buffer.removeFirst(), continuation: _state.continuations.removeFirst())
}
}?.resume()
}
} onCancel: {
teardown()
}
}
}
}
Loading