@@ -177,7 +177,10 @@ public void request(long n) {
177177
178178 }
179179
180- private static final class Zip <R > extends AtomicLong {
180+ static final class Zip <R > extends AtomicLong {
181+ /** */
182+ private static final long serialVersionUID = 5995274816189928317L ;
183+
181184 final Observer <? super R > child ;
182185 private final FuncN <? extends R > zipFunction ;
183186 private final CompositeSubscription childSubscription = new CompositeSubscription ();
@@ -186,7 +189,7 @@ private static final class Zip<R> extends AtomicLong {
186189 int emitted = 0 ; // not volatile/synchronized as accessed inside COUNTER_UPDATER block
187190
188191 /* initialized when started in `start` */
189- private Object [] observers ;
192+ private volatile Object [] subscribers ;
190193 private AtomicLong requested ;
191194
192195 public Zip (final Subscriber <? super R > child , FuncN <? extends R > zipFunction ) {
@@ -197,16 +200,18 @@ public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) {
197200
198201 @ SuppressWarnings ("unchecked" )
199202 public void start (@ SuppressWarnings ("rawtypes" ) Observable [] os , AtomicLong requested ) {
200- observers = new Object [os .length ];
201- this .requested = requested ;
203+ final Object [] subscribers = new Object [os .length ];
202204 for (int i = 0 ; i < os .length ; i ++) {
203205 InnerSubscriber io = new InnerSubscriber ();
204- observers [i ] = io ;
206+ subscribers [i ] = io ;
205207 childSubscription .add (io );
206208 }
207-
209+
210+ this .requested = requested ;
211+ this .subscribers = subscribers ; // full memory barrier: release all above
212+
208213 for (int i = 0 ; i < os .length ; i ++) {
209- os [i ].unsafeSubscribe ((InnerSubscriber ) observers [i ]);
214+ os [i ].unsafeSubscribe ((InnerSubscriber ) subscribers [i ]);
210215 }
211216 }
212217
@@ -219,13 +224,13 @@ public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requ
219224 */
220225 @ SuppressWarnings ("unchecked" )
221226 void tick () {
222- final Object [] observers = this .observers ;
223- if (observers == null ) {
227+ final Object [] subscribers = this .subscribers ;
228+ if (subscribers == null ) {
224229 // nothing yet to do (initial request from Producer)
225230 return ;
226231 }
227232 if (getAndIncrement () == 0 ) {
228- final int length = observers .length ;
233+ final int length = subscribers .length ;
229234 final Observer <? super R > child = this .child ;
230235 final AtomicLong requested = this .requested ;
231236 do {
@@ -234,7 +239,7 @@ void tick() {
234239 final Object [] vs = new Object [length ];
235240 boolean allHaveValues = true ;
236241 for (int i = 0 ; i < length ; i ++) {
237- RxRingBuffer buffer = ((InnerSubscriber ) observers [i ]).items ;
242+ RxRingBuffer buffer = ((InnerSubscriber ) subscribers [i ]).items ;
238243 Object n = buffer .peek ();
239244
240245 if (n == null ) {
@@ -265,7 +270,7 @@ void tick() {
265270 return ;
266271 }
267272 // now remove them
268- for (Object obj : observers ) {
273+ for (Object obj : subscribers ) {
269274 RxRingBuffer buffer = ((InnerSubscriber ) obj ).items ;
270275 buffer .poll ();
271276 // eagerly check if the next item on this queue is an onComplete
@@ -278,7 +283,7 @@ void tick() {
278283 }
279284 }
280285 if (emitted > THRESHOLD ) {
281- for (Object obj : observers ) {
286+ for (Object obj : subscribers ) {
282287 ((InnerSubscriber ) obj ).requestMore (emitted );
283288 }
284289 emitted = 0 ;
0 commit comments