Skip to content

Commit 50513c6

Browse files
committed
✨ Adds buffable async publishers.
1 parent 73fc441 commit 50513c6

File tree

9 files changed

+739
-2
lines changed

9 files changed

+739
-2
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<Scheme
3+
LastUpgradeVersion = "1500"
4+
version = "1.7">
5+
<BuildAction
6+
parallelizeBuildables = "YES"
7+
buildImplicitDependencies = "YES">
8+
<BuildActionEntries>
9+
<BuildActionEntry
10+
buildForTesting = "YES"
11+
buildForRunning = "YES"
12+
buildForProfiling = "YES"
13+
buildForArchiving = "YES"
14+
buildForAnalyzing = "YES">
15+
<BuildableReference
16+
BuildableIdentifier = "primary"
17+
BlueprintIdentifier = "BuffableAsyncPublishers"
18+
BuildableName = "BuffableAsyncPublishers"
19+
BlueprintName = "BuffableAsyncPublishers"
20+
ReferencedContainer = "container:">
21+
</BuildableReference>
22+
</BuildActionEntry>
23+
</BuildActionEntries>
24+
</BuildAction>
25+
<TestAction
26+
buildConfiguration = "Debug"
27+
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
28+
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
29+
shouldUseLaunchSchemeArgsEnv = "YES">
30+
<TestPlans>
31+
<TestPlanReference
32+
reference = "container:Tests/BuffableAsyncPublishersTests/BuffableAsyncPublishers.xctestplan"
33+
default = "YES">
34+
</TestPlanReference>
35+
</TestPlans>
36+
</TestAction>
37+
<LaunchAction
38+
buildConfiguration = "Debug"
39+
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
40+
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
41+
launchStyle = "0"
42+
useCustomWorkingDirectory = "NO"
43+
ignoresPersistentStateOnLaunch = "NO"
44+
debugDocumentVersioning = "YES"
45+
debugServiceExtension = "internal"
46+
allowLocationSimulation = "YES">
47+
</LaunchAction>
48+
<ProfileAction
49+
buildConfiguration = "Release"
50+
shouldUseLaunchSchemeArgsEnv = "YES"
51+
savedToolIdentifier = ""
52+
useCustomWorkingDirectory = "NO"
53+
debugDocumentVersioning = "YES">
54+
<MacroExpansion>
55+
<BuildableReference
56+
BuildableIdentifier = "primary"
57+
BlueprintIdentifier = "BuffableAsyncPublishers"
58+
BuildableName = "BuffableAsyncPublishers"
59+
BlueprintName = "BuffableAsyncPublishers"
60+
ReferencedContainer = "container:">
61+
</BuildableReference>
62+
</MacroExpansion>
63+
</ProfileAction>
64+
<AnalyzeAction
65+
buildConfiguration = "Debug">
66+
</AnalyzeAction>
67+
<ArchiveAction
68+
buildConfiguration = "Release"
69+
revealArchiveInOrganizer = "YES">
70+
</ArchiveAction>
71+
</Scheme>

.swiftpm/xcode/xcshareddata/xcschemes/swift-nibbles-Package.xcscheme

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@
127127
shouldUseLaunchSchemeArgsEnv = "YES">
128128
<TestPlans>
129129
<TestPlanReference
130-
reference = "container:Tests/swift-nibbles-Package.xctestplan">
130+
reference = "container:Tests/swift-nibbles-Package.xctestplan"
131+
default = "YES">
131132
</TestPlanReference>
132133
</TestPlans>
133134
<Testables>
@@ -171,6 +172,16 @@
171172
ReferencedContainer = "container:">
172173
</BuildableReference>
173174
</TestableReference>
175+
<TestableReference
176+
skipped = "NO">
177+
<BuildableReference
178+
BuildableIdentifier = "primary"
179+
BlueprintIdentifier = "BuffableAsyncPublishersTests"
180+
BuildableName = "BuffableAsyncPublishersTests"
181+
BlueprintName = "BuffableAsyncPublishersTests"
182+
ReferencedContainer = "container:">
183+
</BuildableReference>
184+
</TestableReference>
174185
</Testables>
175186
</TestAction>
176187
<LaunchAction

