Skip to content

Commit 7ce53f0

Browse files
author
jmhofer
committed
Merge branch 'master' into combineLatest
2 parents fc99abe + 8664ce2 commit 7ce53f0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+3123
-404
lines changed

CHANGES.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,28 @@
11
# RxJava Releases #
22

3+
### Version 0.8.2 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.8.2%22)) ###
4+
5+
* [Issue 74](https://github.com/Netflix/RxJava/issues/74) Operator: Sample
6+
* [Issue 93](https://github.com/Netflix/RxJava/issues/93) Operator: Timestamp
7+
* [Pull 253](https://github.com/Netflix/RxJava/pull/253) Fix multiple subscription bug on operation filter
8+
* [Pull 254](https://github.com/Netflix/RxJava/pull/254) SwingScheduler (new rxjava-swing module)
9+
* [Pull 256](https://github.com/Netflix/RxJava/pull/256) BehaviorSubject
10+
* [Pull 257](https://github.com/Netflix/RxJava/pull/257) Improved scan, reduce, aggregate
11+
* [Pull 262](https://github.com/Netflix/RxJava/pull/262) SwingObservable (new rxjava-swing module)
12+
* [Pull 264](https://github.com/Netflix/RxJava/pull/263) Publish, Replay and Cache Operators
13+
*
14+
### Version 0.8.1 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.8.1%22)) ###
15+
16+
* [Pull 250](https://github.com/Netflix/RxJava/pull/250) AsyncSubject
17+
* [Pull 252](https://github.com/Netflix/RxJava/pull/252) ToFuture
18+
* [Pull 246](https://github.com/Netflix/RxJava/pull/246) Scheduler.schedulePeriodically
19+
* [Pull 247](https://github.com/Netflix/RxJava/pull/247) flatMap aliased to mapMany
20+
321
### Version 0.8.0 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.8.0%22)) ###
422

523
This is a breaking (non-backwards compatible) release that updates the Scheduler implementation released in 0.7.0.
624

7-
See See https://github.com/Netflix/RxJava/issues/19 for background, discussion and status of Schedulers.
25+
See https://github.com/Netflix/RxJava/issues/19 for background, discussion and status of Schedulers.
826

927
It is believed that the public signatures of Scheduler and related objects is now stabilized but ongoing feedback and review by the community could still result in changes.
1028

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.8.1-SNAPSHOT
1+
version=0.8.3

language-adaptors/rxjava-clojure/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ apply plugin: 'osgi'
77
dependencies {
88
compile project(':rxjava-core')
99
provided 'org.clojure:clojure:1.4.+'
10-
provided 'junit:junit:4.10'
10+
provided 'junit:junit-dep:4.10'
1111
provided 'org.mockito:mockito-core:1.8.5'
1212

1313
// clojure

language-adaptors/rxjava-groovy/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ apply plugin: 'osgi'
77
dependencies {
88
compile project(':rxjava-core')
99
groovy 'org.codehaus.groovy:groovy-all:2.+'
10-
provided 'junit:junit:4.10'
10+
provided 'junit:junit-dep:4.10'
1111
provided 'org.mockito:mockito-core:1.8.5'
1212
}
1313

language-adaptors/rxjava-jruby/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ apply plugin: 'osgi'
66
dependencies {
77
compile project(':rxjava-core')
88
provided 'org.jruby:jruby:1.6+'
9-
provided 'junit:junit:4.10'
9+
provided 'junit:junit-dep:4.10'
1010
provided 'org.mockito:mockito-core:1.8.5'
1111
}
1212

language-adaptors/rxjava-scala/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ dependencies {
2020
provided 'org.scalatest:scalatest_2.10:1.9.1'
2121

2222
compile project(':rxjava-core')
23-
provided 'junit:junit:4.10'
23+
provided 'junit:junit-dep:4.10'
2424
provided 'org.mockito:mockito-core:1.8.5'
2525

2626
testCompile 'org.scalatest:scalatest_2.10:1.9.1'
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
apply plugin: 'java'
2+
apply plugin: 'eclipse'
3+
apply plugin: 'idea'
4+
apply plugin: 'osgi'
5+
6+
sourceCompatibility = JavaVersion.VERSION_1_6
7+
targetCompatibility = JavaVersion.VERSION_1_6
8+
9+
dependencies {
10+
compile project(':rxjava-core')
11+
provided 'junit:junit-dep:4.10'
12+
provided 'org.mockito:mockito-core:1.8.5'
13+
}
14+
15+
eclipse {
16+
classpath {
17+
// include 'provided' dependencies on the classpath
18+
plusConfigurations += configurations.provided
19+
20+
downloadSources = true
21+
downloadJavadoc = true
22+
}
23+
}
24+
25+
idea {
26+
module {
27+
// include 'provided' dependencies on the classpath
28+
scopes.PROVIDED.plus += configurations.provided
29+
}
30+
}
31+
32+
javadoc {
33+
options {
34+
doclet = "org.benjchristensen.doclet.DocletExclude"
35+
docletpath = [rootProject.file('./gradle/doclet-exclude.jar')]
36+
stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css')
37+
windowTitle = "RxJava Javadoc ${project.version}"
38+
}
39+
options.addStringOption('top').value = '<h2 class="title" style="padding-top:40px">RxJava</h2>'
40+
}
41+
42+
jar {
43+
manifest {
44+
name = 'rxjava-swing'
45+
instruction 'Bundle-Vendor', 'Netflix'
46+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
47+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
48+
}
49+
}
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.concurrency;
17+
18+
import static org.junit.Assert.assertTrue;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.awt.EventQueue;
22+
import java.awt.event.ActionEvent;
23+
import java.awt.event.ActionListener;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
27+
import javax.swing.SwingUtilities;
28+
import javax.swing.Timer;
29+
30+
import org.junit.Rule;
31+
import org.junit.Test;
32+
import org.junit.rules.ExpectedException;
33+
import org.mockito.InOrder;
34+
35+
import rx.Scheduler;
36+
import rx.Subscription;
37+
import rx.subscriptions.CompositeSubscription;
38+
import rx.subscriptions.Subscriptions;
39+
import rx.util.functions.Action0;
40+
import rx.util.functions.Func0;
41+
import rx.util.functions.Func2;
42+
43+
/**
44+
* Executes work on the Swing UI thread.
45+
* This scheduler should only be used with actions that execute quickly.
46+
*/
47+
public final class SwingScheduler extends Scheduler {
48+
private static final SwingScheduler INSTANCE = new SwingScheduler();
49+
50+
public static SwingScheduler getInstance() {
51+
return INSTANCE;
52+
}
53+
54+
private SwingScheduler() {
55+
}
56+
57+
@Override
58+
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
59+
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
60+
EventQueue.invokeLater(new Runnable() {
61+
@Override
62+
public void run() {
63+
sub.set(action.call(SwingScheduler.this, state));
64+
}
65+
});
66+
return Subscriptions.create(new Action0() {
67+
@Override
68+
public void call() {
69+
Subscription subscription = sub.get();
70+
if (subscription != null) {
71+
subscription.unsubscribe();
72+
}
73+
}
74+
});
75+
}
76+
77+
@Override
78+
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
79+
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
80+
long delay = unit.toMillis(dueTime);
81+
assertThatTheDelayIsValidForTheSwingTimer(delay);
82+
83+
class ExecuteOnceAction implements ActionListener {
84+
private Timer timer;
85+
86+
private void setTimer(Timer timer) {
87+
this.timer = timer;
88+
}
89+
90+
@Override
91+
public void actionPerformed(ActionEvent e) {
92+
timer.stop();
93+
sub.set(action.call(SwingScheduler.this, state));
94+
}
95+
}
96+
97+
ExecuteOnceAction executeOnce = new ExecuteOnceAction();
98+
final Timer timer = new Timer((int) delay, executeOnce);
99+
executeOnce.setTimer(timer);
100+
timer.start();
101+
102+
return Subscriptions.create(new Action0() {
103+
@Override
104+
public void call() {
105+
timer.stop();
106+
107+
Subscription subscription = sub.get();
108+
if (subscription != null) {
109+
subscription.unsubscribe();
110+
}
111+
}
112+
});
113+
}
114+
115+
@Override
116+
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
117+
final AtomicReference<Timer> timer = new AtomicReference<Timer>();
118+
119+
final long delay = unit.toMillis(period);
120+
assertThatTheDelayIsValidForTheSwingTimer(delay);
121+
122+
final CompositeSubscription subscriptions = new CompositeSubscription();
123+
final Func2<Scheduler, T, Subscription> initialAction = new Func2<Scheduler, T, Subscription>() {
124+
@Override
125+
public Subscription call(final Scheduler scheduler, final T state0) {
126+
// start timer for periodic execution, collect subscriptions
127+
timer.set(new Timer((int) delay, new ActionListener() {
128+
@Override
129+
public void actionPerformed(ActionEvent e) {
130+
subscriptions.add(action.call(scheduler, state0));
131+
}
132+
}));
133+
timer.get().start();
134+
135+
return action.call(scheduler, state0);
136+
}
137+
};
138+
subscriptions.add(schedule(state, initialAction, initialDelay, unit));
139+
140+
subscriptions.add(Subscriptions.create(new Action0() {
141+
@Override
142+
public void call() {
143+
// in addition to all the individual unsubscriptions, stop the timer on unsubscribing
144+
Timer maybeTimer = timer.get();
145+
if (maybeTimer != null) {
146+
maybeTimer.stop();
147+
}
148+
}
149+
}));
150+
151+
return subscriptions;
152+
}
153+
154+
private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) {
155+
if (delay < 0 || delay > Integer.MAX_VALUE) {
156+
throw new IllegalArgumentException(String.format("The swing timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
157+
}
158+
}
159+
160+
public static class UnitTest {
161+
@Rule
162+
public ExpectedException exception = ExpectedException.none();
163+
164+
@Test
165+
public void testInvalidDelayValues() {
166+
final SwingScheduler scheduler = new SwingScheduler();
167+
final Action0 action = mock(Action0.class);
168+
169+
exception.expect(IllegalArgumentException.class);
170+
scheduler.schedulePeriodically(action, -1L, 100L, TimeUnit.SECONDS);
171+
172+
exception.expect(IllegalArgumentException.class);
173+
scheduler.schedulePeriodically(action, 100L, -1L, TimeUnit.SECONDS);
174+
175+
exception.expect(IllegalArgumentException.class);
176+
scheduler.schedulePeriodically(action, 1L + Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS);
177+
178+
exception.expect(IllegalArgumentException.class);
179+
scheduler.schedulePeriodically(action, 100L, 1L + Integer.MAX_VALUE / 1000, TimeUnit.SECONDS);
180+
}
181+
182+
@Test
183+
public void testPeriodicScheduling() throws Exception {
184+
final SwingScheduler scheduler = new SwingScheduler();
185+
186+
final Action0 innerAction = mock(Action0.class);
187+
final Action0 unsubscribe = mock(Action0.class);
188+
final Func0<Subscription> action = new Func0<Subscription>() {
189+
@Override
190+
public Subscription call() {
191+
innerAction.call();
192+
assertTrue(SwingUtilities.isEventDispatchThread());
193+
return Subscriptions.create(unsubscribe);
194+
}
195+
};
196+
197+
Subscription sub = scheduler.schedulePeriodically(action, 50, 200, TimeUnit.MILLISECONDS);
198+
Thread.sleep(840);
199+
sub.unsubscribe();
200+
waitForEmptyEventQueue();
201+
verify(innerAction, times(4)).call();
202+
verify(unsubscribe, times(4)).call();
203+
}
204+
205+
@Test
206+
public void testNestedActions() throws Exception {
207+
final SwingScheduler scheduler = new SwingScheduler();
208+
209+
final Action0 firstStepStart = mock(Action0.class);
210+
final Action0 firstStepEnd = mock(Action0.class);
211+
212+
final Action0 secondStepStart = mock(Action0.class);
213+
final Action0 secondStepEnd = mock(Action0.class);
214+
215+
final Action0 thirdStepStart = mock(Action0.class);
216+
final Action0 thirdStepEnd = mock(Action0.class);
217+
218+
final Action0 firstAction = new Action0() {
219+
@Override
220+
public void call() {
221+
assertTrue(SwingUtilities.isEventDispatchThread());
222+
firstStepStart.call();
223+
firstStepEnd.call();
224+
}
225+
};
226+
final Action0 secondAction = new Action0() {
227+
@Override
228+
public void call() {
229+
assertTrue(SwingUtilities.isEventDispatchThread());
230+
secondStepStart.call();
231+
scheduler.schedule(firstAction);
232+
secondStepEnd.call();
233+
}
234+
};
235+
final Action0 thirdAction = new Action0() {
236+
@Override
237+
public void call() {
238+
assertTrue(SwingUtilities.isEventDispatchThread());
239+
thirdStepStart.call();
240+
scheduler.schedule(secondAction);
241+
thirdStepEnd.call();
242+
}
243+
};
244+
245+
InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd);
246+
247+
scheduler.schedule(thirdAction);
248+
waitForEmptyEventQueue();
249+
250+
inOrder.verify(thirdStepStart, times(1)).call();
251+
inOrder.verify(thirdStepEnd, times(1)).call();
252+
inOrder.verify(secondStepStart, times(1)).call();
253+
inOrder.verify(secondStepEnd, times(1)).call();
254+
inOrder.verify(firstStepStart, times(1)).call();
255+
inOrder.verify(firstStepEnd, times(1)).call();
256+
}
257+
258+
private static void waitForEmptyEventQueue() throws Exception {
259+
EventQueue.invokeAndWait(new Runnable() {
260+
@Override
261+
public void run() {
262+
// nothing to do, we're just waiting here for the event queue to be emptied
263+
}
264+
});
265+
}
266+
}
267+
}

0 commit comments

Comments
 (0)