Skip to content

Commit 05a79a8

Browse files
add debounce/throttle methods
1 parent 0a9c485 commit 05a79a8

File tree

1 file changed

+158
-0
lines changed

1 file changed

+158
-0
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scalaimpl/Observable.scala

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,164 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
12591259
val thatJava: rx.Observable[_ <: U] = that.asJava
12601260
new Observable[U](rx.Observable.merge(thisJava, thatJava))
12611261
}
1262+
1263+
/**
1264+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1265+
* <p>
1266+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1267+
* <p>
1268+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
1269+
* <p>
1270+
* Information on debounce vs throttle:
1271+
* <p>
1272+
* <ul>
1273+
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
1274+
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
1275+
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
1276+
* </ul>
1277+
*
1278+
* @param timeout
1279+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1280+
*
1281+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1282+
* @see {@link #debounce}
1283+
*/
1284+
def throttleWithTimeout(timeout: Duration): Observable[T] = {
1285+
new Observable[T](asJava.throttleWithTimeout(timeout.length, timeout.unit))
1286+
}
1287+
1288+
/**
1289+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1290+
* <p>
1291+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1292+
* <p>
1293+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
1294+
* <p>
1295+
* Information on debounce vs throttle:
1296+
* <p>
1297+
* <ul>
1298+
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
1299+
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
1300+
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
1301+
* </ul>
1302+
*
1303+
* @param timeout
1304+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1305+
*
1306+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1307+
* @see {@link #throttleWithTimeout};
1308+
*/
1309+
def debounce(timeout: Duration): Observable[T] = {
1310+
new Observable[T](asJava.debounce(timeout.length, timeout.unit))
1311+
}
1312+
1313+
/**
1314+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1315+
* <p>
1316+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1317+
* <p>
1318+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
1319+
* <p>
1320+
* Information on debounce vs throttle:
1321+
* <p>
1322+
* <ul>
1323+
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
1324+
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
1325+
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
1326+
* </ul>
1327+
*
1328+
* @param timeout
1329+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1330+
* @param scheduler
1331+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1332+
* @return Observable which performs the throttle operation.
1333+
* @see {@link #throttleWithTimeout};
1334+
*/
1335+
def debounce(timeout: Duration, scheduler: Scheduler): Observable[T] = {
1336+
new Observable[T](asJava.debounce(timeout.length, timeout.unit, scheduler))
1337+
}
1338+
1339+
/**
1340+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1341+
* <p>
1342+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1343+
* <p>
1344+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
1345+
*
1346+
* @param timeout
1347+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1348+
* @param scheduler
1349+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1350+
* @return Observable which performs the throttle operation.
1351+
* @see {@link #debounce}
1352+
*/
1353+
def throttleWithTimeout(timeout: Duration, scheduler: Scheduler): Observable[T] = {
1354+
new Observable[T](asJava.throttleWithTimeout(timeout.length, timeout.unit, scheduler))
1355+
}
1356+
1357+
/**
1358+
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
1359+
* <p>
1360+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
1361+
* <p>
1362+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
1363+
*
1364+
* @param skipDuration
1365+
* Time to wait before sending another value after emitting last value.
1366+
* @param scheduler
1367+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1368+
* @return Observable which performs the throttle operation.
1369+
*/
1370+
def throttleFirst(skipDuration: Duration, scheduler: Scheduler): Observable[T] = {
1371+
new Observable[T](asJava.throttleFirst(skipDuration.length, skipDuration.unit, scheduler))
1372+
}
1373+
1374+
/**
1375+
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
1376+
* <p>
1377+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
1378+
* <p>
1379+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
1380+
*
1381+
* @param skipDuration
1382+
* Time to wait before sending another value after emitting last value.
1383+
* @return Observable which performs the throttle operation.
1384+
*/
1385+
def throttleFirst(skipDuration: Duration): Observable[T] = {
1386+
new Observable[T](asJava.throttleFirst(skipDuration.length, skipDuration.unit))
1387+
}
1388+
1389+
/**
1390+
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
1391+
* <p>
1392+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
1393+
* <p>
1394+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
1395+
*
1396+
* @param intervalDuration
1397+
* Duration of windows within with the last value will be chosen.
1398+
* @return Observable which performs the throttle operation.
1399+
* @see {@link #sample(long, TimeUnit)}
1400+
*/
1401+
def throttleLast(intervalDuration: Duration): Observable[T] = {
1402+
new Observable[T](asJava.throttleLast(intervalDuration.length, intervalDuration.unit))
1403+
}
1404+
1405+
/**
1406+
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
1407+
* <p>
1408+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
1409+
* <p>
1410+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
1411+
*
1412+
* @param intervalDuration
1413+
* Duration of windows within with the last value will be chosen.
1414+
* @return Observable which performs the throttle operation.
1415+
* @see {@link #sample(long, TimeUnit, Scheduler)}
1416+
*/
1417+
def throttleLast(intervalDuration: Duration, scheduler: Scheduler): Observable[T] = {
1418+
new Observable[T](asJava.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler))
1419+
}
12621420

12631421
/**
12641422
* Converts an Observable into a {@link BlockingObservable} (an Observable with blocking

0 commit comments

Comments
 (0)