Skip to content

Commit 59bb911

Browse files
OperationJoinTest - fixing
1 parent 8926255 commit 59bb911

File tree

2 files changed

+32
-28
lines changed

2 files changed

+32
-28
lines changed

rxjava-core/src/main/java/rx/joins/JoinObserver1.java

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public final class JoinObserver1<T> extends Observer<Notification<T>> implements
3737
private final Action1<Throwable> onError;
3838
private final List<ActivePlan0> activePlans;
3939
private final Queue<Notification<T>> queue;
40-
// private volatile boolean done;
4140
private final AtomicBoolean subscribed = new AtomicBoolean(false);
4241
private final SafeObserver<Notification<T>> safeObserver;
4342

@@ -74,6 +73,30 @@ public void dequeue() {
7473
queue.remove();
7574
}
7675

76+
77+
@Override
78+
public void onNext(Notification<T> args) {
79+
safeObserver.onNext(args);
80+
}
81+
82+
@Override
83+
public void onError(Throwable e) {
84+
safeObserver.onError(e);
85+
}
86+
87+
@Override
88+
public void onCompleted() {
89+
safeObserver.onCompleted();
90+
}
91+
92+
void removeActivePlan(ActivePlan0 activePlan) {
93+
activePlans.remove(activePlan);
94+
if (activePlans.isEmpty()) {
95+
unsubscribe();
96+
}
97+
}
98+
99+
77100
private final class InnerObserver extends Observer<Notification<T>> {
78101

79102
@Override
@@ -105,26 +128,4 @@ public void onCompleted() {
105128
}
106129
}
107130

108-
@Override
109-
public void onNext(Notification<T> args) {
110-
safeObserver.onNext(args);
111-
}
112-
113-
@Override
114-
public void onError(Throwable e) {
115-
safeObserver.onError(e);
116-
}
117-
118-
@Override
119-
public void onCompleted() {
120-
safeObserver.onCompleted();
121-
}
122-
123-
void removeActivePlan(ActivePlan0 activePlan) {
124-
activePlans.remove(activePlan);
125-
if (activePlans.isEmpty()) {
126-
unsubscribe();
127-
}
128-
}
129-
130-
}
131+
}

rxjava-core/src/test/java/rx/operators/OperationJoinsTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,13 @@ public void whenComplicated() {
366366
PublishSubject<Integer> zs = PublishSubject.create();
367367

368368
Observable<Integer> m = Observable.when(
369-
xs.toObservable().and(ys.toObservable()).then(add2),
370-
xs.toObservable().and(zs.toObservable()).then(mul2),
371-
ys.toObservable().and(zs.toObservable()).then(sub2)
369+
xs.toObservable().and(ys.toObservable()).then(add2), // 1+4=5, 2+5=7, 3+6=9
370+
xs.toObservable().and(zs.toObservable()).then(mul2), // 1*7=7, 2*8=16, 3*9=27
371+
ys.toObservable().and(zs.toObservable()).then(sub2) // 4-7=-3, 5-8=-3, 6-9=-3
372372
);
373+
374+
// 5, 7, 9, 7, 16, 27, -3, -3, -3
375+
373376
TestObserver<Object> to = new TestObserver<Object>(observer);
374377
m.subscribe(to);
375378

@@ -394,7 +397,7 @@ public void whenComplicated() {
394397
zs.onCompleted(); // t == 300
395398

396399
System.out.println("Events: " + to.getOnNextEvents());
397-
400+
398401
InOrder inOrder = inOrder(observer);
399402
inOrder.verify(observer, times(1)).onNext(1 * 7);
400403
inOrder.verify(observer, times(1)).onNext(2 * 8);

0 commit comments

Comments
 (0)