@@ -33,12 +33,10 @@ public final class OperationFinally {
33
33
/**
34
34
* Call a given action when a sequence completes (with or without an
35
35
* exception). The returned observable is exactly as threadsafe as the
36
- * source observable; in particular, any situation allowing the source to
37
- * call onComplete or onError multiple times allows the returned observable
38
- * to call the final action multiple times.
36
+ * source observable.
39
37
* <p/>
40
38
* Note that "finally" is a Java reserved word and cannot be an identifier,
41
- * so we use "finally0 ".
39
+ * so we use "finallyDo ".
42
40
*
43
41
* @param sequence An observable sequence of elements
44
42
* @param action An action to be taken when the sequence is complete or throws an exception
@@ -48,7 +46,7 @@ public final class OperationFinally {
48
46
* the given action will be called.
49
47
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN Observable.Finally method</a>
50
48
*/
51
- public static <T > Func1 <Observer <T >, Subscription > finally0 (final Observable <T > sequence , final Action0 action ) {
49
+ public static <T > Func1 <Observer <T >, Subscription > finallyDo (final Observable <T > sequence , final Action0 action ) {
52
50
return new Func1 <Observer <T >, Subscription >() {
53
51
@ Override
54
52
public Subscription call (Observer <T > observer ) {
@@ -60,26 +58,14 @@ public Subscription call(Observer<T> observer) {
60
58
private static class Finally <T > implements Func1 <Observer <T >, Subscription > {
61
59
private final Observable <T > sequence ;
62
60
private final Action0 finalAction ;
63
- private Subscription s ;
64
61
65
62
Finally (final Observable <T > sequence , Action0 finalAction ) {
66
63
this .sequence = sequence ;
67
64
this .finalAction = finalAction ;
68
65
}
69
66
70
- private final AtomicObservableSubscription Subscription = new AtomicObservableSubscription ();
71
-
72
- private final Subscription actualSubscription = new Subscription () {
73
- @ Override
74
- public void unsubscribe () {
75
- if (null != s )
76
- s .unsubscribe ();
77
- }
78
- };
79
-
80
67
public Subscription call (Observer <T > observer ) {
81
- s = sequence .subscribe (new FinallyObserver (observer ));
82
- return Subscription .wrap (actualSubscription );
68
+ return sequence .subscribe (new FinallyObserver (observer ));
83
69
}
84
70
85
71
private class FinallyObserver implements Observer <T > {
@@ -91,14 +77,20 @@ private class FinallyObserver implements Observer<T> {
91
77
92
78
@ Override
93
79
public void onCompleted () {
94
- observer .onCompleted ();
95
- finalAction .call ();
80
+ try {
81
+ observer .onCompleted ();
82
+ } finally {
83
+ finalAction .call ();
84
+ }
96
85
}
97
86
98
87
@ Override
99
88
public void onError (Exception e ) {
100
- observer .onError (e );
101
- finalAction .call ();
89
+ try {
90
+ observer .onError (e );
91
+ } finally {
92
+ finalAction .call ();
93
+ }
102
94
}
103
95
104
96
@ Override
@@ -117,7 +109,7 @@ public void before() {
117
109
aObserver = mock (Observer .class );
118
110
}
119
111
private void checkActionCalled (Observable <String > input ) {
120
- Observable .create (finally0 (input , aAction0 )).subscribe (aObserver );
112
+ Observable .create (finallyDo (input , aAction0 )).subscribe (aObserver );
121
113
verify (aAction0 , times (1 )).call ();
122
114
}
123
115
@ Test
0 commit comments