Skip to content

Commit d44f059

Browse files
committed
Add cache(int capacity) to Observable
1 parent 179db94 commit d44f059

File tree

3 files changed

+65
-0
lines changed

3 files changed

+65
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3425,6 +3425,19 @@ public final Observable<T> cache() {
34253425
return create(new OnSubscribeCache<T>(this));
34263426
}
34273427

3428+
/**
3429+
* {@code cache} with initial capacity.
3430+
*
3431+
* @param capacity
3432+
* initial cache size
3433+
* @return an Observable that, when first subscribed to, caches all of its items and notifications for the
3434+
* benefit of subsequent subscribers
3435+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#cache">RxJava Wiki: cache()</a>
3436+
*/
3437+
public final Observable<T> cache(int capacity) {
3438+
return create(new OnSubscribeCache<T>(this, capacity));
3439+
}
3440+
34283441
/**
34293442
* Returns an Observable that emits the items emitted by the source Observable, converted to the specified
34303443
* type.

rxjava-core/src/main/java/rx/internal/operators/OnSubscribeCache.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ public OnSubscribeCache(Observable<? extends T> source) {
5252
this(source, ReplaySubject.<T> create());
5353
}
5454

55+
public OnSubscribeCache(Observable<? extends T> source, int capacity) {
56+
this(source, ReplaySubject.<T> create(capacity));
57+
}
58+
5559
/* accessible to tests */OnSubscribeCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
5660
this.source = source;
5761
this.cache = cache;

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,54 @@ public void call(String v) {
694694
assertEquals(1, counter.get());
695695
}
696696

697+
@Test
698+
public void testCacheWithCapacity() throws InterruptedException {
699+
final AtomicInteger counter = new AtomicInteger();
700+
Observable<String> o = Observable.create(new OnSubscribe<String>() {
701+
702+
@Override
703+
public void call(final Subscriber<? super String> observer) {
704+
new Thread(new Runnable() {
705+
706+
@Override
707+
public void run() {
708+
counter.incrementAndGet();
709+
observer.onNext("one");
710+
observer.onCompleted();
711+
}
712+
}).start();
713+
}
714+
}).cache(1);
715+
716+
// we then expect the following 2 subscriptions to get that same value
717+
final CountDownLatch latch = new CountDownLatch(2);
718+
719+
// subscribe once
720+
o.subscribe(new Action1<String>() {
721+
722+
@Override
723+
public void call(String v) {
724+
assertEquals("one", v);
725+
latch.countDown();
726+
}
727+
});
728+
729+
// subscribe again
730+
o.subscribe(new Action1<String>() {
731+
732+
@Override
733+
public void call(String v) {
734+
assertEquals("one", v);
735+
latch.countDown();
736+
}
737+
});
738+
739+
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
740+
fail("subscriptions did not receive values");
741+
}
742+
assertEquals(1, counter.get());
743+
}
744+
697745
/**
698746
* https://github.com/Netflix/RxJava/issues/198
699747
*

0 commit comments

Comments
 (0)