Skip to content

Commit a2ac4aa

Browse files
committed
Use a custom SuspendableClassifier
1 parent dd94e7b commit a2ac4aa

File tree

4 files changed

+94
-66
lines changed

4 files changed

+94
-66
lines changed

rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/ChannelObservable.java

Lines changed: 14 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@
2828
import rx.Scheduler;
2929
import rx.util.Exceptions;
3030
import rx.util.OnErrorNotImplementedException;
31+
import rx.util.functions.Action2;
32+
import rx.util.functions.Actions;
33+
import rx.util.functions.Func1;
34+
import rx.util.functions.Functions;
3135

3236
/**
3337
* This class contains static methods that connect {@link Observable}s and {@link Channel}s.
@@ -122,45 +126,21 @@ public void onError(Throwable e) {
122126
public final static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, Observable<T> o) {
123127
final ChannelWithErrors<T> channel = new ChannelWithErrors<T>(Channels.newChannel(bufferSize, policy));
124128

125-
o.subscribe(new Observer<T>() {
126-
@Override
127-
@Suspendable
128-
public void onNext(T t) {
129-
try {
130-
channel.sendPort().send(t);
131-
} catch (InterruptedException ex) {
132-
Strand.interrupted();
133-
} catch (SuspendExecution ex) {
134-
throw new AssertionError(ex);
135-
}
136-
}
129+
System.out.println(Functions.fromFunc(new Func1<String, String>() {
137130

138131
@Override
139-
public void onCompleted() {
140-
channel.sendPort().close();
132+
public String call(String t1) {
133+
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
141134
}
135+
}));
136+
System.out.println(Actions.toFunc(new Action2<String, String>() {
142137

143138
@Override
144-
public void onError(Throwable e) {
145-
channel.error(e);
139+
public void call(String t1, String t2) {
140+
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
146141
}
147-
});
148-
return channel.receivePort();
149-
}
150-
151-
/**
152-
* Creates a {@link ReceivePort} subscribed to an {@link Observable}.
153-
* <p>
154-
* @param <T> the type of messages emitted by the observable and received on the channel.
155-
* @param bufferSize the channel's buffer size
156-
* @param policy the channel's {@link Channels.OverflowPolicy OverflowPolicy}
157-
* @param o the observable
158-
* @param scheduler the scheduler used to emit the observable's events
159-
* @return A new channel with the given buffer size and overflow policy that will receive all events emitted by the observable.
160-
*/
161-
public final static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, Observable<T> o, Scheduler scheduler) {
162-
final ChannelWithErrors<T> channel = new ChannelWithErrors<T>(Channels.newChannel(bufferSize, policy));
163-
142+
}));
143+
164144
o.subscribe(new Observer<T>() {
165145
@Override
166146
@Suspendable
@@ -183,7 +163,7 @@ public void onCompleted() {
183163
public void onError(Throwable e) {
184164
channel.error(e);
185165
}
186-
}, scheduler);
166+
});
187167
return channel.receivePort();
188168
}
189169

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.quasar;
17+
18+
import co.paralleluniverse.fibers.instrument.MethodDatabase;
19+
import co.paralleluniverse.fibers.instrument.SimpleSuspendableClassifier;
20+
import co.paralleluniverse.fibers.instrument.SuspendableClassifier;
21+
import java.util.Arrays;
22+
import java.util.HashSet;
23+
import java.util.Set;
24+
25+
public class RxSuspendableClassifier implements SuspendableClassifier {
26+
private static final Set<String> CORE_PACKAGES = new HashSet<String>(Arrays.asList(new String[]{
27+
"rx", "rx.joins", "rx.observables", "rx.observers", "rx.operators", "rx.plugins", "rx.schedulers",
28+
"rx.subjects", "rx.subscriptions", "rx.util", "rx.util.functions"
29+
}));
30+
31+
private static final Set<String> EXCEPTIONS = new HashSet<String>(Arrays.asList(new String[]{
32+
"rx/observers/SynchronizedObserver",
33+
"rx/schedulers/AbstractSchedulerTests$ConcurrentObserverValidator",}));
34+
35+
private static final Set<String> OBSERVER_METHODS = new HashSet<String>(Arrays.asList(new String[]{
36+
"onNext(Ljava/lang/Object;)V", "onCompleted()V", "onError(Ljava/lang/Throwable;)V"
37+
}));
38+
39+
private static final String FUNCTION_METHOD = "call";
40+
41+
@Override
42+
public MethodDatabase.SuspendableType isSuspendable(MethodDatabase db, String className, String superClassName, String[] interfaces, String methodName, String methodDesc, String methodSignature, String[] methodExceptions) {
43+
MethodDatabase.SuspendableType s = null;
44+
if (isCoreRx(className) && !EXCEPTIONS.contains(className)) {
45+
if (isObserverImplementation(db, className, superClassName, interfaces, methodName, methodDesc))
46+
s = MethodDatabase.SuspendableType.SUSPENDABLE;
47+
else if (isUtilFunction(db, className, superClassName, interfaces, methodName, methodDesc))
48+
s = MethodDatabase.SuspendableType.SUSPENDABLE;
49+
}
50+
// System.out.println("-- " + className + "." + methodName + ": " + s);
51+
return s;
52+
}
53+
54+
private boolean isCoreRx(String className) {
55+
return CORE_PACKAGES.contains(packageOf(className));
56+
}
57+
58+
private static boolean isObserverImplementation(MethodDatabase db, String className, String superClassName, String[] interfaces, String methodName, String methodDesc) {
59+
return !className.equals("rx/Observer")
60+
&& OBSERVER_METHODS.contains(methodName + methodDesc)
61+
&& SimpleSuspendableClassifier.extendsOrImplements("rx/Observer", db, className, superClassName, interfaces);
62+
}
63+
64+
private static boolean isUtilFunction(MethodDatabase db, String className, String superClassName, String[] interfaces, String methodName, String methodDesc) {
65+
return (className.startsWith("rx/util/functions/Functions") || className.startsWith("rx/util/functions/Actions"))
66+
&& methodName.equals(FUNCTION_METHOD)
67+
&& (SimpleSuspendableClassifier.extendsOrImplements("rx/util/functions/Function", db, className, superClassName, interfaces)
68+
|| SimpleSuspendableClassifier.extendsOrImplements("rx/util/functions/Action", db, className, superClassName, interfaces));
69+
}
70+
71+
private static String packageOf(String className) {
72+
try {
73+
return className.substring(0, className.lastIndexOf('/')).replace('/', '.');
74+
} catch (RuntimeException e) {
75+
System.err.println("???? " + className);
76+
throw e;
77+
}
78+
}
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
rx.quasar.RxSuspendableClassifier
Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1 @@
1-
rx.Observer.onNext
2-
rx.Observer.onError
3-
rx.Observer.onCompleted
41
rx.Observable.subscribe
5-
rx.util.functions.Actions.Actions$1.call
6-
rx.util.functions.Actions.Actions$2.call
7-
rx.util.functions.Actions.Actions$3.call
8-
rx.util.functions.Actions.Actions$4.call
9-
rx.util.functions.Actions.Actions$5.call
10-
rx.util.functions.Actions.Actions$6.call
11-
rx.util.functions.Actions.Actions$7.call
12-
rx.util.functions.Actions.Actions$8.call
13-
rx.util.functions.Actions.Actions$9.call
14-
rx.util.functions.Actions.Actions$10.call
15-
rx.util.functions.Actions.Actions$11.call
16-
rx.util.functions.Actions.Actions$12.call
17-
rx.util.functions.Actions.Actions$13.call
18-
rx.util.functions.Actions.Actions$14.call
19-
rx.util.functions.Actions.Functions$1.call
20-
rx.util.functions.Actions.Functions$2.call
21-
rx.util.functions.Actions.Functions$3.call
22-
rx.util.functions.Actions.Functions$4.call
23-
rx.util.functions.Actions.Functions$5.call
24-
rx.util.functions.Actions.Functions$6.call
25-
rx.util.functions.Actions.Functions$7.call
26-
rx.util.functions.Actions.Functions$8.call
27-
rx.util.functions.Actions.Functions$9.call
28-
rx.util.functions.Actions.Functions$10.call
29-
rx.util.functions.Actions.Functions$11.call
30-
rx.util.functions.Actions.Functions$12.call
31-
rx.util.functions.Actions.Functions$13.call
32-
rx.util.functions.Actions.Functions$14.call
33-
rx.util.functions.Actions.Functions$15.call

0 commit comments

Comments
 (0)