Skip to content

Commit 419c0fe

Browse files
committed
Join patterns extension for 4..9 and N arity joins.
1 parent 8213986 commit 419c0fe

36 files changed

+3333
-264
lines changed

rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan0.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
public abstract class ActivePlan0 {
2525
protected final Map<JoinObserver, JoinObserver> joinObservers = new HashMap<JoinObserver, JoinObserver>();
2626

27-
public abstract void match();
27+
protected abstract void match();
2828

2929
protected void addJoinObserver(JoinObserver joinObserver) {
3030
joinObservers.put(joinObserver, joinObserver);

rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan1.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,22 @@
2222
/**
2323
* Represents an active plan.
2424
*/
25-
public class ActivePlan1<T1> extends ActivePlan0 {
25+
public final class ActivePlan1<T1> extends ActivePlan0 {
2626
private final Action1<T1> onNext;
2727
private final Action0 onCompleted;
28-
private final JoinObserver1<T1> first;
28+
private final JoinObserver1<T1> jo1;
2929

30-
public ActivePlan1(JoinObserver1<T1> first, Action1<T1> onNext, Action0 onCompleted) {
30+
ActivePlan1(JoinObserver1<T1> jo1, Action1<T1> onNext, Action0 onCompleted) {
3131
this.onNext = onNext;
3232
this.onCompleted = onCompleted;
33-
this.first = first;
34-
addJoinObserver(first);
33+
this.jo1 = jo1;
34+
addJoinObserver(jo1);
3535
}
3636

3737
@Override
38-
public void match() {
39-
if (!first.queue().isEmpty()) {
40-
Notification<T1> n1 = first.queue().peek();
38+
protected void match() {
39+
if (!jo1.queue().isEmpty()) {
40+
Notification<T1> n1 = jo1.queue().peek();
4141
if (n1.isOnCompleted()) {
4242
onCompleted.call();
4343
} else {

rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan2.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,26 @@
2222
/**
2323
* Represents an active plan.
2424
*/
25-
public class ActivePlan2<T1, T2> extends ActivePlan0 {
25+
public final class ActivePlan2<T1, T2> extends ActivePlan0 {
2626
private final Action2<T1, T2> onNext;
2727
private final Action0 onCompleted;
28-
private final JoinObserver1<T1> first;
29-
private final JoinObserver1<T2> second;
28+
private final JoinObserver1<T1> jo1;
29+
private final JoinObserver1<T2> jo2;
3030

31-
public ActivePlan2(JoinObserver1<T1> first, JoinObserver1<T2> second, Action2<T1, T2> onNext, Action0 onCompleted) {
31+
ActivePlan2(JoinObserver1<T1> jo1, JoinObserver1<T2> jo2, Action2<T1, T2> onNext, Action0 onCompleted) {
3232
this.onNext = onNext;
3333
this.onCompleted = onCompleted;
34-
this.first = first;
35-
this.second = second;
36-
addJoinObserver(first);
37-
addJoinObserver(second);
34+
this.jo1 = jo1;
35+
this.jo2 = jo2;
36+
addJoinObserver(jo1);
37+
addJoinObserver(jo2);
3838
}
3939

4040
@Override
41-
public void match() {
42-
if (!first.queue().isEmpty() && !second.queue().isEmpty()) {
43-
Notification<T1> n1 = first.queue().peek();
44-
Notification<T2> n2 = second.queue().peek();
41+
protected void match() {
42+
if (!jo1.queue().isEmpty() && !jo2.queue().isEmpty()) {
43+
Notification<T1> n1 = jo1.queue().peek();
44+
Notification<T2> n2 = jo2.queue().peek();
4545

4646
if (n1.isOnCompleted() || n2.isOnCompleted()) {
4747
onCompleted.call();

rxjava-contrib/rxjava-joins/src/main/java/rx/joins/ActivePlan3.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
/**
2323
* Represents an active plan.
2424
*/
25-
public class ActivePlan3<T1, T2, T3> extends ActivePlan0 {
25+
public final class ActivePlan3<T1, T2, T3> extends ActivePlan0 {
2626
private final Action3<T1, T2, T3> onNext;
2727
private final Action0 onCompleted;
2828
private final JoinObserver1<T1> first;
2929
private final JoinObserver1<T2> second;
3030
private final JoinObserver1<T3> third;
3131

32-
public ActivePlan3(JoinObserver1<T1> first,
32+
ActivePlan3(JoinObserver1<T1> first,
3333
JoinObserver1<T2> second,
3434
JoinObserver1<T3> third,
3535
Action3<T1, T2, T3> onNext,
@@ -45,7 +45,7 @@ public ActivePlan3(JoinObserver1<T1> first,
4545
}
4646

4747
@Override
48-
public void match() {
48+
protected void match() {
4949
if (!first.queue().isEmpty()
5050
&& !second.queue().isEmpty()
5151
&& !third.queue().isEmpty()) {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.functions.Action0;
20+
import rx.functions.Action4;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public final class ActivePlan4<T1, T2, T3, T4> extends ActivePlan0 {
26+
private final Action4<T1, T2, T3, T4> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> jo1;
29+
private final JoinObserver1<T2> jo2;
30+
private final JoinObserver1<T3> jo3;
31+
private final JoinObserver1<T4> jo4;
32+
33+
ActivePlan4(
34+
JoinObserver1<T1> jo1,
35+
JoinObserver1<T2> jo2,
36+
JoinObserver1<T3> jo3,
37+
JoinObserver1<T4> jo4,
38+
Action4<T1, T2, T3, T4> onNext,
39+
Action0 onCompleted) {
40+
this.onNext = onNext;
41+
this.onCompleted = onCompleted;
42+
this.jo1 = jo1;
43+
this.jo2 = jo2;
44+
this.jo3 = jo3;
45+
this.jo4 = jo4;
46+
addJoinObserver(jo1);
47+
addJoinObserver(jo2);
48+
addJoinObserver(jo3);
49+
addJoinObserver(jo4);
50+
}
51+
52+
@Override
53+
protected void match() {
54+
if (!jo1.queue().isEmpty()
55+
&& !jo2.queue().isEmpty()
56+
&& !jo3.queue().isEmpty()
57+
&& !jo4.queue().isEmpty()) {
58+
Notification<T1> n1 = jo1.queue().peek();
59+
Notification<T2> n2 = jo2.queue().peek();
60+
Notification<T3> n3 = jo3.queue().peek();
61+
Notification<T4> n4 = jo4.queue().peek();
62+
63+
if (n1.isOnCompleted()
64+
|| n2.isOnCompleted()
65+
|| n3.isOnCompleted()
66+
|| n4.isOnCompleted()) {
67+
onCompleted.call();
68+
} else {
69+
dequeue();
70+
onNext.call(n1.getValue(), n2.getValue(), n3.getValue(), n4.getValue());
71+
}
72+
}
73+
}
74+
75+
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.functions.Action0;
20+
import rx.functions.Action5;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public final class ActivePlan5<T1, T2, T3, T4, T5> extends ActivePlan0 {
26+
private final Action5<T1, T2, T3, T4, T5> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> jo1;
29+
private final JoinObserver1<T2> jo2;
30+
private final JoinObserver1<T3> jo3;
31+
private final JoinObserver1<T4> jo4;
32+
private final JoinObserver1<T5> jo5;
33+
34+
ActivePlan5(
35+
JoinObserver1<T1> jo1,
36+
JoinObserver1<T2> jo2,
37+
JoinObserver1<T3> jo3,
38+
JoinObserver1<T4> jo4,
39+
JoinObserver1<T5> jo5,
40+
Action5<T1, T2, T3, T4, T5> onNext,
41+
Action0 onCompleted) {
42+
this.onNext = onNext;
43+
this.onCompleted = onCompleted;
44+
this.jo1 = jo1;
45+
this.jo2 = jo2;
46+
this.jo3 = jo3;
47+
this.jo4 = jo4;
48+
this.jo5 = jo5;
49+
addJoinObserver(jo1);
50+
addJoinObserver(jo2);
51+
addJoinObserver(jo3);
52+
addJoinObserver(jo4);
53+
addJoinObserver(jo5);
54+
}
55+
56+
@Override
57+
protected void match() {
58+
if (!jo1.queue().isEmpty()
59+
&& !jo2.queue().isEmpty()
60+
&& !jo3.queue().isEmpty()
61+
&& !jo4.queue().isEmpty()
62+
&& !jo5.queue().isEmpty()
63+
) {
64+
Notification<T1> n1 = jo1.queue().peek();
65+
Notification<T2> n2 = jo2.queue().peek();
66+
Notification<T3> n3 = jo3.queue().peek();
67+
Notification<T4> n4 = jo4.queue().peek();
68+
Notification<T5> n5 = jo5.queue().peek();
69+
70+
if (n1.isOnCompleted()
71+
|| n2.isOnCompleted()
72+
|| n3.isOnCompleted()
73+
|| n4.isOnCompleted()
74+
|| n5.isOnCompleted()
75+
) {
76+
onCompleted.call();
77+
} else {
78+
dequeue();
79+
onNext.call(
80+
n1.getValue(),
81+
n2.getValue(),
82+
n3.getValue(),
83+
n4.getValue(),
84+
n5.getValue()
85+
);
86+
}
87+
}
88+
}
89+
90+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.joins;
17+
18+
import rx.Notification;
19+
import rx.functions.Action0;
20+
import rx.functions.Action6;
21+
22+
/**
23+
* Represents an active plan.
24+
*/
25+
public final class ActivePlan6<T1, T2, T3, T4, T5, T6> extends ActivePlan0 {
26+
private final Action6<T1, T2, T3, T4, T5, T6> onNext;
27+
private final Action0 onCompleted;
28+
private final JoinObserver1<T1> jo1;
29+
private final JoinObserver1<T2> jo2;
30+
private final JoinObserver1<T3> jo3;
31+
private final JoinObserver1<T4> jo4;
32+
private final JoinObserver1<T5> jo5;
33+
private final JoinObserver1<T6> jo6;
34+
35+
ActivePlan6(
36+
JoinObserver1<T1> jo1,
37+
JoinObserver1<T2> jo2,
38+
JoinObserver1<T3> jo3,
39+
JoinObserver1<T4> jo4,
40+
JoinObserver1<T5> jo5,
41+
JoinObserver1<T6> jo6,
42+
Action6<T1, T2, T3, T4, T5, T6> onNext,
43+
Action0 onCompleted) {
44+
this.onNext = onNext;
45+
this.onCompleted = onCompleted;
46+
this.jo1 = jo1;
47+
this.jo2 = jo2;
48+
this.jo3 = jo3;
49+
this.jo4 = jo4;
50+
this.jo5 = jo5;
51+
this.jo6 = jo6;
52+
addJoinObserver(jo1);
53+
addJoinObserver(jo2);
54+
addJoinObserver(jo3);
55+
addJoinObserver(jo4);
56+
addJoinObserver(jo5);
57+
addJoinObserver(jo6);
58+
}
59+
60+
@Override
61+
protected void match() {
62+
if (!jo1.queue().isEmpty()
63+
&& !jo2.queue().isEmpty()
64+
&& !jo3.queue().isEmpty()
65+
&& !jo4.queue().isEmpty()
66+
&& !jo5.queue().isEmpty()
67+
&& !jo6.queue().isEmpty()
68+
) {
69+
Notification<T1> n1 = jo1.queue().peek();
70+
Notification<T2> n2 = jo2.queue().peek();
71+
Notification<T3> n3 = jo3.queue().peek();
72+
Notification<T4> n4 = jo4.queue().peek();
73+
Notification<T5> n5 = jo5.queue().peek();
74+
Notification<T6> n6 = jo6.queue().peek();
75+
76+
if (n1.isOnCompleted()
77+
|| n2.isOnCompleted()
78+
|| n3.isOnCompleted()
79+
|| n4.isOnCompleted()
80+
|| n5.isOnCompleted()
81+
|| n6.isOnCompleted()
82+
) {
83+
onCompleted.call();
84+
} else {
85+
dequeue();
86+
onNext.call(
87+
n1.getValue(),
88+
n2.getValue(),
89+
n3.getValue(),
90+
n4.getValue(),
91+
n5.getValue(),
92+
n6.getValue()
93+
);
94+
}
95+
}
96+
}
97+
98+
}

0 commit comments

Comments
 (0)