Skip to content

Commit d718b17

Browse files
committed
Convert any Signal, Producer or Property to a Combine publisher via publisher().
1 parent e27ccdb commit d718b17

File tree

2 files changed

+189
-0
lines changed

2 files changed

+189
-0
lines changed

ReactiveSwift.xcodeproj/project.pbxproj

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@
6666
9A1D067D1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
6767
9A1D067E1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
6868
9A1D067F1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; };
69+
9A5865DF244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; };
70+
9A5865E0244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; };
71+
9A5865E1244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; };
72+
9A5865E2244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; };
6973
9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
7074
9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
7175
9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; };
@@ -240,6 +244,7 @@
240244
9A1A4F981E16961C006F3039 /* ValidatingPropertySpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingPropertySpec.swift; sourceTree = "<group>"; };
241245
9A1B824020835EEC00EB7C09 /* ResultExtensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ResultExtensions.swift; sourceTree = "<group>"; };
242246
9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UnidirectionalBindingSpec.swift; sourceTree = "<group>"; };
247+
9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CombineInteroperability.swift; sourceTree = "<group>"; };
243248
9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UninhabitedTypeGuards.swift; sourceTree = "<group>"; };
244249
9A681A9D1E5A241B00B097CF /* DeprecationSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DeprecationSpec.swift; sourceTree = "<group>"; };
245250
9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingProperty.swift; sourceTree = "<group>"; };
@@ -454,6 +459,7 @@
454459
9A090C131DA0309E00EE97CA /* Reactive.swift */,
455460
D0C312C819EF2A5800984962 /* Scheduler.swift */,
456461
C79B647B1CD52E23003F2376 /* EventLogger.swift */,
462+
9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */,
457463
D03B4A3919F4C25F009E02AC /* Signals */,
458464
D03B4A3B19F4C281009E02AC /* Extensions */,
459465
9ABCB1841D2A5B5A00BCA243 /* Deprecations+Removals.swift */,
@@ -869,6 +875,7 @@
869875
57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */,
870876
57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */,
871877
9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
878+
9A5865E2244CEF9800AADB58 /* CombineInteroperability.swift in Sources */,
872879
57A4D1B91BA13D7A00F7D4B1 /* Action.swift in Sources */,
873880
57A4D1BA1BA13D7A00F7D4B1 /* Property.swift in Sources */,
874881
9A090C171DA0309E00EE97CA /* Reactive.swift in Sources */,
@@ -924,6 +931,7 @@
924931
A9B315BE1B3940810001CB9C /* Event.swift in Sources */,
925932
A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */,
926933
9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */,
934+
9A5865E1244CEF9800AADB58 /* CombineInteroperability.swift in Sources */,
927935
A9B315C11B3940810001CB9C /* Action.swift in Sources */,
928936
A9B315C21B3940810001CB9C /* Property.swift in Sources */,
929937
9A090C161DA0309E00EE97CA /* Reactive.swift in Sources */,
@@ -952,6 +960,7 @@
952960
D0C312D319EF2A5800984962 /* Disposable.swift in Sources */,
953961
9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */,
954962
EBCC7DBC1BBF010C00A2AE92 /* Observer.swift in Sources */,
963+
9A5865DF244CEF9800AADB58 /* CombineInteroperability.swift in Sources */,
955964
D03B4A3D19F4C39A009E02AC /* FoundationExtensions.swift in Sources */,
956965
9A090C141DA0309E00EE97CA /* Reactive.swift in Sources */,
957966
D08C54B31A69A2AE00AD8286 /* Signal.swift in Sources */,
@@ -1007,6 +1016,7 @@
10071016
D0C312D419EF2A5800984962 /* Disposable.swift in Sources */,
10081017
D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */,
10091018
9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */,
1019+
9A5865E0244CEF9800AADB58 /* CombineInteroperability.swift in Sources */,
10101020
9ABCB1861D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */,
10111021
EBCC7DBD1BBF01E100A2AE92 /* Observer.swift in Sources */,
10121022
9A090C151DA0309E00EE97CA /* Reactive.swift in Sources */,
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
#if canImport(Combine)
2+
import Combine
3+
4+
extension SignalProducerConvertible {
5+
@available(macOS 10.15, *)
6+
@available(iOS 13.0, *)
7+
@available(tvOS 13.0, *)
8+
@available(macCatalyst 13.0, *)
9+
@available(watchOS 6.0, *)
10+
public func publisher() -> ProducerPublisher<Value, Error> {
11+
ProducerPublisher(base: producer)
12+
}
13+
}
14+
15+
@available(macOS 10.15, *)
16+
@available(iOS 13.0, *)
17+
@available(tvOS 13.0, *)
18+
@available(macCatalyst 13.0, *)
19+
@available(watchOS 6.0, *)
20+
public struct ProducerPublisher<Output, Failure: Swift.Error>: Publisher {
21+
public let base: SignalProducer<Output, Failure>
22+
23+
public init(base: SignalProducer<Output, Failure>) {
24+
self.base = base
25+
}
26+
27+
public func receive<S>(subscriber: S) where S : Subscriber, Output == S.Input, Failure == S.Failure {
28+
let subscription = Subscription(subscriber: subscriber, base: base)
29+
subscription.bootstrap()
30+
}
31+
32+
final class Subscription<S: Subscriber>: Combine.Subscription where Output == S.Input, Failure == S.Failure {
33+
let subscriber: S
34+
let base: SignalProducer<Output, Failure>
35+
let state: Atomic<State>
36+
37+
init(subscriber: S, base: SignalProducer<Output, Failure>) {
38+
self.subscriber = subscriber
39+
self.base = base
40+
self.state = Atomic(State())
41+
}
42+
43+
func bootstrap() {
44+
subscriber.receive(subscription: self)
45+
}
46+
47+
func request(_ incoming: Subscribers.Demand) {
48+
let response: DemandResponse = state.modify { state in
49+
guard state.hasCancelled == false else {
50+
return .noAction
51+
}
52+
53+
guard state.hasStarted else {
54+
state.hasStarted = true
55+
state.requested = incoming
56+
return .startUpstream
57+
}
58+
59+
state.requested = state.requested + incoming
60+
let unsatified = state.requested - state.satisfied
61+
62+
if let max = unsatified.max {
63+
let dequeueCount = Swift.min(state.buffer.count, max)
64+
state.satisfied += dequeueCount
65+
66+
defer { state.buffer.removeFirst(dequeueCount) }
67+
return .satisfyDemand(Array(state.buffer.prefix(dequeueCount)))
68+
} else {
69+
defer { state.buffer = [] }
70+
return .satisfyDemand(state.buffer)
71+
}
72+
}
73+
74+
switch response {
75+
case let .satisfyDemand(output):
76+
var demand: Subscribers.Demand = .none
77+
78+
for output in output {
79+
demand += subscriber.receive(output)
80+
}
81+
82+
if demand != .none {
83+
request(demand)
84+
}
85+
86+
case .startUpstream:
87+
let disposable = base.start { [weak self] event in
88+
guard let self = self else { return }
89+
90+
switch event {
91+
case let .value(output):
92+
let (shouldSendImmediately, isDemandUnlimited): (Bool, Bool) = self.state.modify { state in
93+
guard state.hasCancelled == false else { return (false, false) }
94+
95+
let unsatified = state.requested - state.satisfied
96+
97+
if let count = unsatified.max, count >= 1 {
98+
assert(state.buffer.count == 0)
99+
state.satisfied += 1
100+
return (true, false)
101+
} else if unsatified == .unlimited {
102+
assert(state.buffer.isEmpty)
103+
return (true, true)
104+
} else {
105+
assert(state.requested == state.satisfied)
106+
state.buffer.append(output)
107+
return (false, false)
108+
}
109+
}
110+
111+
if shouldSendImmediately {
112+
let demand = self.subscriber.receive(output)
113+
114+
if isDemandUnlimited == false && demand != .none {
115+
self.request(demand)
116+
}
117+
}
118+
119+
case .completed, .interrupted:
120+
self.cancel()
121+
self.subscriber.receive(completion: .finished)
122+
123+
case let .failed(error):
124+
self.cancel()
125+
self.subscriber.receive(completion: .failure(error))
126+
}
127+
}
128+
129+
let shouldDispose: Bool = state.modify { state in
130+
guard state.hasCancelled == false else { return true }
131+
state.producerSubscription = disposable
132+
return false
133+
}
134+
135+
if shouldDispose {
136+
disposable.dispose()
137+
}
138+
139+
case .noAction:
140+
break
141+
}
142+
}
143+
144+
func cancel() {
145+
let disposable = state.modify { $0.cancel() }
146+
disposable?.dispose()
147+
}
148+
149+
struct State {
150+
var requested: Subscribers.Demand = .none
151+
var satisfied: Subscribers.Demand = .none
152+
153+
var buffer: [Output] = []
154+
155+
var producerSubscription: Disposable?
156+
var hasStarted = false
157+
var hasCancelled = false
158+
159+
init() {
160+
producerSubscription = nil
161+
hasStarted = false
162+
hasCancelled = false
163+
}
164+
165+
mutating func cancel() -> Disposable? {
166+
hasCancelled = true
167+
defer { producerSubscription = nil }
168+
return producerSubscription
169+
}
170+
}
171+
172+
enum DemandResponse {
173+
case startUpstream
174+
case satisfyDemand([Output])
175+
case noAction
176+
}
177+
}
178+
}
179+
#endif

0 commit comments

Comments
 (0)