Skip to content

Commit eab66ba

Browse files
Merge pull request #1223 from akarnokd/BoundedReplaySubject520
ReplaySubject enhancement with time and/or size bounds
2 parents 3528e6a + c7ce158 commit eab66ba

File tree

9 files changed

+1178
-896
lines changed

9 files changed

+1178
-896
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4821,7 +4821,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
48214821
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
48224822
@Override
48234823
public final Subject<T, T> call() {
4824-
return OperatorReplay.replayBuffered(bufferSize);
4824+
return ReplaySubject.<T>createWithSize(bufferSize);
48254825
}
48264826
}, selector));
48274827
}
@@ -4889,7 +4889,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
48894889
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
48904890
@Override
48914891
public final Subject<T, T> call() {
4892-
return OperatorReplay.replayWindowed(time, unit, bufferSize, scheduler);
4892+
return ReplaySubject.<T>createWithTimeAndSize(time, unit, bufferSize, scheduler);
48934893
}
48944894
}, selector));
48954895
}
@@ -4920,7 +4920,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
49204920
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
49214921
@Override
49224922
public final Subject<T, T> call() {
4923-
return OperatorReplay.<T> createScheduledSubject(OperatorReplay.<T> replayBuffered(bufferSize), scheduler);
4923+
return OperatorReplay.<T> createScheduledSubject(ReplaySubject.<T>createWithSize(bufferSize), scheduler);
49244924
}
49254925
}, selector));
49264926
}
@@ -4979,7 +4979,7 @@ public final <R> Observable<R> replay(Func1<? super Observable<T>, ? extends Obs
49794979
return create(new OperatorMulticastSelector<T, T, R>(this, new Func0<Subject<T, T>>() {
49804980
@Override
49814981
public final Subject<T, T> call() {
4982-
return OperatorReplay.replayWindowed(time, unit, -1, scheduler);
4982+
return ReplaySubject.<T>createWithTime(time, unit, scheduler);
49834983
}
49844984
}, selector));
49854985
}
@@ -5028,7 +5028,7 @@ public final Subject<T, T> call() {
50285028
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211976.aspx">MSDN: Observable.Replay</a>
50295029
*/
50305030
public final ConnectableObservable<T> replay(int bufferSize) {
5031-
return new OperatorMulticast<T, T>(this, OperatorReplay.<T> replayBuffered(bufferSize));
5031+
return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithSize(bufferSize));
50325032
}
50335033

50345034
/**
@@ -5081,7 +5081,7 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
50815081
if (bufferSize < 0) {
50825082
throw new IllegalArgumentException("bufferSize < 0");
50835083
}
5084-
return new OperatorMulticast<T, T>(this, OperatorReplay.<T> replayWindowed(time, unit, bufferSize, scheduler));
5084+
return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithTimeAndSize(time, unit, bufferSize, scheduler));
50855085
}
50865086

50875087
/**
@@ -5104,7 +5104,7 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
51045104
public final ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler) {
51055105
return new OperatorMulticast<T, T>(this,
51065106
OperatorReplay.createScheduledSubject(
5107-
OperatorReplay.<T> replayBuffered(bufferSize), scheduler));
5107+
ReplaySubject.<T>createWithSize(bufferSize), scheduler));
51085108
}
51095109

51105110
/**
@@ -5148,7 +5148,7 @@ public final ConnectableObservable<T> replay(long time, TimeUnit unit) {
51485148
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211811.aspx">MSDN: Observable.Replay</a>
51495149
*/
51505150
public final ConnectableObservable<T> replay(long time, TimeUnit unit, Scheduler scheduler) {
5151-
return new OperatorMulticast<T, T>(this, OperatorReplay.<T> replayWindowed(time, unit, -1, scheduler));
5151+
return new OperatorMulticast<T, T>(this, ReplaySubject.<T>createWithTime(time, unit, scheduler));
51525152
}
51535153

51545154
/**

0 commit comments

Comments
 (0)