@@ -18,6 +18,7 @@ package rx.lang.groovy.examples;
18
18
import rx.Observable ;
19
19
import rx.Observer ;
20
20
import rx.Subscription ;
21
+ import rx.subscriptions.BooleanSubscription ;
21
22
import rx.util.functions.Func1 ;
22
23
import java.util.concurrent.ExecutorService ;
23
24
import java.util.concurrent.LinkedBlockingQueue ;
@@ -66,8 +67,9 @@ static void main(String[] args) {
66
67
* [id:1000, title:video-1000-title, length:5428, bookmark:0,
67
68
* rating:[actual:4, average:3, predicted:0]]
68
69
*/
69
- def Observable getVideoGridForDisplay (userId ) {
70
- getListOfLists(userId). mapMany({ VideoList list ->
70
+ Observable getVideoGridForDisplay (userId ) {
71
+ // take the first 5 lists
72
+ getListOfLists(userId). take(5 ). mapMany({ VideoList list ->
71
73
// for each VideoList we want to fetch the videos
72
74
list. getVideos()
73
75
.take(10 ) // we only want the first 10 of each list
@@ -103,29 +105,23 @@ def Observable getVideoGridForDisplay(userId) {
103
105
*
104
106
* Observable<VideoList> is the "push" equivalent to List<VideoList>
105
107
*/
106
- def Observable<VideoList > getListOfLists (userId ) {
108
+ Observable<VideoList > getListOfLists (userId ) {
107
109
return Observable . create({ observer ->
108
- AtomicBoolean isRunning = new AtomicBoolean ( true );
110
+ BooleanSubscription subscription = new BooleanSubscription ();
109
111
// this will happen on a separate thread as it requires a network call
110
- executor. execute(new Runnable () {
111
- def void run () {
112
+ executor. execute({
112
113
// simulate network latency
113
114
Thread . sleep(180 );
114
115
for (i in 0 .. 15 ) {
115
- if (! isRunning. get()) {
116
- // we have received an unsubscribe
116
+ if (subscription. isUnsubscribed()) {
117
117
break ;
118
118
}
119
119
// println("****** emitting list: " + i)
120
120
observer. onNext(new VideoList (i))
121
121
}
122
122
observer. onCompleted();
123
- }
124
123
})
125
- return Observable . createSubscription({
126
- // see https://github.com/Netflix/RxJava/issues/173 for a possibly simpler way of doing this
127
- isRunning. set(false );
128
- });
124
+ return subscription;
129
125
})
130
126
}
131
127
@@ -139,15 +135,15 @@ class VideoList {
139
135
this . listPosition = position
140
136
}
141
137
142
- def String getListName () {
138
+ String getListName () {
143
139
return " ListName-" + listPosition
144
140
}
145
141
146
- def Integer getListPosition () {
142
+ Integer getListPosition () {
147
143
return listPosition
148
144
}
149
145
150
- def Observable<Video > getVideos () {
146
+ Observable<Video > getVideos () {
151
147
return Observable . create({ observer ->
152
148
// we already have the videos once a list is loaded
153
149
// so we won't launch another thread but return
@@ -170,7 +166,7 @@ class Video {
170
166
}
171
167
172
168
// synchronous
173
- def Observable<Map<String , String > > getMetadata () {
169
+ Observable<Map<String , String > > getMetadata () {
174
170
// simulate fetching metadata from an in-memory cache
175
171
// so it will not asynchronously execute on a thread but
176
172
// immediately return an Observable with the data
@@ -184,14 +180,13 @@ class Video {
184
180
}
185
181
186
182
// asynchronous
187
- def Observable<Integer > getBookmark (userId ) {
183
+ Observable<Integer > getBookmark (userId ) {
188
184
// simulate fetching the bookmark for this user
189
185
// that specifies the last played position if
190
186
// this video has been played before
191
187
return Observable . create({ observer ->
192
188
// this will happen on a separate thread as it requires a network call
193
- executor. execute(new Runnable () {
194
- def void run () {
189
+ executor. execute({
195
190
// simulate network latency
196
191
Thread . sleep(4 );
197
192
if (randint(6 ) > 1 ) {
@@ -202,23 +197,20 @@ class Video {
202
197
observer. onNext(randint(4000 ));
203
198
}
204
199
observer. onCompleted();
205
- }
206
200
})
207
201
})
208
202
}
209
203
210
204
// asynchronous
211
- def Observable<VideoRating > getRating (userId ) {
205
+ Observable<VideoRating > getRating (userId ) {
212
206
// simulate fetching the VideoRating for this user
213
207
return Observable . create({ observer ->
214
208
// this will happen on a separate thread as it requires a network call
215
- executor. execute(new Runnable () {
216
- def void run () {
209
+ executor. execute({
217
210
// simulate network latency
218
211
Thread . sleep(10 );
219
212
observer. onNext(new VideoRating (videoId, userId))
220
213
observer. onCompleted();
221
- }
222
214
})
223
215
})
224
216
}
@@ -231,15 +223,15 @@ class VideoRating {
231
223
this . userId = userId;
232
224
}
233
225
234
- def Integer getPredictedStarRating () {
226
+ Integer getPredictedStarRating () {
235
227
return randint(5 )
236
228
}
237
229
238
- def Integer getAverageStarRating () {
230
+ Integer getAverageStarRating () {
239
231
return randint(4 )
240
232
}
241
233
242
- def Integer getActualStarRating () {
234
+ Integer getActualStarRating () {
243
235
return randint(5 )
244
236
}
245
237
}
0 commit comments