2323import rx .Notification ;
2424import rx .Observable ;
2525import rx .Subscriber ;
26- import rx .Subscription ;
2726import rx .exceptions .Exceptions ;
27+ import rx .internal .util .RxRingBuffer ;
2828
2929/**
3030 * Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -47,68 +47,88 @@ private BlockingOperatorToIterator() {
4747 * @return the iterator that could be used to iterate over the elements of the observable.
4848 */
4949 public static <T > Iterator <T > toIterator (Observable <? extends T > source ) {
50- final BlockingQueue < Notification <? extends T >> notifications = new LinkedBlockingQueue < Notification <? extends T > >();
50+ SubscriberIterator < T > subscriber = new SubscriberIterator < T >();
5151
5252 // using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
53- final Subscription subscription = source .materialize ().subscribe (new Subscriber <Notification <? extends T >>() {
54- @ Override
55- public void onCompleted () {
56- // ignore
57- }
53+ source .materialize ().subscribe (subscriber );
54+ return subscriber ;
55+ }
5856
59- @ Override
60- public void onError (Throwable e ) {
61- notifications .offer (Notification .<T >createOnError (e ));
62- }
57+ public static final class SubscriberIterator <T >
58+ extends Subscriber <Notification <? extends T >> implements Iterator <T > {
6359
64- @ Override
65- public void onNext (Notification <? extends T > args ) {
66- notifications .offer (args );
67- }
68- });
60+ static final int LIMIT = 3 * RxRingBuffer .SIZE / 4 ;
6961
70- return new Iterator <T >() {
71- private Notification <? extends T > buf ;
62+ private final BlockingQueue <Notification <? extends T >> notifications ;
63+ private Notification <? extends T > buf ;
64+ private int received ;
7265
73- @ Override
74- public boolean hasNext () {
75- if (buf == null ) {
76- buf = take ();
77- }
78- if (buf .isOnError ()) {
79- throw Exceptions .propagate (buf .getThrowable ());
66+ public SubscriberIterator () {
67+ this .notifications = new LinkedBlockingQueue <Notification <? extends T >>();
68+ }
69+
70+ @ Override
71+ public void onStart () {
72+ request (RxRingBuffer .SIZE );
73+ }
74+
75+ @ Override
76+ public void onCompleted () {
77+ // ignore
78+ }
79+
80+ @ Override
81+ public void onError (Throwable e ) {
82+ notifications .offer (Notification .<T >createOnError (e ));
83+ }
84+
85+ @ Override
86+ public void onNext (Notification <? extends T > args ) {
87+ notifications .offer (args );
88+ }
89+
90+ @ Override
91+ public boolean hasNext () {
92+ if (buf == null ) {
93+ buf = take ();
94+ received ++;
95+ if (received >= LIMIT ) {
96+ request (received );
97+ received = 0 ;
8098 }
81- return !buf .isOnCompleted ();
8299 }
100+ if (buf .isOnError ()) {
101+ throw Exceptions .propagate (buf .getThrowable ());
102+ }
103+ return !buf .isOnCompleted ();
104+ }
83105
84- @ Override
85- public T next () {
86- if (hasNext ()) {
87- T result = buf .getValue ();
88- buf = null ;
89- return result ;
90- }
91- throw new NoSuchElementException ();
106+ @ Override
107+ public T next () {
108+ if (hasNext ()) {
109+ T result = buf .getValue ();
110+ buf = null ;
111+ return result ;
92112 }
113+ throw new NoSuchElementException ();
114+ }
93115
94- private Notification <? extends T > take () {
95- try {
96- Notification <? extends T > poll = notifications .poll ();
97- if (poll != null ) {
98- return poll ;
99- }
100- return notifications .take ();
101- } catch (InterruptedException e ) {
102- subscription .unsubscribe ();
103- throw Exceptions .propagate (e );
116+ private Notification <? extends T > take () {
117+ try {
118+ Notification <? extends T > poll = notifications .poll ();
119+ if (poll != null ) {
120+ return poll ;
104121 }
122+ return notifications .take ();
123+ } catch (InterruptedException e ) {
124+ unsubscribe ();
125+ throw Exceptions .propagate (e );
105126 }
127+ }
106128
107- @ Override
108- public void remove () {
109- throw new UnsupportedOperationException ("Read-only iterator" );
110- }
111- };
129+ @ Override
130+ public void remove () {
131+ throw new UnsupportedOperationException ("Read-only iterator" );
132+ }
112133 }
113-
114134}
0 commit comments