Skip to content

Commit e4bd5dd

Browse files
Merge pull request #612 from akarnokd/ReplayAdditionalOps
Replay additional overloads
2 parents 034cf47 + 5915fb3 commit e4bd5dd

File tree

4 files changed

+1591
-0
lines changed

4 files changed

+1591
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 339 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import rx.operators.OperationOnExceptionResumeNextViaObservable;
7474
import rx.operators.OperationParallel;
7575
import rx.operators.OperationParallelMerge;
76+
import rx.operators.OperationReplay;
7677
import rx.operators.OperationRetry;
7778
import rx.operators.OperationSample;
7879
import rx.operators.OperationScan;
@@ -526,6 +527,24 @@ public <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends R> su
526527
return OperationMulticast.multicast(this, subject);
527528
}
528529

530+
/**
531+
* Returns an observable sequence that contains the elements of a sequence
532+
* produced by multicasting the source sequence within a selector function.
533+
*
534+
* @param subjectFactory the subject factory
535+
* @param selector The selector function which can use the multicasted
536+
* source sequence subject to the policies enforced by the
537+
* created subject.
538+
* @return the Observable sequence that contains the elements of a sequence
539+
* produced by multicasting the source sequence within a selector function.
540+
*
541+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229708.aspx'>MSDN: Observable.Multicast</a>
542+
*/
543+
public <TIntermediate, TResult> Observable<TResult> multicast(
544+
final Func0<? extends Subject<? super T, ? extends TIntermediate>> subjectFactory,
545+
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
546+
return OperationMulticast.multicast(this, subjectFactory, selector);
547+
}
529548
/**
530549
* Allow the {@link RxJavaErrorHandler} to receive the exception from
531550
* onError.
@@ -4308,7 +4327,327 @@ public <R> Observable<List<T>> maxBy(Func1<T, R> selector, Comparator<? super R>
43084327
public ConnectableObservable<T> replay() {
43094328
return OperationMulticast.multicast(this, ReplaySubject.<T> create());
43104329
}
4330+
4331+
/**
4332+
* Returns a {@link ConnectableObservable} that shares a single subscription
4333+
* to the underlying Observable that will replay all of its items and
4334+
* notifications to any future {@link Observer} on the given scheduler.
4335+
*
4336+
* @param scheduler the scheduler where the Observers will receive the events
4337+
* @return a {@link ConnectableObservable} that shares a single subscription
4338+
* to the underlying Observable that will replay all of its items and
4339+
* notifications to any future {@link Observer} on the given scheduler
4340+
*
4341+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211699.aspx'>MSDN: Observable.Replay</a>
4342+
*/
4343+
public ConnectableObservable<T> replay(Scheduler scheduler) {
4344+
return OperationMulticast.multicast(this, OperationReplay.createScheduledSubject(ReplaySubject.<T>create(), scheduler));
4345+
}
4346+
4347+
/**
4348+
* Returns a connectable observable sequence that shares a single subscription
4349+
* to the underlying sequence replaying bufferSize notifications.
4350+
*
4351+
* @param bufferSize the buffer size
4352+
* @return a connectable observable sequence that shares a single subscription
4353+
* to the underlying sequence replaying bufferSize notifications
4354+
*
4355+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211976.aspx'>MSDN: Observable.Replay</a>
4356+
*/
4357+
public ConnectableObservable<T> replay(int bufferSize) {
4358+
return OperationMulticast.multicast(this, OperationReplay.<T>replayBuffered(bufferSize));
4359+
}
4360+
4361+
/**
4362+
* Returns a connectable observable sequence that shares a single
4363+
* subscription to the underlying sequence replaying bufferSize notifications.
4364+
*
4365+
* @param bufferSize the buffer size
4366+
* @param scheduler the scheduler where the Observers will receive the events
4367+
* @return a connectable observable sequence that shares a single
4368+
* subscription to the underlying sequence replaying bufferSize notifications
4369+
*
4370+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229814.aspx'>MSDN: Observable.Replay</a>
4371+
*/
4372+
public ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
4373+
return OperationMulticast.multicast(this,
4374+
OperationReplay.createScheduledSubject(
4375+
OperationReplay.<T>replayBuffered(bufferSize), scheduler));
4376+
}
4377+
4378+
/**
4379+
* Returns a connectable observable sequence that shares a single
4380+
* subscription to the underlying sequence replaying all notifications within window.
4381+
*
4382+
* @param time the window length
4383+
* @param unit the window length time unit
4384+
* @return a connectable observable sequence that shares a single
4385+
* subscription to the underlying sequence replaying all notifications within window
4386+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229232.aspx'>MSDN: Observable.Replay</a>
4387+
*/
4388+
public ConnectableObservable<T> replay(long time, TimeUnit unit) {
4389+
return replay(time, unit, Schedulers.threadPoolForComputation());
4390+
}
4391+
4392+
/**
4393+
* Returns a connectable observable sequence that shares a single
4394+
* subscription to the underlying sequence replaying all notifications within window.
4395+
*
4396+
* @param time the window length
4397+
* @param unit the window length time unit
4398+
* @param scheduler the scheduler which is used as a time source for the window
4399+
* @return a connectable observable sequence that shares a single
4400+
* subscription to the underlying sequence replaying all notifications within window
4401+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211811.aspx'>MSDN: Observable.Replay</a>
4402+
*/
4403+
public ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
4404+
return OperationMulticast.multicast(this, OperationReplay.<T>replayWindowed(time, unit, -1, scheduler));
4405+
}
4406+
4407+
/**
4408+
* Returns a connectable observable sequence that shares a single
4409+
* subscription to the underlying sequence replaying bufferSize notifications within window.
4410+
*
4411+
* @param bufferSize the buffer size
4412+
* @param time the window length
4413+
* @param unit the window length time unit
4414+
* @return Returns a connectable observable sequence that shares a single
4415+
* subscription to the underlying sequence replaying bufferSize notifications within window
4416+
*
4417+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229874.aspx'>MSDN: Observable.Replay</a>
4418+
*/
4419+
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit) {
4420+
return replay(bufferSize, time, unit, Schedulers.threadPoolForComputation());
4421+
}
4422+
4423+
/**
4424+
* Returns a connectable observable sequence that shares a single
4425+
* subscription to the underlying sequence replaying bufferSize notifications within window.
4426+
*
4427+
* @param bufferSize the buffer size
4428+
* @param time the window length
4429+
* @param unit the window length time unit
4430+
* @param scheduler the scheduler which is used as a time source for the window
4431+
* @return a connectable observable sequence that shares a single
4432+
* subscription to the underlying sequence replaying bufferSize notifications within window
4433+
*
4434+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211759.aspx'>MSDN: Observable.Replay</a>
4435+
*/
4436+
public ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler) {
4437+
if (bufferSize < 0) {
4438+
throw new IllegalArgumentException("bufferSize < 0");
4439+
}
4440+
return OperationMulticast.multicast(this, OperationReplay.<T>replayWindowed(time, unit, bufferSize, scheduler));
4441+
}
4442+
4443+
/**
4444+
* Returns an observable sequence that is the result of invoking the selector
4445+
* on a connectable observable sequence that shares a single subscription to
4446+
* the underlying sequence and starts with initial value.
4447+
*
4448+
* @param <R> the return element type
4449+
* @param selector The selector function which can use the multicasted
4450+
* this sequence as many times as needed, without causing
4451+
* multiple subscriptions to this sequence.
4452+
* @return an observable sequence that is the result of invoking the selector
4453+
* on a connectable observable sequence that shares a single subscription to
4454+
* the underlying sequence and starts with initial value
4455+
*
4456+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229653.aspx'>MSDN: Observable.Replay</a>
4457+
*/
4458+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
4459+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4460+
@Override
4461+
public Subject<T, T> call() {
4462+
return ReplaySubject.create();
4463+
}
4464+
}, selector);
4465+
}
43114466

