Skip to content

Commit 5915fb3

Browse files
committed
Operation: Replay additional overloads
1 parent dcbcf8a commit 5915fb3

File tree

4 files changed

+790
-24
lines changed

4 files changed

+790
-24
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 331 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import rx.operators.OperationOnExceptionResumeNextViaObservable;
7474
import rx.operators.OperationParallel;
7575
import rx.operators.OperationParallelMerge;
76+
import rx.operators.OperationReplay;
7677
import rx.operators.OperationRetry;
7778
import rx.operators.OperationSample;
7879
import rx.operators.OperationScan;
@@ -526,6 +527,24 @@ public <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends R> su
526527
return OperationMulticast.multicast(this, subject);
527528
}
528529

530+
/**
531+
* Returns an observable sequence that contains the elements of a sequence
532+
* produced by multicasting the source sequence within a selector function.
533+
*
534+
* @param subjectFactory the subject factory
535+
* @param selector The selector function which can use the multicasted
536+
* source sequence subject to the policies enforced by the
537+
* created subject.
538+
* @return the Observable sequence that contains the elements of a sequence
539+
* produced by multicasting the source sequence within a selector function.
540+
*
541+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229708.aspx'>MSDN: Observable.Multicast</a>
542+
*/
543+
public <TIntermediate, TResult> Observable<TResult> multicast(
544+
final Func0<? extends Subject<? super T, ? extends TIntermediate>> subjectFactory,
545+
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
546+
return OperationMulticast.multicast(this, subjectFactory, selector);
547+
}
529548
/**
530549
* Allow the {@link RxJavaErrorHandler} to receive the exception from
531550
* onError.
@@ -4277,20 +4296,327 @@ public <R> Observable<List<T>> maxBy(Func1<T, R> selector, Comparator<? super R>
42774296
public ConnectableObservable<T> replay() {
42784297
return OperationMulticast.multicast(this, ReplaySubject.<T> create());
42794298
}
4280-
4299+
42814300
/**
42824301
* Returns a {@link ConnectableObservable} that shares a single subscription
42834302
* to the underlying Observable that will replay all of its items and
4284-
* notifications to any future {@link Observer} on the given scheduler
4303+
* notifications to any future {@link Observer} on the given scheduler.
42854304
*
4286-
* @param scheduler
4287-
* @return
4305+
* @param scheduler the scheduler where the Observers will receive the events
4306+
* @return a {@link ConnectableObservable} that shares a single subscription
4307+
* to the underlying Observable that will replay all of its items and
4308+
* notifications to any future {@link Observer} on the given scheduler
42884309
*
42894310
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211699.aspx'>MSDN: Observable.Replay</a>
42904311
*/
42914312
public ConnectableObservable<T> replay(Scheduler scheduler) {
4292-
return OperationMulticast.multicast(this, ReplaySubject.<T> create());
4313+
return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject.<T>create(), scheduler));
4314+
}
4315+
4316+
/**
4317+
* Returns a connectable observable sequence that shares a single subscription
4318+
* to the underlying sequence replaying bufferSize notifications.
4319+
*
4320+
* @param bufferSize the buffer size
4321+
* @return a connectable observable sequence that shares a single subscription
4322+
* to the underlying sequence replaying bufferSize notifications
4323+
*
4324+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211976.aspx'>MSDN: Observable.Replay</a>
4325+
*/
4326+
public ConnectableObservable<T> replay(int bufferSize) {
4327+
return OperationMulticast.multicast(this, OperationReplay.<T>replayBuffered(bufferSize));
4328+
}
4329+
4330+
/**
4331+
* Returns a connectable observable sequence that shares a single
4332+
* subscription to the underlying sequence replaying bufferSize notifications.
4333+
*
4334+
* @param bufferSize the buffer size
4335+
* @param scheduler the scheduler where the Observers will receive the events
4336+
* @return a connectable observable sequence that shares a single
4337+
* subscription to the underlying sequence replaying bufferSize notifications
4338+
*
4339+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229814.aspx'>MSDN: Observable.Replay</a>
4340+
*/
4341+
public ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
4342+
return OperationMulticast.multicast(this,
4343+
OperationReplay.createScheduledSubject(
4344+
OperationReplay.<T>replayBuffered(bufferSize), scheduler));
4345+
}
4346+
4347+
/**
4348+
* Returns a connectable observable sequence that shares a single
4349+
* subscription to the underlying sequence replaying all notifications within window.
4350+
*
4351+
* @param time the window length
4352+
* @param unit the window length time unit
4353+
* @return a connectable observable sequence that shares a single
4354+
* subscription to the underlying sequence replaying all notifications within window
4355+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229232.aspx'>MSDN: Observable.Replay</a>
4356+
*/
4357+
public ConnectableObservable<T> replay(long time, TimeUnit unit) {
4358+
return replay(time, unit, Schedulers.threadPoolForComputation());
4359+
}
4360+
4361+
/**
4362+
* Returns a connectable observable sequence that shares a single
4363+
* subscription to the underlying sequence replaying all notifications within window.
4364+
*
4365+
* @param time the window length
4366+
* @param unit the window length time unit
4367+
* @param scheduler the scheduler which is used as a time source for the window
4368+
* @return a connectable observable sequence that shares a single
4369+
* subscription to the underlying sequence replaying all notifications within window
4370+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211811.aspx'>MSDN: Observable.Replay</a>
4371+
*/
4372+
public ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
4373+
return OperationMulticast.multicast(this, OperationReplay.<T>replayWindowed(time, unit, -1, scheduler));
4374+
}
4375+
4376+
/**
4377+
* Returns a connectable observable sequence that shares a single
4378+
* subscription to the underlying sequence replaying bufferSize notifications within window.
4379+
*
4380+
* @param bufferSize the buffer size
4381+
* @param time the window length
4382+
* @param unit the window length time unit
4383+
* @return Returns a connectable observable sequence that shares a single
4384+
* subscription to the underlying sequence replaying bufferSize notifications within window
4385+
*
4386+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229874.aspx'>MSDN: Observable.Replay</a>
4387+
*/
4388+
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit) {
4389+
return replay(bufferSize, time, unit, Schedulers.threadPoolForComputation());
4390+
}
4391+
4392+
/**
4393+
* Returns a connectable observable sequence that shares a single
4394+
* subscription to the underlying sequence replaying bufferSize notifications within window.
4395+
*
4396+
* @param bufferSize the buffer size
4397+
* @param time the window length
4398+
* @param unit the window length time unit
4399+
* @param scheduler the scheduler which is used as a time source for the window
4400+
* @return a connectable observable sequence that shares a single
4401+
* subscription to the underlying sequence replaying bufferSize notifications within window
4402+
*
4403+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211759.aspx'>MSDN: Observable.Replay</a>
4404+
*/
4405+
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
4406+
if (bufferSize < 0) {
4407+
throw new IllegalArgumentException("bufferSize < 0");
4408+
}
4409+
return OperationMulticast.multicast(this, OperationReplay.<T>replayWindowed(time, unit, bufferSize, scheduler));
4410+
}
4411+
4412+
/**
4413+
* Returns an observable sequence that is the result of invoking the selector
4414+
* on a connectable observable sequence that shares a single subscription to
4415+
* the underlying sequence and starts with initial value.
4416+
*
4417+
* @param <R> the return element type
4418+
* @param selector The selector function which can use the multicasted
4419+
* this sequence as many times as needed, without causing
4420+
* multiple subscriptions to this sequence.
4421+
* @return an observable sequence that is the result of invoking the selector
4422+
* on a connectable observable sequence that shares a single subscription to
4423+
* the underlying sequence and starts with initial value
4424+
*
4425+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229653.aspx'>MSDN: Observable.Replay</a>
4426+
*/
4427+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
4428+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4429+
@Override
4430+
public Subject<T, T> call() {
4431+
return ReplaySubject.create();
4432+
}
4433+
}, selector);
4434+
}
4435+
4436+
/**
4437+
* Returns an observable sequence that is the result of invoking the
4438+
* selector on a connectable observable sequence that shares a single
4439+
* subscription to the underlying sequence replaying all notifications.
4440+
*
4441+
* @param <R> the return element type
4442+
* @param selector The selector function which can use the multicasted
4443+
* this sequence as many times as needed, without causing
4444+
* multiple subscriptions to this sequence.
4445+
* @param scheduler the scheduler where the replay is observed
4446+
* @return an observable sequence that is the result of invoking the
4447+
* selector on a connectable observable sequence that shares a single
4448+
* subscription to the underlying sequence replaying all notifications
4449+
*
4450+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211644.aspx'>MSDN: Observable.Replay</a>
4451+
*/
4452+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final Scheduler scheduler) {
4453+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4454+
@Override
4455+
public Subject<T, T> call() {
4456+
return OperationReplay.createScheduledSubject(ReplaySubject.<T>create(), scheduler);
4457+
}
4458+
}, selector);
4459+
}
4460+
4461+
/**
4462+
* Returns an observable sequence that is the result of invoking the
4463+
* selector on a connectable observable sequence that shares a single
4464+
* subscription to the underlying sequence replaying bufferSize notifications.
4465+
*
4466+
* @param <R> the return element type
4467+
* @param selector The selector function which can use the multicasted
4468+
* this sequence as many times as needed, without causing
4469+
* multiple subscriptions to this sequence.
4470+
* @param bufferSize the buffer size
4471+
* @return an observable sequence that is the result of invoking the
4472+
* selector on a connectable observable sequence that shares a single
4473+
* subscription to the underlying sequence replaying bufferSize notifications
4474+
*
4475+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211675.aspx'>MSDN: Observable.Replay</a>
4476+
*/
4477+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize) {
4478+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4479+
@Override
4480+
public Subject<T, T> call() {
4481+
return OperationReplay.replayBuffered(bufferSize);
4482+
}
4483+
}, selector);
4484+
}
4485+
4486+
/**
4487+
* Returns an observable sequence that is the result of invoking the
4488+
* selector on a connectable observable sequence that shares a single
4489+
* subscription to the underlying sequence replaying bufferSize notifications.
4490+
*
4491+
* @param <R> the return element type
4492+
* @param selector The selector function which can use the multicasted
4493+
* this sequence as many times as needed, without causing
4494+
* multiple subscriptions to this sequence.
4495+
* @param bufferSize the buffer size
4496+
* @param scheduler the scheduler where the replay is observed
4497+
* @return an observable sequence that is the result of invoking the
4498+
* selector on a connectable observable sequence that shares a single
4499+
* subscription to the underlying sequence replaying bufferSize notifications
4500+
*
4501+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229928.aspx'>MSDN: Observable.Replay</a>
4502+
*/
4503+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final Scheduler scheduler) {
4504+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4505+
@Override
4506+
public Subject<T, T> call() {
4507+
return OperationReplay.<T>createScheduledSubject(OperationReplay.<T>replayBuffered(bufferSize), scheduler);
4508+
}
4509+
}, selector);
4510+
}
4511+
4512+
/**
4513+
* Returns an observable sequence that is the result of invoking
4514+
* the selector on a connectable observable sequence that shares a single
4515+
* subscription to the underlying sequence replaying all notifications within window.
4516+
*
4517+
* @param <R> the return element type
4518+
* @param selector The selector function which can use the multicasted
4519+
* this sequence as many times as needed, without causing
4520+
* multiple subscriptions to this sequence.
4521+
* @param time the window length
4522+
* @param unit the window length time unit
4523+
* @return an observable sequence that is the result of invoking
4524+
* the selector on a connectable observable sequence that shares a single
4525+
* subscription to the underlying sequence replaying all notifications within window
4526+
*
4527+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229526.aspx'>MSDN: Observable.Replay</a>
4528+
*/
4529+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, long time, TimeUnit unit) {
4530+
return replay(selector, time, unit, Schedulers.threadPoolForComputation());
42934531
}
4532+
4533+
/**
4534+
* Returns an observable sequence that is the result of invoking the
4535+
* selector on a connectable observable sequence that shares a single
4536+
* subscription to the underlying sequence replaying all notifications within window.
4537+
*
4538+
* @param <R> the return element type
4539+
* @param selector The selector function which can use the multicasted
4540+
* this sequence as many times as needed, without causing
4541+
* multiple subscriptions to this sequence.
4542+
* @param time the window length
4543+
* @param unit the window length time unit
4544+
* @param scheduler the scheduler which is used as a time source for the window
4545+
* @return an observable sequence that is the result of invoking the
4546+
* selector on a connectable observable sequence that shares a single
4547+
* subscription to the underlying sequence replaying all notifications within window
4548+
*
4549+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh244327.aspx'>MSDN: Observable.Replay</a>
4550+
*/
4551+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler) {
4552+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4553+
@Override
4554+
public Subject<T, T> call() {
4555+
return OperationReplay.replayWindowed(time, unit, -1, scheduler);
4556+
}
4557+
}, selector);
4558+
}
4559+
4560+
/**
4561+
* Returns an observable sequence that is the result of invoking the
4562+
* selector on a connectable observable sequence that shares a single
4563+
* subscription to the underlying sequence replaying bufferSize notifications
4564+
* within window.
4565+
*
4566+
* @param <R> the return element type
4567+
* @param selector The selector function which can use the multicasted
4568+
* this sequence as many times as needed, without causing
4569+
* multiple subscriptions to this sequence.
4570+
* @param bufferSize the buffer size
4571+
* @param time the window length
4572+
* @param unit the window length time unit
4573+
*
4574+
* @return an observable sequence that is the result of invoking the
4575+
* selector on a connectable observable sequence that shares a single
4576+
* subscription to the underlying sequence replaying bufferSize notifications
4577+
* within window
4578+
*
4579+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh228952.aspx'>MSDN: Observable.Replay</a>
4580+
*/
4581+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, int bufferSize, long time, TimeUnit unit) {
4582+
return replay(selector, bufferSize, time, unit, Schedulers.threadPoolForComputation());
4583+
}
4584+
4585+
4586+
/**
4587+
* Returns an observable sequence that is the result of invoking the
4588+
* selector on a connectable observable sequence that shares a single
4589+
* subscription to the underlying sequence replaying bufferSize notifications
4590+
* within window.
4591+
*
4592+
* @param <R> the return element type
4593+
* @param selector The selector function which can use the multicasted
4594+
* this sequence as many times as needed, without causing
4595+
* multiple subscriptions to this sequence.
4596+
* @param bufferSize the buffer size
4597+
* @param time the window length
4598+
* @param unit the window length time unit
4599+
* @param scheduler the scheduler which is used as a time source for the window
4600+
*
4601+
* @return an observable sequence that is the result of invoking the
4602+
* selector on a connectable observable sequence that shares a single
4603+
* subscription to the underlying sequence replaying bufferSize notifications
4604+
* within window
4605+
*
4606+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229404.aspx'>MSDN: Observable.Replay</a>
4607+
*/
4608+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
4609+
if (bufferSize < 0) {
4610+
throw new IllegalArgumentException("bufferSize < 0");
4611+
}
4612+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4613+
@Override
4614+
public Subject<T, T> call() {
4615+
return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler);
4616+
}
4617+
}, selector);
4618+
}
4619+
42944620
/**
42954621
* Retry subscription to the source Observable when it calls
42964622
* <code>onError</code> up to a certain number of retries.

0 commit comments

Comments
 (0)