1+ /**
2+ * Copyright 2015 Netflix, Inc.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+ * use this file except in compliance with the License. You may obtain a copy of
6+ * the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+ * License for the specific language governing permissions and limitations under
14+ * the License.
15+ */
16+ package rx .internal .producers ;
17+
18+ import rx .*;
19+
20+ /**
21+ * Producer that allows changing an underlying producer atomically and correctly resume with the accumulated
22+ * requests.
23+ */
24+ public final class ProducerArbiter implements Producer {
25+ long requested ;
26+ Producer currentProducer ;
27+
28+ boolean emitting ;
29+ long missedRequested ;
30+ long missedProduced ;
31+ Producer missedProducer ;
32+
33+ static final Producer NULL_PRODUCER = new Producer () {
34+ @ Override
35+ public void request (long n ) {
36+
37+ }
38+ };
39+
40+ @ Override
41+ public void request (long n ) {
42+ if (n < 0 ) {
43+ throw new IllegalArgumentException ("n >= 0 required" );
44+ }
45+ if (n == 0 ) {
46+ return ;
47+ }
48+ synchronized (this ) {
49+ if (emitting ) {
50+ missedRequested += n ;
51+ return ;
52+ }
53+ emitting = true ;
54+ }
55+ boolean skipFinal = false ;
56+ try {
57+ long r = requested ;
58+ long u = r + n ;
59+ if (u < 0 ) {
60+ u = Long .MAX_VALUE ;
61+ }
62+ requested = u ;
63+
64+ Producer p = currentProducer ;
65+ if (p != null ) {
66+ p .request (n );
67+ }
68+
69+ emitLoop ();
70+ skipFinal = true ;
71+ } finally {
72+ if (!skipFinal ) {
73+ synchronized (this ) {
74+ emitting = false ;
75+ }
76+ }
77+ }
78+ }
79+
80+ public void produced (long n ) {
81+ if (n <= 0 ) {
82+ throw new IllegalArgumentException ("n > 0 required" );
83+ }
84+ synchronized (this ) {
85+ if (emitting ) {
86+ missedProduced += n ;
87+ return ;
88+ }
89+ emitting = true ;
90+ }
91+
92+ boolean skipFinal = false ;
93+ try {
94+ long r = requested ;
95+ if (r != Long .MAX_VALUE ) {
96+ long u = r - n ;
97+ if (u < 0 ) {
98+ throw new IllegalStateException ();
99+ }
100+ requested = u ;
101+ }
102+
103+ emitLoop ();
104+ skipFinal = true ;
105+ } finally {
106+ if (!skipFinal ) {
107+ synchronized (this ) {
108+ emitting = false ;
109+ }
110+ }
111+ }
112+ }
113+
114+ public void setProducer (Producer newProducer ) {
115+ synchronized (this ) {
116+ if (emitting ) {
117+ missedProducer = newProducer == null ? NULL_PRODUCER : newProducer ;
118+ return ;
119+ }
120+ emitting = true ;
121+ }
122+ boolean skipFinal = false ;
123+ try {
124+ currentProducer = newProducer ;
125+ if (newProducer != null ) {
126+ newProducer .request (requested );
127+ }
128+
129+ emitLoop ();
130+ skipFinal = true ;
131+ } finally {
132+ if (!skipFinal ) {
133+ synchronized (this ) {
134+ emitting = false ;
135+ }
136+ }
137+ }
138+ }
139+
140+ public void emitLoop () {
141+ for (;;) {
142+ long localRequested ;
143+ long localProduced ;
144+ Producer localProducer ;
145+ synchronized (this ) {
146+ localRequested = missedRequested ;
147+ localProduced = missedProduced ;
148+ localProducer = missedProducer ;
149+ if (localRequested == 0L
150+ && localProduced == 0L
151+ && localProducer == null ) {
152+ emitting = false ;
153+ return ;
154+ }
155+ missedRequested = 0L ;
156+ missedProduced = 0L ;
157+ missedProducer = null ;
158+ }
159+
160+ long r = requested ;
161+
162+ if (r != Long .MAX_VALUE ) {
163+ long u = r + localRequested ;
164+ if (u < 0 || u == Long .MAX_VALUE ) {
165+ r = Long .MAX_VALUE ;
166+ requested = r ;
167+ } else {
168+ long v = u - localProduced ;
169+ if (v < 0 ) {
170+ throw new IllegalStateException ("more produced than requested" );
171+ }
172+ r = v ;
173+ requested = v ;
174+ }
175+ }
176+ if (localProducer != null ) {
177+ if (localProducer == NULL_PRODUCER ) {
178+ currentProducer = null ;
179+ } else {
180+ currentProducer = localProducer ;
181+ localProducer .request (r );
182+ }
183+ } else {
184+ Producer p = currentProducer ;
185+ if (p != null && localRequested != 0L ) {
186+ p .request (localRequested );
187+ }
188+ }
189+ }
190+ }
191+ }
0 commit comments