1
+ /*
2
+ * Copyright 2008-present MongoDB, Inc.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
+
1
17
package org .mongodb .scala .internal
2
18
3
19
import org .mongodb .scala .{ BaseSpec , Observable , Observer }
@@ -13,17 +29,11 @@ class FlatMapObservableTest extends BaseSpec with Futures with Eventually {
13
29
val completedCounter = new AtomicInteger (0 )
14
30
Observable (1 to 100 )
15
31
.flatMap(
16
- x =>
17
- (observer : Observer [_ >: Int ]) => {
18
- Future (()).onComplete(_ => {
19
- observer.onNext(x)
20
- observer.onComplete()
21
- })
22
- }
32
+ x => createObservable(x)
23
33
)
24
34
.subscribe(
25
35
_ => (),
26
- p.failure,
36
+ e => p.failure(e) ,
27
37
() => {
28
38
completedCounter.incrementAndGet()
29
39
Thread .sleep(100 )
@@ -35,4 +45,13 @@ class FlatMapObservableTest extends BaseSpec with Futures with Eventually {
35
45
assert(completedCounter.get() == 1 , s " ${completedCounter.get()}" )
36
46
Thread .sleep(1000 )
37
47
}
48
+
49
+ private def createObservable (x : Int ): Observable [Int ] = new Observable [Int ] {
50
+ override def subscribe (observer : Observer [_ >: Int ]): Unit = {
51
+ Future (()).onComplete(_ => {
52
+ observer.onNext(x)
53
+ observer.onComplete()
54
+ })
55
+ }
56
+ }
38
57
}
0 commit comments