|
21 | 21 | import static rx.operators.OperationMerge.*;
|
22 | 22 |
|
23 | 23 | import java.util.ArrayList;
|
| 24 | +import java.util.Arrays; |
| 25 | +import java.util.Iterator; |
24 | 26 | import java.util.List;
|
25 | 27 | import java.util.concurrent.CountDownLatch;
|
26 | 28 | import java.util.concurrent.TimeUnit;
|
|
35 | 37 | import rx.Observable;
|
36 | 38 | import rx.Observer;
|
37 | 39 | import rx.Subscription;
|
| 40 | +import rx.schedulers.Schedulers; |
38 | 41 | import rx.subscriptions.Subscriptions;
|
39 | 42 | import rx.util.functions.Action0;
|
40 | 43 | import rx.util.functions.Action1;
|
@@ -465,4 +468,85 @@ public void unsubscribe() {
|
465 | 468 | };
|
466 | 469 | }
|
467 | 470 | }
|
| 471 | + |
| 472 | + @Test |
| 473 | + public void testWhenMaxConcurrentIsOne() { |
| 474 | + for (int i = 0; i < 100; i++) { |
| 475 | + List<Observable<String>> os = new ArrayList<Observable<String>>(); |
| 476 | + os.add(Observable.from("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread())); |
| 477 | + os.add(Observable.from("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread())); |
| 478 | + os.add(Observable.from("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread())); |
| 479 | + |
| 480 | + List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five"); |
| 481 | + Iterator<String> iter = Observable.merge(os, 1).toBlockingObservable().toIterable().iterator(); |
| 482 | + List<String> actual = new ArrayList<String>(); |
| 483 | + while(iter.hasNext()) { |
| 484 | + actual.add(iter.next()); |
| 485 | + } |
| 486 | + assertEquals(expected, actual); |
| 487 | + } |
| 488 | + } |
| 489 | + |
| 490 | + @Test |
| 491 | + public void testMaxConcurrent() { |
| 492 | + for (int times = 0; times < 100; times++) { |
| 493 | + int observableCount = 100; |
| 494 | + // Test maxConcurrent from 2 to 12 |
| 495 | + int maxConcurrent = 2 + (times % 10); |
| 496 | + AtomicInteger subscriptionCount = new AtomicInteger(0); |
| 497 | + |
| 498 | + List<Observable<String>> os = new ArrayList<Observable<String>>(); |
| 499 | + List<SubscriptionCheckObservable> scos = new ArrayList<SubscriptionCheckObservable>(); |
| 500 | + for (int i = 0; i < observableCount; i++) { |
| 501 | + SubscriptionCheckObservable sco = new SubscriptionCheckObservable( |
| 502 | + subscriptionCount, maxConcurrent); |
| 503 | + scos.add(sco); |
| 504 | + os.add(Observable.create(sco).subscribeOn( |
| 505 | + Schedulers.threadPoolForComputation())); |
| 506 | + } |
| 507 | + |
| 508 | + Iterator<String> iter = Observable.merge(os, maxConcurrent) |
| 509 | + .toBlockingObservable().toIterable().iterator(); |
| 510 | + List<String> actual = new ArrayList<String>(); |
| 511 | + while (iter.hasNext()) { |
| 512 | + actual.add(iter.next()); |
| 513 | + } |
| 514 | + assertEquals(5 * observableCount, actual.size()); |
| 515 | + for (SubscriptionCheckObservable sco : scos) { |
| 516 | + assertFalse(sco.failed); |
| 517 | + } |
| 518 | + } |
| 519 | + } |
| 520 | + |
| 521 | + private static class SubscriptionCheckObservable implements |
| 522 | + Observable.OnSubscribeFunc<String> { |
| 523 | + |
| 524 | + private final AtomicInteger subscriptionCount; |
| 525 | + private final int maxConcurrent; |
| 526 | + volatile boolean failed = false; |
| 527 | + |
| 528 | + SubscriptionCheckObservable(AtomicInteger subscriptionCount, |
| 529 | + int maxConcurrent) { |
| 530 | + this.subscriptionCount = subscriptionCount; |
| 531 | + this.maxConcurrent = maxConcurrent; |
| 532 | + } |
| 533 | + |
| 534 | + @Override |
| 535 | + public Subscription onSubscribe(Observer<? super String> t1) { |
| 536 | + if (subscriptionCount.incrementAndGet() > maxConcurrent) { |
| 537 | + failed = true; |
| 538 | + } |
| 539 | + t1.onNext("one"); |
| 540 | + t1.onNext("two"); |
| 541 | + t1.onNext("three"); |
| 542 | + t1.onNext("four"); |
| 543 | + t1.onNext("five"); |
| 544 | + // We could not decrement subscriptionCount in the unsubscribe method |
| 545 | + // as "unsubscribe" is not guaranteed to be called before the next "subscribe". |
| 546 | + subscriptionCount.decrementAndGet(); |
| 547 | + t1.onCompleted(); |
| 548 | + return Subscriptions.empty(); |
| 549 | + } |
| 550 | + |
| 551 | + } |
468 | 552 | }
|
0 commit comments