@@ -157,12 +157,31 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
157
157
def ' should not call onComplete after unsubscribe is called' () {
158
158
given :
159
159
def block = getBlock()
160
- def observer = new TestObserver ()
160
+ def observer = new TestObserver (new Observer () {
161
+ private Subscription subscription
162
+
163
+ @Override
164
+ void onSubscribe (final Subscription subscription ) {
165
+ this . subscription = subscription
166
+ }
167
+
168
+ @Override
169
+ void onNext (final Object result ) {
170
+ subscription. unsubscribe()
171
+ }
172
+
173
+ @Override
174
+ void onError (final Throwable e ) {
175
+ }
176
+
177
+ @Override
178
+ void onComplete () {
179
+ }
180
+ })
161
181
observe(block). subscribe(observer)
162
182
163
183
when :
164
184
observer. requestMore(1 )
165
- observer. getSubscription(). unsubscribe()
166
185
167
186
then :
168
187
observer. assertUnsubscribed()
@@ -174,12 +193,16 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
174
193
given :
175
194
def block = getBlock()
176
195
def observer = new TestObserver (new Observer () {
196
+ private Subscription subscription
197
+
177
198
@Override
178
199
void onSubscribe (final Subscription subscription ) {
200
+ this . subscription = subscription
179
201
}
180
202
181
203
@Override
182
204
void onNext (final Object result ) {
205
+ subscription. unsubscribe()
183
206
throw new MongoException (' Failure' )
184
207
}
185
208
@@ -195,13 +218,11 @@ class SingleResultCallbackSubscriptionSpecification extends Specification {
195
218
196
219
when :
197
220
observer. requestMore(1 )
198
- observer. getSubscription(). unsubscribe()
199
221
200
222
then :
201
- observer. assertUnsubscribed()
202
223
observer. assertReceivedOnNext([1 ])
203
- observer. assertErrored ()
204
- observer. assertTerminalEvent ()
224
+ observer. assertUnsubscribed ()
225
+ observer. assertNoTerminalEvent ()
205
226
}
206
227
207
228
0 commit comments