Package.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,17 @@ let package = Package(
1111
.watchOS(.v9),
1212
],
1313
products: [
14+
.library(name: "BuffableAsyncPublishers", targets: ["BuffableAsyncPublishers"]),
1415
.library(name: "Cache", targets: ["Cache"]),
1516
.library(name: "Extensions", targets: ["Extensions"]),
1617
.library(name: "HTTPNetworking", targets: ["HTTPNetworking"]),
1718
.library(name: "Identified", targets: ["Identified"]),
1819
.plugin(name: "Create TCA Feature", targets: ["Create TCA Feature"])
1920
],
2021
targets: [
22+
.target(name: "BuffableAsyncPublishers"),
23+
.testTarget(name: "BuffableAsyncPublishersTests", dependencies: ["BuffableAsyncPublishers"]),
24+
2125
.target(name: "Cache"),
2226
.testTarget(name: "CacheTests", dependencies: ["Cache"]),
2327

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
//
2+
// MIT License
3+
//
4+
// Copyright (c) 2023 Connor Ricks
5+
//
6+
// Permission is hereby granted, free of charge, to any person obtaining a copy
7+
// of this software and associated documentation files (the "Software"), to deal
8+
// in the Software without restriction, including without limitation the rights
9+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
// copies of the Software, and to permit persons to whom the Software is
11+
// furnished to do so, subject to the following conditions:
12+
//
13+
// The above copyright notice and this permission notice shall be included in all
14+
// copies or substantial portions of the Software.
15+
//
16+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
// SOFTWARE.
23+
24+
import Combine
25+
import Foundation
26+
27+
// MARK: - BuffableAsyncPublisher
28+
29+
/// A publisher that exposes its elements as an asynchronous sequence with a specified buffering policy..
30+
///
31+
/// `BuffableAsyncPublisher` conforms to <doc://com.apple.documentation/documentation/Swift/AsyncSequence>, which allows callers to receive values with the `for`-`await`-`in` syntax, rather than attaching a ``Subscriber``.
32+
///
33+
/// Use the ``values(bufferingPolicy:)`` property of the ``Combine/Publisher`` protocol to wrap an existing publisher with an instance of this type.
34+
public class BuffableAsyncPublisher<P> : AsyncSequence where P: Publisher, P.Failure == Never {
35+
36+
/// The type of element produced by this asynchronous sequence.
37+
public typealias Element = P.Output
38+
39+
/// The iterator produced by this publisher.
40+
public typealias AsyncIterator = BuffableAsyncPublisher<P>
41+
42+
// MARK: Properties
43+
44+
private let stream: AsyncStream<Element>
45+
46+
private lazy var iterator = stream.makeAsyncIterator()
47+
48+
private var cancellable: AnyCancellable?
49+
50+
// MARK: Initializers
51+
52+
/// Creates a publisher that exposes elements received from an upstream publisher as an asynchronous sequence.
53+
///
54+
/// - Parameter publisher: An upstream publisher. The asynchronous publisher converts elements received from this publisher into an asynchronous sequence.
55+
public init(_ publisher: P, bufferingPolicy: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) {
56+
var subscription: AnyCancellable? = nil
57+
stream = AsyncStream(bufferingPolicy: bufferingPolicy) { continuation in
58+
subscription = publisher
59+
.handleEvents(receiveCancel: {
60+
continuation.finish()
61+
})
62+
.sink(receiveCompletion: { completion in
63+
switch completion {
64+
case .finished:
65+
continuation.finish()
66+
}
67+
}, receiveValue: { value in
68+
continuation.yield(value)
69+
})
70+
}
71+
72+
cancellable = subscription
73+
}
74+
75+
// MARK: AsyncSequence
76+
77+
public func makeAsyncIterator() -> Self { self }
78+
}
79+
80+
// MARK: - BuffableAsyncPublisher + AsyncIteratorProtocol
81+
82+
extension BuffableAsyncPublisher: AsyncIteratorProtocol {
83+
public func next() async -> Element? {
84+
await iterator.next()
85+
}
86+
}
87+
88+
// MARK: - BuffableAsyncPublisher + Cancellable
89+
90+
extension BuffableAsyncPublisher: Cancellable {
91+
public func cancel() {
92+
cancellable?.cancel()
93+
cancellable = nil
94+
}
95+
}
96+
97+
// MARK: - Publisher + BuffableAsyncPublisher
98+
99+
extension Publisher where Failure == Never {
100+
/// The elements produced by the publisher, as an asynchronous sequence.
101+
/// - Parameter bufferingPolicy: By providing a buffering policy, you can customize the behavior when sequence publishes values faster than they can be handled.
102+
///
103+
/// This property provides an ``BuffableAsyncPublisher``, which allows you to use the Swift `async`-`await` syntax to receive the publisher's elements. Because ``BuffableAsyncPublisher`` conforms to <doc://com.apple.documentation/documentation/Swift/AsyncSequence>, you iterate over its elements with a `for`-`await`-`in` loop, rather than attaching a subscriber.
104+
func values(bufferingPolicy: AsyncStream<Output>.Continuation.BufferingPolicy) -> BuffableAsyncPublisher<Self> {
105+
BuffableAsyncPublisher(self, bufferingPolicy: bufferingPolicy)
106+
}
107+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
//
2+
// MIT License
3+
//
4+
// Copyright (c) 2023 Connor Ricks
5+
//
6+
// Permission is hereby granted, free of charge, to any person obtaining a copy
7+
// of this software and associated documentation files (the "Software"), to deal
8+
// in the Software without restriction, including without limitation the rights
9+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
// copies of the Software, and to permit persons to whom the Software is
11+
// furnished to do so, subject to the following conditions:
12+
//
13+
// The above copyright notice and this permission notice shall be included in all
14+
// copies or substantial portions of the Software.
15+
//
16+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
// SOFTWARE.
23+
24+
import Combine
25+
import Foundation
26+
27+
// MARK: - BuffableAsyncThrowingPublisher
28+
29+
/// A publisher that exposes its elements as a asynchronous throwing sequence with a specified buffering policy..
30+
///
31+
/// `BuffableAsyncThrowingPublisher` conforms to <doc://com.apple.documentation/documentation/Swift/AsyncSequence>, which allows callers to receive values with the `for`-`await`-`in` syntax, rather than attaching a ``Subscriber``.
32+
///
33+
/// Use the ``values(bufferingPolicy:)`` property of the ``Combine/Publisher`` protocol to wrap an existing publisher with an instance of this type.
34+
public class BuffableAsyncThrowingPublisher<P>: AsyncSequence where P: Publisher {
35+
36+
/// The type of element produced by this asynchronous sequence.
37+
public typealias Element = P.Output
38+
39+
/// The iterator produced by this publisher.
40+
public typealias AsyncIterator = BuffableAsyncThrowingPublisher<P>
41+
42+
// MARK: Properties
43+
44+
private let stream: AsyncThrowingStream<Element, Error>
45+
46+
private lazy var iterator = stream.makeAsyncIterator()
47+
48+
private var cancellable: AnyCancellable?
49+
50+
// MARK: Initializers
51+
52+
/// Creates a publisher that exposes elements received from an upstream publisher as an asynchronous throwing sequence.
53+
///
54+
/// - Parameter publisher: An upstream publisher. The asynchronous publisher converts elements received from this publisher into an asynchronous throwing sequence.
55+
public init(_ publisher: P, bufferingPolicy: AsyncThrowingStream<Element, Error>.Continuation.BufferingPolicy = .unbounded) {
56+
var subscription: AnyCancellable? = nil
57+
58+
stream = AsyncThrowingStream(bufferingPolicy: bufferingPolicy) { continuation in
59+
subscription = publisher
60+
.handleEvents(receiveCancel: {
61+
continuation.finish(throwing: nil)
62+
})
63+
.sink(receiveCompletion: { completion in
64+
switch completion {
65+
case .finished:
66+
continuation.finish()
67+
case .failure(let error):
68+
continuation.finish(throwing: error)
69+
}
70+
}, receiveValue: { value in
71+
continuation.yield(value)
72+
})
73+
}
74+
75+
cancellable = subscription
76+
}
77+
78+
// MARK: AsyncSequence
79+
80+
public func makeAsyncIterator() -> Self { self }
81+
}
82+
83+
// MARK: - BuffableAsyncThrowingPublisher + AsyncIteratorProtocol
84+
85+
extension BuffableAsyncThrowingPublisher: AsyncIteratorProtocol {
86+
public func next() async throws -> Element? {
87+
try await iterator.next()
88+
}
89+
}
90+
91+
// MARK: - BuffableAsyncThrowingPublisher + Cancellable
92+
93+
extension BuffableAsyncThrowingPublisher: Cancellable {
94+
public func cancel() {
95+
cancellable?.cancel()
96+
cancellable = nil
97+
}
98+
}
99+
100+
// MARK: - Publisher + BuffableAsyncThrowingPublisher
101+
102+
extension Publisher {
103+
/// The elements produced by the publisher, as an asynchronous throwing sequence.
104+
/// - Parameter bufferingPolicy: By providing a buffering policy, you can customize the behavior when sequence publishes values faster than they can be handled.
105+
///
106+
/// This property provides an ``BuffableAsyncThrowingPublisher``, which allows you to use the Swift `async`-`await` syntax to receive the publisher's elements. Because ``BuffableAsyncThrowingPublisher`` conforms to <doc://com.apple.documentation/documentation/Swift/AsyncSequence>, you iterate over its elements with a `for`-`await`-`in` loop, rather than attaching a subscriber.
107+
@_disfavoredOverload
108+
func values(bufferingPolicy: AsyncThrowingStream<Output, Error>.Continuation.BufferingPolicy) -> BuffableAsyncThrowingPublisher<Self> {
109+
BuffableAsyncThrowingPublisher(self, bufferingPolicy: bufferingPolicy)
110+
}
111+
}

Sources/Extensions/Publisher+Sinks.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ extension Publisher {
6868
/// - receiveError: The closure to execute on completion due to an error.
6969
/// - receiveCompletion: The closure to execute on normal completion.
7070
/// - Returns: A cancellable instance, which you use when you end assignment of the received value. Deallocation of the result will tear down the subscription stream.
71+
@_disfavoredOverload
7172
public func sink(
7273
receiveValue: @escaping (Output) -> Void,
7374
receiveError: @escaping (Failure) -> Void,
@@ -117,6 +118,7 @@ extension Publisher {
117118
/// - receiveCompletion: The closure to execute on normal completion.
118119
/// - bag: The `DisposableBag` that should store this subscriber.
119120
/// - Returns: A cancellable instance, which you use when you end assignment of the received value. Deallocation of the result will tear down the subscription stream.
121+
@_disfavoredOverload
120122
@discardableResult
121123
public func sink(
122124
receiveValue: @escaping (Output) -> Void,
@@ -196,7 +198,7 @@ private extension Subscribers.Sink {
196198
switch completion {
197199
case .failure(let error):
198200
receiveError(error)
199-
case.finished:
201+
case .finished:
200202
receiveCompletion?()
201203
}
202204
},

0 commit comments

Comments
 (0)