Skip to content

Commit 81adc5c

Browse files
committed
Added JavaDoc and moved public helper classes of OperationBuffer class to rx.util package.
1 parent 6f50049 commit 81adc5c

File tree

6 files changed

+185
-33
lines changed

6 files changed

+185
-33
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import rx.operators.AtomicObserver;
4343
import rx.operators.OperationAll;
4444
import rx.operators.OperationBuffer;
45-
import rx.operators.OperationBuffer.BufferClosing;
46-
import rx.operators.OperationBuffer.BufferOpening;
4745
import rx.operators.OperationCache;
4846
import rx.operators.OperationConcat;
4947
import rx.operators.OperationDefer;
@@ -85,6 +83,8 @@
8583
import rx.subjects.Subject;
8684
import rx.subscriptions.BooleanSubscription;
8785
import rx.subscriptions.Subscriptions;
86+
import rx.util.BufferClosing;
87+
import rx.util.BufferOpening;
8888
import rx.util.Range;
8989
import rx.util.Timestamped;
9090
import rx.util.functions.Action0;

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 149 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636
import rx.concurrency.Schedulers;
3737
import rx.concurrency.TestScheduler;
3838
import rx.subscriptions.Subscriptions;
39+
import rx.util.BufferClosing;
40+
import rx.util.BufferClosings;
41+
import rx.util.BufferOpening;
42+
import rx.util.BufferOpenings;
3943
import rx.util.functions.Action0;
4044
import rx.util.functions.Action1;
4145
import rx.util.functions.Func0;
@@ -346,6 +350,15 @@ public Subscription call(final Observer<List<T>> observer) {
346350
};
347351
}
348352

353+
/**
354+
* This {@link BufferObserver} object can be constructed using a {@link Buffers} object,
355+
* a {@link Observer} object, and a {@link BufferCreator} object. The {@link BufferCreator}
356+
* will manage the creation, and in some rare cases emission of internal {@link Buffer} objects
357+
* in the specified {@link Buffers} object. Under normal circumstances the {@link Buffers}
358+
* object specifies when a created {@link Buffer} is emitted.
359+
*
360+
* @param <T> The type of object all internal {@link Buffer} objects record.
361+
*/
349362
private static class BufferObserver<T> implements Observer<T> {
350363

351364
private final Buffers<T> buffers;
@@ -379,11 +392,31 @@ public void onNext(T args) {
379392
}
380393
}
381394

395+
/**
396+
* This interface defines a way which specifies when to create a new internal {@link Buffer} object.
397+
*
398+
* @param <T> The type of object all internal {@link Buffer} objects record.
399+
*/
382400
private interface BufferCreator<T> {
401+
/**
402+
* Signifies a onNext event.
403+
*/
383404
void onValuePushed();
405+
406+
/**
407+
* Signifies a onCompleted or onError event. Should be used to clean up open
408+
* subscriptions and other still running background tasks.
409+
*/
384410
void stop();
385411
}
386412

