Skip to content

Commit a0e5cf1

Browse files
Merge pull request #1332 from ashleyj/master
IOSSchedulers for RoboVM
2 parents e808394 + c46153c commit a0e5cf1

File tree

5 files changed

+332
-0
lines changed

5 files changed

+332
-0
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
apply plugin: 'osgi'
2+
3+
dependencies {
4+
compile project(':rxjava-core')
5+
6+
// testing
7+
provided 'junit:junit-dep:4.10'
8+
compile 'org.robovm:robovm-rt:0.0.12'
9+
compile 'org.robovm:robovm-objc:0.0.12'
10+
compile 'org.robovm:robovm-cocoatouch:0.0.12'
11+
}
12+
13+
javadoc {
14+
options {
15+
doclet = "org.benjchristensen.doclet.DocletExclude"
16+
docletpath = [rootProject.file('./gradle/doclet-exclude.jar')]
17+
stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css')
18+
windowTitle = "RxJava iOS Javadoc ${project.version}"
19+
}
20+
options.addStringOption('top').value = '<h2 class="title" style="padding-top:40px">RxJava iOS</h2>'
21+
}
22+
23+
jar {
24+
manifest {
25+
name = 'rxjava-ios'
26+
instruction 'Bundle-Vendor', 'Netflix'
27+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
28+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
29+
instruction 'Fragment-Host', 'com.netflix.rxjava.core'
30+
}
31+
}
32+
33+
test {
34+
testLogging {
35+
exceptionFormat "full"
36+
events "started"
37+
displayGranularity 2
38+
}
39+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
* Copyright 2014 Ashley Williams
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package rx.ios.schedulers;
19+
20+
import org.robovm.apple.foundation.NSOperationQueue;
21+
import rx.Scheduler;
22+
import rx.Subscription;
23+
import rx.functions.Action0;
24+
import rx.internal.util.RxThreadFactory;
25+
import rx.subscriptions.CompositeSubscription;
26+
import rx.subscriptions.Subscriptions;
27+
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.Future;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.TimeUnit;
32+
33+
/**
34+
* Schedules actions to run on an iOS Handler thread.
35+
*/
36+
public class HandlerThreadScheduler extends Scheduler {
37+
38+
private final NSOperationQueue operationQueue;
39+
private static final String THREAD_PREFIX = "RxiOSScheduledExecutorPool-";
40+
41+
42+
public HandlerThreadScheduler(NSOperationQueue operationQueue) {
43+
this.operationQueue = operationQueue;
44+
}
45+
46+
@Override
47+
public Worker createWorker() {
48+
return new InnerHandlerThreadScheduler(operationQueue);
49+
}
50+
51+
52+
private static class InnerHandlerThreadScheduler extends Worker {
53+
54+
private final NSOperationQueue operationQueue;
55+
private CompositeSubscription innerSubscription = new CompositeSubscription();
56+
57+
58+
public InnerHandlerThreadScheduler(NSOperationQueue operationQueue) {
59+
this.operationQueue = operationQueue;
60+
}
61+
62+
@Override
63+
public void unsubscribe() {
64+
innerSubscription.unsubscribe();
65+
}
66+
67+
@Override
68+
public boolean isUnsubscribed() {
69+
return innerSubscription.isUnsubscribed();
70+
}
71+
72+
@Override
73+
public Subscription schedule(final Action0 action) {
74+
return schedule(action, 0, null);
75+
}
76+
77+
@Override
78+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
79+
return scheduledAction(action, delayTime, unit);
80+
}
81+
82+
public Subscription scheduledAction(final Action0 action, long delay, TimeUnit unit) {
83+
84+
if (innerSubscription.isUnsubscribed()) {
85+
return Subscriptions.empty();
86+
}
87+
88+
final ScheduledIOSAction scheduledAction = new ScheduledIOSAction(action, operationQueue);
89+
final ScheduledExecutorService executor = IOSScheduledExecutorPool.getInstance();
90+
91+
Future<?> future;
92+
if (delay <= 0) {
93+
future = executor.submit(scheduledAction);
94+
} else {
95+
future = executor.schedule(scheduledAction, delay, unit);
96+
}
97+
98+
scheduledAction.add(Subscriptions.from(future));
99+
scheduledAction.addParent(innerSubscription);
100+
101+
return scheduledAction;
102+
}
103+
}
104+
105+
106+
private static final class IOSScheduledExecutorPool {
107+
108+
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_PREFIX);
109+
110+
private static IOSScheduledExecutorPool INSTANCE = new IOSScheduledExecutorPool();
111+
private final ScheduledExecutorService executorService;
112+
113+
private IOSScheduledExecutorPool() {
114+
executorService = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
115+
}
116+
117+
public static ScheduledExecutorService getInstance() {
118+
return INSTANCE.executorService;
119+
}
120+
}
121+
122+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
* Copyright 2014 Ashley Williams
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package rx.ios.schedulers;
18+
19+
import rx.Scheduler;
20+
import org.robovm.apple.foundation.NSOperationQueue;
21+
22+
23+
public class IOSSchedulers {
24+
25+
private static final Scheduler MAIN_THREAD_SCHEDULER =
26+
new HandlerThreadScheduler((NSOperationQueue) NSOperationQueue.getMainQueue());
27+
28+
private IOSSchedulers(){
29+
30+
}
31+
32+
33+
public static Scheduler handlerThread(final NSOperationQueue operationQueue) {
34+
return new HandlerThreadScheduler(operationQueue);
35+
}
36+
37+
public static Scheduler mainThread() {
38+
return MAIN_THREAD_SCHEDULER;
39+
}
40+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
* Copyright 2014 Ashley Williams
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package rx.ios.schedulers;
19+
20+
import org.robovm.apple.foundation.NSBlockOperation;
21+
import org.robovm.apple.foundation.NSOperationQueue;
22+
import rx.Subscription;
23+
import rx.functions.Action0;
24+
import rx.subscriptions.CompositeSubscription;
25+
26+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
27+
28+
/**
29+
* Based on {@code ScheduledAction} - A {@code Runnable} that executes an {@code Action0}
30+
* that can be cancelled.
31+
*/
32+
final class ScheduledIOSAction implements Runnable, Subscription {
33+
final CompositeSubscription cancel;
34+
final Action0 action;
35+
NSBlockOperation nsBlockOperation;
36+
final NSOperationQueue operationQueue;
37+
volatile int once;
38+
static final AtomicIntegerFieldUpdater<ScheduledIOSAction> ONCE_UPDATER
39+
= AtomicIntegerFieldUpdater.newUpdater(ScheduledIOSAction.class, "once");
40+
41+
public ScheduledIOSAction(Action0 action, NSOperationQueue operationQueue) {
42+
this.action = action;
43+
this.operationQueue = operationQueue;
44+
this.cancel = new CompositeSubscription();
45+
46+
nsBlockOperation = new NSBlockOperation();
47+
}
48+
49+
@Override
50+
public void run() {
51+
try {
52+
53+
final Runnable actionRunner = new Runnable() {
54+
@Override
55+
public void run() {
56+
action.call();
57+
}
58+
};
59+
60+
nsBlockOperation.addExecutionBlock$(actionRunner);
61+
62+
/* Add operation to operation queue*/
63+
operationQueue.addOperation(nsBlockOperation);
64+
65+
} finally {
66+
unsubscribe();
67+
}
68+
}
69+
70+
@Override
71+
public boolean isUnsubscribed() {
72+
return cancel.isUnsubscribed();
73+
}
74+
75+
@Override
76+
public void unsubscribe() {
77+
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
78+
nsBlockOperation.cancel();
79+
cancel.unsubscribe();
80+
System.err.println("cancelled");
81+
}
82+
}
83+
84+
/**
85+
* Adds a {@code Subscription} to the {@link CompositeSubscription} to be later cancelled on unsubscribe
86+
*
87+
* @param s subscription to add
88+
*/
89+
public void add(Subscription s) {
90+
cancel.add(s);
91+
}
92+
93+
/**
94+
* Adds a parent {@link rx.subscriptions.CompositeSubscription} to this {@code ScheduledIOSAction} so when
95+
* the action is cancelled or terminates, it can remove itself from this parent
96+
* @param parent the parent {@code CompositeSubscription} to add
97+
*/
98+
public void addParent(CompositeSubscription parent) {
99+
cancel.add(new Remover(this, parent));
100+
}
101+
102+
103+
/**
104+
* Remove a child subscription from a composite when unsubscribing
105+
*/
106+
private static final class Remover implements Subscription {
107+
final Subscription s;
108+
final CompositeSubscription parent;
109+
volatile int once;
110+
static final AtomicIntegerFieldUpdater<Remover> ONCE_UPDATER
111+
= AtomicIntegerFieldUpdater.newUpdater(Remover.class, "once");
112+
113+
public Remover(Subscription s, CompositeSubscription parent) {
114+
this.s = s;
115+
this.parent = parent;
116+
}
117+
118+
@Override
119+
public boolean isUnsubscribed() {
120+
return s.isUnsubscribed();
121+
}
122+
123+
@Override
124+
public void unsubscribe() {
125+
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
126+
parent.remove(s);
127+
}
128+
}
129+
}
130+
}

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ include 'rxjava-core', \
77
'language-adaptors:rxjava-kotlin', \
88
'rxjava-contrib:rxjava-swing', \
99
//'rxjava-contrib:rxjava-javafx', \
10+
'rxjava-contrib:rxjava-ios', \
1011
'rxjava-contrib:rxjava-android', \
1112
'rxjava-contrib:rxjava-android-samples-build-wrapper', \
1213
'rxjava-contrib:rxjava-apache-http', \

0 commit comments

Comments
 (0)