@@ -68,65 +68,79 @@ struct observe_on
6868 , destination(std::move(d))
6969 {
7070 }
71+
72+ void finish (std::unique_lock<std::mutex>& guard, typename mode::type end) const {
73+ if (!guard.owns_lock ()) {
74+ abort ();
75+ }
76+ if (current == mode::Errored || current == mode::Disposed) {return ;}
77+ current = end;
78+ queue_type fill_expired;
79+ swap (fill_expired, fill_queue);
80+ queue_type drain_expired;
81+ swap (drain_expired, drain_queue);
82+ RXCPP_UNWIND_AUTO ([&](){guard.lock ();});
83+ guard.unlock ();
84+ lifetime.unsubscribe ();
85+ destination.unsubscribe ();
86+ }
7187
7288 void ensure_processing (std::unique_lock<std::mutex>& guard) const {
7389 if (!guard.owns_lock ()) {
7490 abort ();
7591 }
7692 if (current == mode::Empty) {
7793 current = mode::Processing;
94+
95+ if (!lifetime.is_subscribed () && fill_queue.empty () && drain_queue.empty ()) {
96+ finish (guard, mode::Disposed);
97+ }
7898
7999 auto keepAlive = this ->shared_from_this ();
80-
100+
81101 auto drain = [keepAlive, this ](const rxsc::schedulable& self){
82102 using std::swap;
83103 try {
84- if (drain_queue.empty () || !destination.is_subscribed ()) {
85- std::unique_lock<std::mutex> guard (lock);
86- if (!destination.is_subscribed () ||
87- (!lifetime.is_subscribed () && fill_queue.empty () && drain_queue.empty ())) {
88- current = mode::Disposed;
89- queue_type expired;
90- swap (expired, fill_queue);
91- guard.unlock ();
92- lifetime.unsubscribe ();
93- destination.unsubscribe ();
94- return ;
95- }
96- if (drain_queue.empty ()) {
97- if (fill_queue.empty ()) {
98- current = mode::Empty;
104+ for (;;) {
105+ if (drain_queue.empty () || !destination.is_subscribed ()) {
106+ std::unique_lock<std::mutex> guard (lock);
107+ if (!destination.is_subscribed () ||
108+ (!lifetime.is_subscribed () && fill_queue.empty () && drain_queue.empty ())) {
109+ finish (guard, mode::Disposed);
99110 return ;
100111 }
101- swap (fill_queue, drain_queue);
112+ if (drain_queue.empty ()) {
113+ if (fill_queue.empty ()) {
114+ current = mode::Empty;
115+ return ;
116+ }
117+ swap (fill_queue, drain_queue);
118+ }
102119 }
120+ auto notification = std::move (drain_queue.front ());
121+ drain_queue.pop_front ();
122+ notification->accept (destination);
123+ std::unique_lock<std::mutex> guard (lock);
124+ self ();
125+ if (lifetime.is_subscribed ()) break ;
103126 }
104- auto notification = std::move (drain_queue.front ());
105- drain_queue.pop_front ();
106- notification->accept (destination);
107- self ();
108127 } catch (...) {
109128 destination.on_error (std::current_exception ());
110129 std::unique_lock<std::mutex> guard (lock);
111- current = mode::Errored;
112- queue_type expired;
113- swap (expired, fill_queue);
130+ finish (guard, mode::Errored);
114131 }
115132 };
116133
117134 auto selectedDrain = on_exception (
118135 [&](){return coordinator.act (drain);},
119136 destination);
120137 if (selectedDrain.empty ()) {
121- current = mode::Errored;
122- using std::swap;
123- queue_type expired;
124- swap (expired, fill_queue);
138+ finish (guard, mode::Errored);
125139 return ;
126140 }
127141
128142 auto processor = coordinator.get_worker ();
129-
143+
130144 RXCPP_UNWIND_AUTO ([&](){guard.lock ();});
131145 guard.unlock ();
132146
@@ -143,16 +157,19 @@ struct observe_on
143157
144158 void on_next (source_value_type v) const {
145159 std::unique_lock<std::mutex> guard (state->lock );
160+ if (state->current == mode::Errored || state->current == mode::Disposed) { return ; }
146161 state->fill_queue .push_back (notification_type::on_next (std::move (v)));
147162 state->ensure_processing (guard);
148163 }
149164 void on_error (std::exception_ptr e) const {
150165 std::unique_lock<std::mutex> guard (state->lock );
166+ if (state->current == mode::Errored || state->current == mode::Disposed) { return ; }
151167 state->fill_queue .push_back (notification_type::on_error (e));
152168 state->ensure_processing (guard);
153169 }
154170 void on_completed () const {
155171 std::unique_lock<std::mutex> guard (state->lock );
172+ if (state->current == mode::Errored || state->current == mode::Disposed) { return ; }
156173 state->fill_queue .push_back (notification_type::on_completed ());
157174 state->ensure_processing (guard);
158175 }
@@ -163,7 +180,7 @@ struct observe_on
163180
164181 this_type o (d, std::move (coor), cs);
165182 auto keepAlive = o.state ;
166- cs.add ([keepAlive ](){
183+ cs.add ([= ](){
167184 std::unique_lock<std::mutex> guard (keepAlive->lock );
168185 keepAlive->ensure_processing (guard);
169186 });
@@ -262,6 +279,11 @@ class observe_on_one_worker : public coordination_base
262279 }
263280};
264281
282+ inline observe_on_one_worker observe_on_run_loop (const rxsc::run_loop& rl) {
283+ static observe_on_one_worker r (rxsc::make_run_loop (rl));
284+ return r;
285+ }
286+
265287inline observe_on_one_worker observe_on_event_loop () {
266288 static observe_on_one_worker r (rxsc::make_event_loop ());
267289 return r;
0 commit comments