Skip to content

Commit 802b41d

Browse files
akarnokdakarnokd
authored andcommitted
Zip: emit onCompleted without waiting for request + avoid re-reading
fields
1 parent e901ffa commit 802b41d

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

src/main/java/rx/internal/operators/OperatorZip.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,17 +223,21 @@ public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requ
223223
*/
224224
@SuppressWarnings("unchecked")
225225
void tick() {
226+
final Object[] observers = this.observers;
226227
if (observers == null) {
227228
// nothing yet to do (initial request from Producer)
228229
return;
229230
}
230231
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
232+
final int length = observers.length;
233+
final Observer<? super R> child = this.child;
234+
final AtomicLong requested = this.requested;
231235
do {
232-
// we only emit if requested > 0
233-
while (requested.get() > 0) {
234-
final Object[] vs = new Object[observers.length];
236+
while (true) {
237+
// peek for a potential onCompleted event
238+
final Object[] vs = new Object[length];
235239
boolean allHaveValues = true;
236-
for (int i = 0; i < observers.length; i++) {
240+
for (int i = 0; i < length; i++) {
237241
RxRingBuffer buffer = ((InnerSubscriber) observers[i]).items;
238242
Object n = buffer.peek();
239243

@@ -252,7 +256,8 @@ void tick() {
252256
vs[i] = buffer.getValue(n);
253257
}
254258
}
255-
if (allHaveValues) {
259+
// we only emit if requested > 0 and have all values available
260+
if (requested.get() > 0 && allHaveValues) {
256261
try {
257262
// all have something so emit
258263
child.onNext(zipFunction.call(vs));

src/test/java/rx/internal/operators/OperatorZipTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import java.util.concurrent.TimeUnit;
3737
import java.util.concurrent.atomic.AtomicInteger;
3838

39+
import junit.framework.Assert;
40+
3941
import org.junit.Before;
4042
import org.junit.Test;
4143
import org.mockito.InOrder;
@@ -1266,4 +1268,43 @@ public void onNext(Integer t) {
12661268
ts.assertTerminalEvent();
12671269
ts.assertReceivedOnNext(Arrays.asList(11, 22));
12681270
}
1271+
@Test(timeout = 10000)
1272+
public void testZipRace() {
1273+
Observable<Integer> src = Observable.just(1).subscribeOn(Schedulers.computation());
1274+
for (int i = 0; i < 500000; i++) {
1275+
int value = Observable.zip(src, src, new Func2<Integer, Integer, Integer>() {
1276+
@Override
1277+
public Integer call(Integer t1, Integer t2) {
1278+
return t1 + t2 * 10;
1279+
}
1280+
}).toBlocking().singleOrDefault(0);
1281+
1282+
Assert.assertEquals(11, value);
1283+
}
1284+
}
1285+
/**
1286+
* Request only a single value and don't wait for another request just
1287+
* to emit an onCompleted.
1288+
*/
1289+
@Test
1290+
public void testZipRequest1() {
1291+
Observable<Integer> src = Observable.just(1).subscribeOn(Schedulers.computation());
1292+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
1293+
@Override
1294+
public void onStart() {
1295+
requestMore(1);
1296+
}
1297+
};
1298+
1299+
Observable.zip(src, src, new Func2<Integer, Integer, Integer>() {
1300+
@Override
1301+
public Integer call(Integer t1, Integer t2) {
1302+
return t1 + t2 * 10;
1303+
}
1304+
}).subscribe(ts);
1305+
1306+
ts.awaitTerminalEvent(1, TimeUnit.SECONDS);
1307+
ts.assertNoErrors();
1308+
ts.assertReceivedOnNext(Arrays.asList(11));
1309+
}
12691310
}

0 commit comments

Comments
 (0)