4467+
/**
4468+
* Returns an observable sequence that is the result of invoking the
4469+
* selector on a connectable observable sequence that shares a single
4470+
* subscription to the underlying sequence replaying all notifications.
4471+
*
4472+
* @param <R> the return element type
4473+
* @param selector The selector function which can use the multicasted
4474+
* this sequence as many times as needed, without causing
4475+
* multiple subscriptions to this sequence.
4476+
* @param scheduler the scheduler where the replay is observed
4477+
* @return an observable sequence that is the result of invoking the
4478+
* selector on a connectable observable sequence that shares a single
4479+
* subscription to the underlying sequence replaying all notifications
4480+
*
4481+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211644.aspx'>MSDN: Observable.Replay</a>
4482+
*/
4483+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final Scheduler scheduler) {
4484+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4485+
@Override
4486+
public Subject<T, T> call() {
4487+
return OperationReplay.createScheduledSubject(ReplaySubject.<T>create(), scheduler);
4488+
}
4489+
}, selector);
4490+
}
4491+
4492+
/**
4493+
* Returns an observable sequence that is the result of invoking the
4494+
* selector on a connectable observable sequence that shares a single
4495+
* subscription to the underlying sequence replaying bufferSize notifications.
4496+
*
4497+
* @param <R> the return element type
4498+
* @param selector The selector function which can use the multicasted
4499+
* this sequence as many times as needed, without causing
4500+
* multiple subscriptions to this sequence.
4501+
* @param bufferSize the buffer size
4502+
* @return an observable sequence that is the result of invoking the
4503+
* selector on a connectable observable sequence that shares a single
4504+
* subscription to the underlying sequence replaying bufferSize notifications
4505+
*
4506+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211675.aspx'>MSDN: Observable.Replay</a>
4507+
*/
4508+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize) {
4509+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4510+
@Override
4511+
public Subject<T, T> call() {
4512+
return OperationReplay.replayBuffered(bufferSize);
4513+
}
4514+
}, selector);
4515+
}
4516+
4517+
/**
4518+
* Returns an observable sequence that is the result of invoking the
4519+
* selector on a connectable observable sequence that shares a single
4520+
* subscription to the underlying sequence replaying bufferSize notifications.
4521+
*
4522+
* @param <R> the return element type
4523+
* @param selector The selector function which can use the multicasted
4524+
* this sequence as many times as needed, without causing
4525+
* multiple subscriptions to this sequence.
4526+
* @param bufferSize the buffer size
4527+
* @param scheduler the scheduler where the replay is observed
4528+
* @return an observable sequence that is the result of invoking the
4529+
* selector on a connectable observable sequence that shares a single
4530+
* subscription to the underlying sequence replaying bufferSize notifications
4531+
*
4532+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229928.aspx'>MSDN: Observable.Replay</a>
4533+
*/
4534+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final Scheduler scheduler) {
4535+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4536+
@Override
4537+
public Subject<T, T> call() {
4538+
return OperationReplay.<T>createScheduledSubject(OperationReplay.<T>replayBuffered(bufferSize), scheduler);
4539+
}
4540+
}, selector);
4541+
}
4542+
4543+
/**
4544+
* Returns an observable sequence that is the result of invoking
4545+
* the selector on a connectable observable sequence that shares a single
4546+
* subscription to the underlying sequence replaying all notifications within window.
4547+
*
4548+
* @param <R> the return element type
4549+
* @param selector The selector function which can use the multicasted
4550+
* this sequence as many times as needed, without causing
4551+
* multiple subscriptions to this sequence.
4552+
* @param time the window length
4553+
* @param unit the window length time unit
4554+
* @return an observable sequence that is the result of invoking
4555+
* the selector on a connectable observable sequence that shares a single
4556+
* subscription to the underlying sequence replaying all notifications within window
4557+
*
4558+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229526.aspx'>MSDN: Observable.Replay</a>
4559+
*/
4560+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, long time, TimeUnit unit) {
4561+
return replay(selector, time, unit, Schedulers.threadPoolForComputation());
4562+
}
4563+
4564+
/**
4565+
* Returns an observable sequence that is the result of invoking the
4566+
* selector on a connectable observable sequence that shares a single
4567+
* subscription to the underlying sequence replaying all notifications within window.
4568+
*
4569+
* @param <R> the return element type
4570+
* @param selector The selector function which can use the multicasted
4571+
* this sequence as many times as needed, without causing
4572+
* multiple subscriptions to this sequence.
4573+
* @param time the window length
4574+
* @param unit the window length time unit
4575+
* @param scheduler the scheduler which is used as a time source for the window
4576+
* @return an observable sequence that is the result of invoking the
4577+
* selector on a connectable observable sequence that shares a single
4578+
* subscription to the underlying sequence replaying all notifications within window
4579+
*
4580+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh244327.aspx'>MSDN: Observable.Replay</a>
4581+
*/
4582+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler) {
4583+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4584+
@Override
4585+
public Subject<T, T> call() {
4586+
return OperationReplay.replayWindowed(time, unit, -1, scheduler);
4587+
}
4588+
}, selector);
4589+
}
4590+
4591+
/**
4592+
* Returns an observable sequence that is the result of invoking the
4593+
* selector on a connectable observable sequence that shares a single
4594+
* subscription to the underlying sequence replaying bufferSize notifications
4595+
* within window.
4596+
*
4597+
* @param <R> the return element type
4598+
* @param selector The selector function which can use the multicasted
4599+
* this sequence as many times as needed, without causing
4600+
* multiple subscriptions to this sequence.
4601+
* @param bufferSize the buffer size
4602+
* @param time the window length
4603+
* @param unit the window length time unit
4604+
*
4605+
* @return an observable sequence that is the result of invoking the
4606+
* selector on a connectable observable sequence that shares a single
4607+
* subscription to the underlying sequence replaying bufferSize notifications
4608+
* within window
4609+
*
4610+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh228952.aspx'>MSDN: Observable.Replay</a>
4611+
*/
4612+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, int bufferSize, long time, TimeUnit unit) {
4613+
return replay(selector, bufferSize, time, unit, Schedulers.threadPoolForComputation());
4614+
}
4615+
4616+
4617+
/**
4618+
* Returns an observable sequence that is the result of invoking the
4619+
* selector on a connectable observable sequence that shares a single
4620+
* subscription to the underlying sequence replaying bufferSize notifications
4621+
* within window.
4622+
*
4623+
* @param <R> the return element type
4624+
* @param selector The selector function which can use the multicasted
4625+
* this sequence as many times as needed, without causing
4626+
* multiple subscriptions to this sequence.
4627+
* @param bufferSize the buffer size
4628+
* @param time the window length
4629+
* @param unit the window length time unit
4630+
* @param scheduler the scheduler which is used as a time source for the window
4631+
*
4632+
* @return an observable sequence that is the result of invoking the
4633+
* selector on a connectable observable sequence that shares a single
4634+
* subscription to the underlying sequence replaying bufferSize notifications
4635+
* within window
4636+
*
4637+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229404.aspx'>MSDN: Observable.Replay</a>
4638+
*/
4639+
public <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Observable<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
4640+
if (bufferSize < 0) {
4641+
throw new IllegalArgumentException("bufferSize < 0");
4642+
}
4643+
return OperationMulticast.multicast(this, new Func0<Subject<T, T>>() {
4644+
@Override
4645+
public Subject<T, T> call() {
4646+
return OperationReplay.replayWindowed(time, unit, bufferSize, scheduler);
4647+
}
4648+
}, selector);
4649+
}
4650+
43124651
/**
43134652
* Retry subscription to the source Observable when it calls
43144653
* <code>onError</code> up to a certain number of retries.

0 commit comments

Comments
 (0)