Skip to content

Commit 3609e67

Browse files
committed
Add 'replay' variants to RxScala
1 parent a0ad6c9 commit 3609e67

File tree

3 files changed

+321
-0
lines changed

3 files changed

+321
-0
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,46 @@ class RxScalaDemo extends JUnitSuite {
320320
waitFor(sharedNumbers)
321321
}
322322

323+
@Test def exampleWithReplay2() {
324+
val numbers = Observable.interval(100 millis).take(10)
325+
val sharedNumbers = numbers.replay(3)
326+
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
327+
sharedNumbers.connect
328+
// subscriber 2 subscribes later but only gets the 3 buffered numbers and the following numbers
329+
Thread.sleep(700)
330+
sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n"))
331+
waitFor(sharedNumbers)
332+
}
333+
334+
@Test def exampleWithReplay3() {
335+
val numbers = Observable.interval(100 millis).take(10)
336+
val sharedNumbers = numbers.replay(300 millis)
337+
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
338+
sharedNumbers.connect
339+
// subscriber 2 subscribes later but only gets the buffered numbers and the following numbers
340+
Thread.sleep(700)
341+
sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n"))
342+
waitFor(sharedNumbers)
343+
}
344+
345+
@Test def exampleWithReplay4() {
346+
val numbers = Observable.interval(100 millis).take(10)
347+
val sharedNumbers = numbers.replay(2, 300 millis)
348+
sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n"))
349+
sharedNumbers.connect
350+
// subscriber 2 subscribes later but only gets the buffered numbers and the following numbers
351+
Thread.sleep(700)
352+
sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n"))
353+
waitFor(sharedNumbers)
354+
}
355+
356+
@Test def exampleWithReplay5() {
357+
val numbers = Observable.interval(100 millis).take(10)
358+
val sharedNumbers = numbers.replay[Long, Long]((o: Observable[Long]) => o.map(_ * 2))
359+
sharedNumbers.subscribe(n => println(s"subscriber gets $n"))
360+
waitFor(sharedNumbers)
361+
}
362+
323363
@Test def testSingleOption() {
324364
assertEquals(None, List(1, 2).toObservable.toBlockingObservable.singleOption)
325365
assertEquals(Some(1), List(1).toObservable.toBlockingObservable.singleOption)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,279 @@ trait Observable[+T]
10501050
new ConnectableObservable[T](asJavaObservable.replay())
10511051
}
10521052

