1313 * See the License for the specific language governing permissions and
1414 * limitations under the License.
1515 */
16-
1716package rx .internal .operators ;
1817
1918import java .util .Random ;
19+
2020import static org .junit .Assert .assertEquals ;
2121import static org .junit .Assert .assertTrue ;
2222import static org .junit .Assert .fail ;
2323
2424import java .util .concurrent .CountDownLatch ;
25+ import java .util .concurrent .Executors ;
2526import java .util .concurrent .TimeUnit ;
2627import java .util .concurrent .atomic .AtomicInteger ;
2728
29+ import org .junit .Ignore ;
2830import org .junit .Test ;
2931
3032import rx .Observable ;
3133import rx .Observable .OnSubscribe ;
3234import rx .Observer ;
35+ import rx .Scheduler ;
3336import rx .Subscriber ;
3437import rx .functions .Func1 ;
3538import rx .observables .GroupedObservable ;
3841
3942public class OperatorPivotTest {
4043
41- @ Test
44+ @ Test ( timeout = 10000 )
4245 public void testPivotEvenAndOdd () throws InterruptedException {
43- Observable <GroupedObservable <Boolean , Integer >> o1 = Observable .range (1 , 10 ).groupBy (modKeySelector ).subscribeOn (Schedulers .newThread ());
44- Observable <GroupedObservable <Boolean , Integer >> o2 = Observable .range (11 , 10 ).groupBy (modKeySelector ).subscribeOn (Schedulers .newThread ());
46+ for (int i =0 ; i <1000 ; i ++) {
47+ System .out .println ("------------------------------------------ testPivotEvenAndOdd -------------------------------------------" );
48+ Observable <GroupedObservable <Boolean , Integer >> o1 = Observable .range (1 , 10 ).groupBy (modKeySelector ).subscribeOn (Schedulers .computation ());
49+ Observable <GroupedObservable <Boolean , Integer >> o2 = Observable .range (11 , 10 ).groupBy (modKeySelector ).subscribeOn (Schedulers .computation ());
4550 Observable <GroupedObservable <String , GroupedObservable <Boolean , Integer >>> groups = Observable .from (GroupedObservable .from ("o1" , o1 ), GroupedObservable .from ("o2" , o2 ));
4651 Observable <GroupedObservable <Boolean , GroupedObservable <String , Integer >>> pivoted = Observable .pivot (groups );
4752
@@ -53,10 +58,12 @@ public void testPivotEvenAndOdd() throws InterruptedException {
5358
5459 @ Override
5560 public Observable <String > call (final GroupedObservable <Boolean , GroupedObservable <String , Integer >> outerGroup ) {
61+ System .out .println ("Outer Group: " + outerGroup .getKey ());
5662 return outerGroup .flatMap (new Func1 <GroupedObservable <String , Integer >, Observable <String >>() {
5763
5864 @ Override
5965 public Observable <String > call (final GroupedObservable <String , Integer > innerGroup ) {
66+ System .out .println ("Inner Group: " + innerGroup .getKey ());
6067 return innerGroup .map (new Func1 <Integer , String >() {
6168
6269 @ Override
@@ -94,14 +101,16 @@ public void onNext(String t) {
94101
95102 });
96103
97- if (!latch .await (800 , TimeUnit .MILLISECONDS )) {
104+ if (!latch .await (20000000 , TimeUnit .MILLISECONDS )) {
98105 System .out .println ("xxxxxxxxxxxxxxxxxx> TIMED OUT <xxxxxxxxxxxxxxxxxxxx" );
99106 System .out .println ("Received count: " + count .get ());
100107 fail ("Timed Out" );
101108 }
102109
103110 System .out .println ("Received count: " + count .get ());
111+ // TODO sometimes this test fails and gets 15 instead of 20 so there is a bug somewhere
104112 assertEquals (20 , count .get ());
113+ }
105114 }
106115
107116 /**
@@ -112,7 +121,7 @@ public void onNext(String t) {
112121 * It's NOT easy to understand though, and easy to end up with far more data consumed than expected, because pivot by definition
113122 * is inverting the data so we can not unsubscribe from the parent until all children are done since the top key becomes the leaf once pivoted.
114123 */
115- @ Test
124+ @ Test ( timeout = 10000 )
116125 public void testUnsubscribeFromGroups () throws InterruptedException {
117126 AtomicInteger counter1 = new AtomicInteger ();
118127 AtomicInteger counter2 = new AtomicInteger ();
@@ -221,7 +230,7 @@ public String call(Integer i) {
221230 *
222231 * Then a subsequent step can merge them if desired and add serialization, such as merge(even.o1, even.o2) to become a serialized "even"
223232 */
224- @ Test
233+ @ Test ( timeout = 10000 )
225234 public void testConcurrencyAndSerialization () throws InterruptedException {
226235 final AtomicInteger maxOuterConcurrency = new AtomicInteger ();
227236 final AtomicInteger maxGroupConcurrency = new AtomicInteger ();
0 commit comments