413+
/**
414+
* This {@link BufferCreator} creates a new {@link Buffer} when it is initialized, but
415+
* provides no additional functionality. This class should primarily be used when the
416+
* internal {@link Buffer} is closed externally.
417+
*
418+
* @param <T> The type of object all internal {@link Buffer} objects record.
419+
*/
387420
private static class SingleBufferCreator<T> implements BufferCreator<T> {
388421

389422
public SingleBufferCreator(Buffers<T> buffers) {
@@ -401,6 +434,13 @@ public void stop() {
401434
}
402435
}
403436

437+
/**
438+
* This {@link BufferCreator} creates a new {@link Buffer} whenever it receives an
439+
* object from the provided {@link Observable} created with the
440+
* bufferClosingSelector {@link Func0}.
441+
*
442+
* @param <T> The type of object all internal {@link Buffer} objects record.
443+
*/
404444
private static class ObservableBasedSingleBufferCreator<T> implements BufferCreator<T> {
405445

406446
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
@@ -437,11 +477,19 @@ public void stop() {
437477
}
438478
}
439479

480+
/**
481+
* This {@link BufferCreator} creates a new {@link Buffer} whenever it receives
482+
* an object from the provided bufferOpenings {@link Observable}, and closes the corresponding
483+
* {@link Buffer} object when it receives an object from the provided {@link Observable} created
484+
* with the bufferClosingSelector {@link Func1}.
485+
*
486+
* @param <T> The type of object all internal {@link Buffer} objects record.
487+
*/
440488
private static class ObservableBasedMultiBufferCreator<T> implements BufferCreator<T> {
441489

442490
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
443491

444-
public ObservableBasedMultiBufferCreator(final Buffers<T> buffers, Observable<BufferOpening> bufferOpenings, final Func1<BufferOpening, Observable<BufferClosing>> bufferClosingSelector) {
492+
public ObservableBasedMultiBufferCreator(final OverlappingBuffers<T> buffers, Observable<BufferOpening> bufferOpenings, final Func1<BufferOpening, Observable<BufferClosing>> bufferClosingSelector) {
445493
subscription.wrap(bufferOpenings.subscribe(new Action1<BufferOpening>() {
446494
@Override
447495
public void call(BufferOpening opening) {
@@ -469,6 +517,12 @@ public void stop() {
469517
}
470518
}
471519

520+
/**
521+
* This {@link BufferCreator} creates a new {@link Buffer} every time after a fixed
522+
* period of time has elapsed.
523+
*
524+
* @param <T> The type of object all internal {@link Buffer} objects record.
525+
*/
472526
private static class TimeBasedBufferCreator<T> implements BufferCreator<T> {
473527

474528
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
@@ -502,6 +556,12 @@ public void stop() {
502556
}
503557
}
504558

559+
/**
560+
* This {@link BufferCreator} creates a new {@link Buffer} every time after it has
561+
* seen a certain amount of elements.
562+
*
563+
* @param <T> The type of object all internal {@link Buffer} objects record.
564+
*/
505565
private static class SkippingBufferCreator<T> implements BufferCreator<T> {
506566

507567
private final AtomicInteger skipped = new AtomicInteger(1);
@@ -527,6 +587,12 @@ public void stop() {
527587
}
528588
}
529589

590+
/**
591+
* This class is an extension on the {@link Buffers} class which only supports one
592+
* active (not yet emitted) internal {@link Buffer} object.
593+
*
594+
* @param <T> The type of object all internal {@link Buffer} objects record.
595+
*/
530596
private static class NonOverlappingBuffers<T> extends Buffers<T> {
531597

532598
private final Object lock = new Object();
@@ -550,13 +616,27 @@ public void pushValue(T value) {
550616
}
551617
}
552618

619+
/**
620+
* This class is an extension on the {@link Buffers} class which actually has no additional
621+
* behavior than its super class. Classes extending this class, are expected to support
622+
* two or more active (not yet emitted) internal {@link Buffer} objects.
623+
*
624+
* @param <T> The type of object all internal {@link Buffer} objects record.
625+
*/
553626
private static class OverlappingBuffers<T> extends Buffers<T> {
554-
555627
public OverlappingBuffers(Observer<List<T>> observer) {
556628
super(observer);
557629
}
558630
}
559631

632+
/**
633+
* This class is an extension on the {@link Buffers} class. Every internal buffer has
634+
* a has a maximum time to live and a maximum internal capacity. When the buffer has
635+
* reached the end of its life, or reached its maximum internal capacity it is
636+
* automatically emitted.
637+
*
638+
* @param <T> The type of object all internal {@link Buffer} objects record.
639+
*/
560640
private static class TimeAndSizeBasedBuffers<T> extends Buffers<T> {
561641

562642
private final ConcurrentMap<Buffer<T>, Subscription> subscriptions = new ConcurrentHashMap<Buffer<T>, Subscription>();
@@ -615,6 +695,13 @@ public void pushValue(T value) {
615695
}
616696
}
617697

698+
/**
699+
* This class is an extension on the {@link Buffers} class. Every internal buffer has
700+
* a has a maximum time to live. When the buffer has reached the end of its life it is
701+
* automatically emitted.
702+
*
703+
* @param <T> The type of object all internal {@link Buffer} objects record.
704+
*/
618705
private static class TimeBasedBuffers<T> extends OverlappingBuffers<T> {
619706

620707
private final ConcurrentMap<Buffer<T>, Subscription> subscriptions = new ConcurrentHashMap<Buffer<T>, Subscription>();
@@ -649,6 +736,13 @@ public void emitBuffer(Buffer<T> buffer) {
649736
}
650737
}
651738

739+
/**
740+
* This class is an extension on the {@link Buffers} class. Every internal buffer has
741+
* a fixed maximum capacity. When the buffer has reached its maximum capacity it is
742+
* automatically emitted.
743+
*
744+
* @param <T> The type of object all internal {@link Buffer} objects record.
745+
*/
652746
private static class SizeBasedBuffers<T> extends Buffers<T> {
653747

654748
private final int size;
@@ -674,28 +768,55 @@ public void pushValue(T value) {
674768
}
675769
}
676770

771+
/**
772+
* This class represents an object which contains and manages multiple {@link Buffer} objects.
773+
*
774+
* @param <T> The type of objects which the internal {@link Buffer} objects record.
775+
*/
677776
private static class Buffers<T> {
678777

679778
private final Queue<Buffer<T>> buffers = new ConcurrentLinkedQueue<Buffer<T>>();
680779
private final Observer<List<T>> observer;
681780

781+
/**
782+
* Constructs a new {@link Buffers} object for the specified {@link Observer}.
783+
*
784+
* @param observer
785+
* The {@link Observer} to which this object will emit its internal
786+
* {@link Buffer} objects to when requested.
787+
*/
682788
public Buffers(Observer<List<T>> observer) {
683789
this.observer = observer;
684790
}
685791

792+
/**
793+
* This method will instantiate a new {@link Buffer} object and register it internally.
794+
*
795+
* @return
796+
* The constructed empty {@link Buffer} object.
797+
*/
686798
public Buffer<T> createBuffer() {
687799
Buffer<T> buffer = new Buffer<T>();
688800
buffers.add(buffer);
689801
return buffer;
690802
}
691803

804+
/**
805+
* This method emits all not yet emitted {@link Buffer} objects.
806+
*/
692807
public void emitAllBuffers() {
693808
Buffer<T> buffer;
694809
while ((buffer = buffers.poll()) != null) {
695810
observer.onNext(buffer.getContents());
696811
}
697812
}
698813

814+
/**
815+
* This method emits the specified {@link Buffer} object.
816+
*
817+
* @param buffer
818+
* The {@link Buffer} to emit.
819+
*/
699820
public void emitBuffer(Buffer<T> buffer) {
700821
if (!buffers.remove(buffer)) {
701822
// Concurrency issue: Buffer is already emitted!
@@ -704,10 +825,20 @@ public void emitBuffer(Buffer<T> buffer) {
704825
observer.onNext(buffer.getContents());
705826
}
706827

828+
/**
829+
* @return
830+
* The oldest (in case there are multiple) {@link Buffer} object.
831+
*/
707832
public Buffer<T> getBuffer() {
708833
return buffers.peek();
709834
}
710835

836+
/**
837+
* This method pushes a value to all not yet emitted {@link Buffer} objects.
838+
*
839+
* @param value
840+
* The value to push to all not yet emitted {@link Buffer} objects.
841+
*/
711842
public void pushValue(T value) {
712843
List<Buffer<T>> copy = new ArrayList<Buffer<T>>(buffers);
713844
for (Buffer<T> buffer : copy) {
@@ -716,47 +847,34 @@ public void pushValue(T value) {
716847
}
717848
}
718849

850+
/**
851+
* This class represents a single buffer: A sequence of recorded values.
852+
*
853+
* @param <T> The type of objects which this {@link Buffer} can hold.
854+
*/
719855
private static class Buffer<T> {
720856
private final List<T> contents = new ArrayList<T>();
721857

858+
/**
859+
* Appends a specified value to the {@link Buffer}.
860+
*
861+
* @param value
862+
* The value to append to the {@link Buffer}.
863+
*/
722864
public void pushValue(T value) {
723865
contents.add(value);
724866
}
725867

868+
/**
869+
* @return
870+
* The mutable underlying {@link List} which contains all the
871+
* recorded values in this {@link Buffer} object.
872+
*/
726873
public List<T> getContents() {
727874
return contents;
728875
}
729876
}
730877

731-
public interface BufferOpening {
732-
// Tagging interface for objects which can open buffers.
733-
}
734-
735-
public interface BufferClosing {
736-
// Tagging interface for objects which can close buffers.
737-
}
738-
739-
public static class BufferOpenings {
740-
741-
public static BufferOpening create() {
742-
return new BufferOpening() {};
743-
}
744-
745-
private BufferOpenings() {
746-
// Prevent instantation.
747-
}
748-
}
749-
public static class BufferClosings {
750-
751-
public static BufferClosing create() {
752-
return new BufferClosing() {};
753-
}
754-
755-
private BufferClosings() {
756-
// Prevent instantation.
757-
}
758-
}
759-
760878
public static class UnitTest {
761879

762880
private Observer<List<String>> observer;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package rx.util;
2+
3+
public interface BufferClosing {
4+
// Tagging interface for objects which can close buffers.
5+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package rx.util;
2+
3+
public class BufferClosings {
4+
5+
public static BufferClosing create() {
6+
return new BufferClosing() {};
7+
}
8+
9+
private BufferClosings() {
10+
// Prevent instantation.
11+
}
12+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package rx.util;
2+
3+
public interface BufferOpening {
4+
// Tagging interface for objects which can open buffers.
5+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package rx.util;
2+
3+
public class BufferOpenings {
4+
5+
public static BufferOpening create() {
6+
return new BufferOpening() {};
7+
}
8+
9+
private BufferOpenings() {
10+
// Prevent instantation.
11+
}
12+
}

0 commit comments

Comments
 (0)