Skip to content

Commit 0554dc0

Browse files
committed
initial commit
1 parent 0e5a098 commit 0554dc0

File tree

10 files changed

+507
-1
lines changed

10 files changed

+507
-1
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ bin/
6262

6363
# NetBeans specific files/directories
6464
.nbattrs
65+
/.nb-gradle/profiles/private/
6566

6667
# Scala build
6768
*.cache
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# rxjava-quasar
2+
3+
Integrates RxJava with [Quasar](https://github.com/puniverse/quasar).
4+
Includes a fiber (lightweight-thread) based scheduler, and an Observable API for Quasar channels.
5+
6+
Main Classes:
7+
8+
- [NewFiberScheduler](https://github.com/Netflix/RxJava/blob/master/rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/NewFiberScheduler.java)
9+
- [ChannelObservable](https://github.com/Netflix/RxJava/blob/master/rxjava-contrib/rxjava-quasar/src/main/java/rx/quasar/ChannelObservable.java)
10+
11+
12+
# Binaries
13+
14+
Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ccom.netflix.rxjava).
15+
16+
Example for [Maven](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-apache-http%22):
17+
18+
```xml
19+
<dependency>
20+
<groupId>com.netflix.rxjava</groupId>
21+
<artifactId>rxjava-quasar</artifactId>
22+
<version>x.y.z</version>
23+
</dependency>
24+
```
25+
26+
and for Ivy:
27+
28+
```xml
29+
<dependency org="com.netflix.rxjava" name="rxjava-quasar" rev="x.y.z" />
30+
```
31+
32+
# Sample Usage
33+
34+
35+
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
apply plugin: 'osgi'
2+
3+
sourceCompatibility = JavaVersion.VERSION_1_6
4+
targetCompatibility = JavaVersion.VERSION_1_7
5+
6+
configurations {
7+
quasar
8+
}
9+
10+
repositories {
11+
mavenLocal()
12+
mavenCentral()
13+
maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
14+
}
15+
16+
dependencies {
17+
compile project(':rxjava-core')
18+
compile 'co.paralleluniverse:quasar-core:0.5.0-SNAPSHOT'
19+
quasar 'co.paralleluniverse:quasar-core:0.5.0-SNAPSHOT'
20+
testCompile project(":rxjava-core").sourceSets.test.output
21+
provided 'junit:junit-dep:4.10'
22+
provided 'org.mockito:mockito-core:1.8.5'
23+
}
24+
25+
jar {
26+
manifest {
27+
name = 'rxjava-quasar'
28+
instruction 'Bundle-Vendor', 'Netflix'
29+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
30+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
31+
}
32+
}
33+
34+
tasks.withType(Test) {
35+
jvmArgs "-javaagent:${configurations.quasar.iterator().next()}" // =vdmc (verbose, debug, allow monitors, check class)
36+
}
37+
38+
tasks.withType(JavaExec) {
39+
jvmArgs "-javaagent:${configurations.quasar.iterator().next()}" // =vdmc (verbose, debug, allow monitors, check class)
40+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/package rx.quasar;
16+
17+
import co.paralleluniverse.fibers.SuspendExecution;
18+
import co.paralleluniverse.fibers.Suspendable;
19+
import co.paralleluniverse.strands.Strand;
20+
import co.paralleluniverse.strands.channels.ReceivePort;
21+
import co.paralleluniverse.strands.channels.SendPort;
22+
import rx.Observable;
23+
import rx.Observer;
24+
import rx.Scheduler;
25+
26+
/**
27+
*
28+
*/
29+
public class ChannelObservable {
30+
31+
/**
32+
* Converts an {@link Iterable} sequence into an Observable that emits each message received on the channel.
33+
* <p>
34+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.png">
35+
* <p>
36+
* @param channel
37+
* the source {@link ReceivePort}
38+
* @param <T>
39+
* the type of messages on the channel and the type of items to be
40+
* emitted by the resulting Observable
41+
* @return an Observable that emits each message received on the source {@link ReceivePort}
42+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
43+
*/
44+
public final static <T> Observable<T> from(ReceivePort<T> channel) {
45+
return Observable.create(new OnSubscribeFromChannel<T>(channel));
46+
}
47+
48+
/**
49+
* Converts an {@link Iterable} sequence into an Observable that operates on the specified
50+
* scheduler, emitting each message received on the channel.
51+
* <p>
52+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.s.png">
53+
* <p>
54+
* @param channel
55+
* the source {@link ReceivePort}
56+
* @param scheduler
57+
* the scheduler on which the Observable is to emit the messages received on the channel
58+
* @param <T>
59+
* the type of messages on the channel and the type of items to be
60+
* emitted by the resulting Observable
61+
* @return an Observable that emits each message received on the source {@link ReceivePort}, on the
62+
* specified scheduler
63+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#wiki-from">RxJava Wiki: from()</a>
64+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
65+
*/
66+
public final static <T> Observable<T> from(ReceivePort<T> channel, Scheduler scheduler) {
67+
return Observable.create(new OnSubscribeFromChannel<T>(channel)).subscribeOn(scheduler);
68+
}
69+
70+
/**
71+
* Converts a {@link SendPort} channel into an {@link Observer}.
72+
* <p>
73+
* @param <T> the type of messages that can be sent to the channel and the type of items to be
74+
* received by the Observer
75+
* @param channel the target {@link SendPort}
76+
* @return
77+
*/
78+
public final static <T> Observer<T> to(final SendPort<T> channel) {
79+
return new Observer<T>() {
80+
81+
@Override
82+
@Suspendable
83+
public void onNext(T t) {
84+
try {
85+
channel.send(t);
86+
} catch (InterruptedException ex) {
87+
Strand.interrupted();
88+
} catch (SuspendExecution ex) {
89+
throw new AssertionError(ex);
90+
}
91+
}
92+
93+
@Override
94+
public void onCompleted() {
95+
channel.close();
96+
}
97+
98+
@Override
99+
public void onError(Throwable e) {
100+
}
101+
};
102+
}
103+
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.quasar;
17+
18+
import co.paralleluniverse.fibers.DefaultFiberScheduler;
19+
import co.paralleluniverse.fibers.Fiber;
20+
import co.paralleluniverse.fibers.FiberScheduler;
21+
import co.paralleluniverse.fibers.SuspendExecution;
22+
import co.paralleluniverse.strands.SuspendableRunnable;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
import rx.Scheduler;
26+
import rx.Subscription;
27+
import rx.subscriptions.CompositeSubscription;
28+
import rx.subscriptions.Subscriptions;
29+
import rx.util.functions.Action1;
30+
31+
/**
32+
* Schedules work on a new fiber.
33+
*/
34+
public class NewFiberScheduler extends Scheduler {
35+
private final static NewFiberScheduler DEFAULT_INSTANCE = new NewFiberScheduler();
36+
37+
public static NewFiberScheduler getDefaultInstance() {
38+
return DEFAULT_INSTANCE;
39+
}
40+
41+
private final FiberScheduler fiberScheduler;
42+
43+
public NewFiberScheduler(FiberScheduler fiberScheduler) {
44+
if(fiberScheduler == null)
45+
throw new IllegalArgumentException("Fiber scheduler is null");
46+
if(fiberScheduler == DefaultFiberScheduler.getInstance() && DEFAULT_INSTANCE != null)
47+
throw new IllegalArgumentException("Fiber scheduler is the default FiberScheduler; use getDefaultInstance()");
48+
this.fiberScheduler = fiberScheduler;
49+
}
50+
51+
private NewFiberScheduler() {
52+
this(DefaultFiberScheduler.getInstance());
53+
}
54+
55+
@Override
56+
public Subscription schedule(Action1<Scheduler.Inner> action) {
57+
EventLoopScheduler innerScheduler = new EventLoopScheduler();
58+
innerScheduler.schedule(action);
59+
return innerScheduler.innerSubscription;
60+
}
61+
62+
@Override
63+
public Subscription schedule(Action1<Scheduler.Inner> action, long delayTime, TimeUnit unit) {
64+
EventLoopScheduler innerScheduler = new EventLoopScheduler();
65+
innerScheduler.schedule(action, delayTime, unit);
66+
return innerScheduler.innerSubscription;
67+
}
68+
69+
private class EventLoopScheduler extends Scheduler.Inner implements Subscription {
70+
private final CompositeSubscription innerSubscription = new CompositeSubscription();
71+
72+
private EventLoopScheduler() {
73+
}
74+
75+
@Override
76+
public void schedule(final Action1<Scheduler.Inner> action) {
77+
if (innerSubscription.isUnsubscribed()) {
78+
// don't schedule, we are unsubscribed
79+
return;
80+
}
81+
82+
final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
83+
Subscription s = Subscriptions.from(new Fiber(fiberScheduler, new SuspendableRunnable() {
84+
85+
@Override
86+
public void run() throws SuspendExecution {
87+
try {
88+
if (innerSubscription.isUnsubscribed()) {
89+
return;
90+
}
91+
action.call(EventLoopScheduler.this);
92+
} finally {
93+
// remove the subscription now that we're completed
94+
Subscription s = sf.get();
95+
if (s != null) {
96+
innerSubscription.remove(s);
97+
}
98+
}
99+
}
100+
}).start());
101+
102+
sf.set(s);
103+
innerSubscription.add(s);
104+
}
105+
106+
@Override
107+
public void schedule(final Action1<Scheduler.Inner> action, final long delayTime, final TimeUnit unit) {
108+
final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
109+
110+
Subscription s = Subscriptions.from(new Fiber(fiberScheduler, new SuspendableRunnable() {
111+
112+
@Override
113+
public void run() throws InterruptedException, SuspendExecution {
114+
Fiber.sleep(delayTime, unit);
115+
try {
116+
if (innerSubscription.isUnsubscribed()) {
117+
return;
118+
}
119+
// now that the delay is past schedule the work to be done for real on the UI thread
120+
action.call(EventLoopScheduler.this);
121+
} finally {
122+
// remove the subscription now that we're completed
123+
Subscription s = sf.get();
124+
if (s != null) {
125+
innerSubscription.remove(s);
126+
}
127+
}
128+
}
129+
}).start());
130+
131+
sf.set(s);
132+
innerSubscription.add(s);
133+
}
134+
135+
@Override
136+
public void unsubscribe() {
137+
innerSubscription.unsubscribe();
138+
}
139+
140+
@Override
141+
public boolean isUnsubscribed() {
142+
return innerSubscription.isUnsubscribed();
143+
}
144+
}
145+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.quasar;
17+
18+
import co.paralleluniverse.fibers.Suspendable;
19+
import co.paralleluniverse.strands.channels.ReceivePort;
20+
import rx.Observable.OnSubscribe;
21+
import rx.Subscriber;
22+
23+
/**
24+
* Converts a {@link ReceivePort} into an Observable that emits each message received on the channel.
25+
* <p>
26+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-Observers/toObservable.png">
27+
*/
28+
public final class OnSubscribeFromChannel<T> implements OnSubscribe<T> {
29+
30+
final ReceivePort<? extends T> channel;
31+
32+
public OnSubscribeFromChannel(ReceivePort<? extends T> channel) {
33+
this.channel = channel;
34+
}
35+
36+
@Override
37+
@Suspendable
38+
public void call(Subscriber<? super T> o) {
39+
for (;;) {
40+
T m;
41+
42+
try {
43+
m = channel.receive();
44+
if (m == null)
45+
break;
46+
if (o.isUnsubscribed()) {
47+
return;
48+
}
49+
} catch (InterruptedException e) {
50+
break;
51+
} catch (Exception e) {
52+
o.onError(e);
53+
continue;
54+
}
55+
56+
o.onNext(m);
57+
}
58+
59+
o.onCompleted();
60+
}
61+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
rx.util.functions.Action1.call
2+
rx.Observer.onNext

0 commit comments

Comments
 (0)