1053+
/**
1054+
* Returns an Observable that emits items that are the results of invoking a specified selector on the items
1055+
* emitted by a `ConnectableObservable` that shares a single subscription to the source Observable.
1056+
* <p>
1057+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.f.png">
1058+
*
1059+
* @param selector the selector function, which can use the multicasted sequence as many times as needed, without
1060+
* causing multiple subscriptions to the Observable
1061+
* @return an Observable that emits items that are the results of invoking the selector on a `ConnectableObservable`
1062+
* that shares a single subscription to the source Observable
1063+
*/
1064+
def replay[U >: T, R](selector: Observable[U] => Observable[R]): Observable[R] = {
1065+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1066+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1067+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1068+
toScalaObservable[R](thisJava.replay(fJava))
1069+
}
1070+
1071+
/**
1072+
* Returns an Observable that emits items that are the results of invoking a specified selector on items
1073+
* emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
1074+
* replaying `bufferSize` notifications.
1075+
* <p>
1076+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.fn.png">
1077+
*
1078+
* @param selector the selector function, which can use the multicasted sequence as many times as needed, without
1079+
* causing multiple subscriptions to the Observable
1080+
* @param bufferSize the buffer size that limits the number of items the connectable observable can replay
1081+
* @return an Observable that emits items that are the results of invoking the selector on items emitted by
1082+
* a `ConnectableObservable` that shares a single subscription to the source Observable replaying
1083+
* no more than `bufferSize` items
1084+
*/
1085+
def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int): Observable[R] = {
1086+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1087+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1088+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1089+
toScalaObservable[R](thisJava.replay(fJava, bufferSize))
1090+
}
1091+
1092+
/**
1093+
* Returns an Observable that emits items that are the results of invoking a specified selector on items
1094+
* emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
1095+
* replaying no more than `bufferSize` items that were emitted within a specified time window.
1096+
* <p>
1097+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.fnt.png">
1098+
*
1099+
* @param selector a selector function, which can use the multicasted sequence as many times as needed, without
1100+
* causing multiple subscriptions to the Observable
1101+
* @param bufferSize the buffer size that limits the number of items the connectable observable can replay
1102+
* @param time the duration of the window in which the replayed items must have been emitted
1103+
* @return an Observable that emits items that are the results of invoking the selector on items emitted by
1104+
* a `ConnectableObservable` that shares a single subscription to the source Observable, and
1105+
* replays no more than `bufferSize` items that were emitted within the window defined by `time`
1106+
*/
1107+
def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int, time: Duration): Observable[R] = {
1108+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1109+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1110+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1111+
toScalaObservable[R](thisJava.replay(fJava, bufferSize, time.length, time.unit))
1112+
}
1113+
1114+
/**
1115+
* Returns an Observable that emits items that are the results of invoking a specified selector on items
1116+
* emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
1117+
* replaying no more than `bufferSize` items that were emitted within a specified time window.
1118+
* <p>
1119+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.fnts.png">
1120+
*
1121+
* @param selector a selector function, which can use the multicasted sequence as many times as needed, without
1122+
* causing multiple subscriptions to the Observable
1123+
* @param bufferSize the buffer size that limits the number of items the connectable observable can replay
1124+
* @param time the duration of the window in which the replayed items must have been emitted
1125+
* @param scheduler the Scheduler that is the time source for the window
1126+
* @return an Observable that emits items that are the results of invoking the selector on items emitted by
1127+
* a `ConnectableObservable` that shares a single subscription to the source Observable, and
1128+
* replays no more than `bufferSize` items that were emitted within the window defined by `time`
1129+
* @throws IllegalArgumentException if `bufferSize` is less than zero
1130+
*/
1131+
def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int, time: Duration, scheduler: Scheduler): Observable[R] = {
1132+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1133+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1134+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1135+
toScalaObservable[R](thisJava.replay(fJava, bufferSize, time.length, time.unit, scheduler))
1136+
}
1137+
1138+
/**
1139+
* Returns an Observable that emits items that are the results of invoking a specified selector on items
1140+
* emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
1141+
* replaying a maximum of `bufferSize` items.
1142+
* <p>
1143+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.fns.png">
1144+
*
1145+
* @param selector a selector function, which can use the multicasted sequence as many times as needed, without
1146+
* causing multiple subscriptions to the Observable
1147+
* @param bufferSize the buffer size that limits the number of items the connectable observable can replay
1148+
* @param scheduler the Scheduler on which the replay is observed
1149+
* @return an Observable that emits items that are the results of invoking the selector on items emitted by
1150+
* a `ConnectableObservable` that shares a single subscription to the source Observable,
1151+
* replaying no more than `bufferSize` notifications
1152+
*/
1153+
def replay[U >: T, R](selector: Observable[U] => Observable[R], bufferSize: Int, scheduler: Scheduler): Observable[R] = {
1154+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1155+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1156+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1157+
toScalaObservable[R](thisJava.replay(fJava, bufferSize, scheduler))
1158+
}
1159+
1160+
/**
1161+
* Returns an Observable that emits items that are the results of invoking a specified selector on items
1162+
* emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
1163+
* replaying all items that were emitted within a specified time window.
1164+
* <p>
1165+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.ft.png">
1166+
*
1167+
* @param selector a selector function, which can use the multicasted sequence as many times as needed, without
1168+
* causing multiple subscriptions to the Observable
1169+
* @param time the duration of the window in which the replayed items must have been emitted
1170+
* @return an Observable that emits items that are the results of invoking the selector on items emitted by
1171+
* a `ConnectableObservable` that shares a single subscription to the source Observable,
1172+
* replaying all items that were emitted within the window defined by `time`
1173+
*/
1174+
def replay[U >: T, R](selector: Observable[U] => Observable[R], time: Duration, scheduler: Scheduler): Observable[R] = {
1175+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1176+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1177+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1178+
toScalaObservable[R](thisJava.replay(fJava, time.length, time.unit, scheduler))
1179+
}
1180+
1181+
/**
1182+
* Returns an Observable that emits items that are the results of invoking a specified selector on items
1183+
* emitted by a `ConnectableObservable` that shares a single subscription to the source Observable.
1184+
* <p>
1185+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.fs.png">
1186+
*
1187+
* @param selector a selector function, which can use the multicasted sequence as many times as needed, without
1188+
* causing multiple subscriptions to the Observable
1189+
* @param scheduler the Scheduler where the replay is observed
1190+
* @return an Observable that emits items that are the results of invoking the selector on items emitted by
1191+
* a `ConnectableObservable` that shares a single subscription to the source Observable,
1192+
* replaying all items
1193+
*/
1194+
def replay[U >: T, R](selector: Observable[U] => Observable[R], scheduler: Scheduler): Observable[R] = {
1195+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1196+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1197+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1198+
toScalaObservable[R](thisJava.replay(fJava, scheduler))
1199+
}
1200+
1201+
/**
1202+
* Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
1203+
* replays at most `bufferSize` items that were emitted during a specified time window.
1204+
* <p>
1205+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.nt.png">
1206+
*
1207+
* @param bufferSize the buffer size that limits the number of items that can be replayed
1208+
* @param time the duration of the window in which the replayed items must have been emitted
1209+
* @return a `ConnectableObservable` that shares a single subscription to the source Observable and
1210+
* replays at most `bufferSize` items that were emitted during the window defined by `time`
1211+
*/
1212+
def replay(bufferSize: Int, time: Duration): ConnectableObservable[T] = {
1213+
new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit))
1214+
}
1215+
1216+
/**
1217+
* Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
1218+
* that replays a maximum of `bufferSize` items that are emitted within a specified time window.
1219+
* <p>
1220+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.nts.png">
1221+
*
1222+
* @param bufferSize the buffer size that limits the number of items that can be replayed
1223+
* @param time the duration of the window in which the replayed items must have been emitted
1224+
* @param scheduler the scheduler that is used as a time source for the window
1225+
* @return a `ConnectableObservable` that shares a single subscription to the source Observable and
1226+
* replays at most `bufferSize` items that were emitted during the window defined by `time``
1227+
* @throws IllegalArgumentException if `bufferSize` is less than zero
1228+
*/
1229+
def replay(bufferSize: Int, time: Duration, scheduler: Scheduler): ConnectableObservable[T] = {
1230+
new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit, scheduler))
1231+
}
1232+
1233+
/**
1234+
* Returns an Observable that emits items that are the results of invoking a specified selector on items
1235+
* emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
1236+
* replaying all items that were emitted within a specified time window.
1237+
* <p>
1238+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.ft.png">
1239+
*
1240+
* @param selector a selector function, which can use the multicasted sequence as many times as needed, without
1241+
* causing multiple subscriptions to the Observable
1242+
* @param time the duration of the window in which the replayed items must have been emitted
1243+
* @return an Observable that emits items that are the results of invoking the selector on items emitted by
1244+
* a `ConnectableObservable` that shares a single subscription to the source Observable,
1245+
* replaying all items that were emitted within the window defined by `time``
1246+
*/
1247+
def replay[U >: T, R](selector: Observable[U] => Observable[R], time: Duration): Observable[R] = {
1248+
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
1249+
val fJava: Func1[rx.Observable[U], rx.Observable[R]] =
1250+
(jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
1251+
toScalaObservable[R](thisJava.replay(fJava, time.length, time.unit))
1252+
}
1253+
1254+
/**
1255+
* Returns a `ConnectableObservable` that shares a single subscription to the source Observable that
1256+
* replays at most `bufferSize` items emitted by that Observable.
1257+
* <p>
1258+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.n.png">
1259+
*
1260+
* @param bufferSize the buffer size that limits the number of items that can be replayed
1261+
* @return a `ConnectableObservable` that shares a single subscription to the source Observable and
1262+
* replays at most `bufferSize` items emitted by that Observable
1263+
*/
1264+
def replay(bufferSize: Int): ConnectableObservable[T] = {
1265+
new ConnectableObservable[T](asJavaObservable.replay(bufferSize))
1266+
}
1267+
1268+
/**
1269+
* Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
1270+
* replays at most `bufferSize` items emitted by that Observable.
1271+
* <p>
1272+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.ns.png">
1273+
*
1274+
* @param bufferSize the buffer size that limits the number of items that can be replayed
1275+
* @param scheduler the scheduler on which the Observers will observe the emitted items
1276+
* @return a `ConnectableObservable` that shares a single subscription to the source Observable and
1277+
* replays at most `bufferSize` items that were emitted by the Observable
1278+
*/
1279+
def replay(bufferSize: Int, scheduler: Scheduler): ConnectableObservable[T] = {
1280+
new ConnectableObservable[T](asJavaObservable.replay(bufferSize, scheduler))
1281+
}
1282+
1283+
/**
1284+
* Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
1285+
* replays all items emitted by that Observable within a specified time window.
1286+
* <p>
1287+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.t.png">
1288+
*
1289+
* @param time the duration of the window in which the replayed items must have been emitted
1290+
* @return a `ConnectableObservable` that shares a single subscription to the source Observable and
1291+
* replays the items that were emitted during the window defined by `time`
1292+
*/
1293+
def replay(time: Duration): ConnectableObservable[T] = {
1294+
new ConnectableObservable[T](asJavaObservable.replay(time.length, time.unit))
1295+
}
1296+
1297+
/**
1298+
* Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
1299+
* replays all items emitted by that Observable within a specified time window.
1300+
* <p>
1301+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.ts.png">
1302+
*
1303+
* @param time the duration of the window in which the replayed items must have been emitted
1304+
* @param scheduler the Scheduler that is the time source for the window
1305+
* @return a `ConnectableObservable` that shares a single subscription to the source Observable and
1306+
* replays the items that were emitted during the window defined by `time`
1307+
*/
1308+
def replay(time: Duration, scheduler: Scheduler): ConnectableObservable[T] = {
1309+
new ConnectableObservable[T](asJavaObservable.replay(time.length, time.unit, scheduler))
1310+
}
1311+
1312+
/**
1313+
* Returns a `ConnectableObservable` that shares a single subscription to the source Observable that
1314+
* will replay all of its items and notifications to any future `Observer` on the given `Scheduler`.
1315+
* <p>
1316+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.s.png">
1317+
*
1318+
* @param scheduler the Scheduler on which the Observers will observe the emitted items
1319+
* @return a `ConnectableObservable` that shares a single subscription to the source Observable that
1320+
* will replay all of its items and notifications to any future `bserver` on the given `Scheduler`
1321+
*/
1322+
def replay(scheduler: Scheduler): ConnectableObservable[T] = {
1323+
new ConnectableObservable[T](asJavaObservable.replay(scheduler))
1324+
}
1325+
10531326
/**
10541327
* This method has similar behavior to [[rx.lang.scala.Observable.replay]] except that this auto-subscribes to
10551328
* the source Observable rather than returning a start function and an Observable.

0 commit comments

Comments
 (0)