Skip to content

Commit 00a81de

Browse files
committed
adding unit operators to CompletableTask; adding some tests
1 parent 513e61b commit 00a81de

File tree

4 files changed

+196
-3
lines changed

4 files changed

+196
-3
lines changed

pom.xml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<groupId>net.tascalate.concurrent</groupId>
66
<artifactId>net.tascalate.concurrent.lib</artifactId>
7-
<version>1.0-SNAPSHOT</version>
7+
<version>0.5-SNAPSHOT</version>
88
<packaging>jar</packaging>
99

1010
<name>Tascalate Concurrenct</name>
@@ -53,6 +53,7 @@
5353
</dependencies>
5454
</dependencyManagement>
5555

56+
5657
<distributionManagement>
5758
<snapshotRepository>
5859
<id>ossrh</id>
@@ -115,6 +116,19 @@
115116
</profiles>
116117

117118

119+
<dependencies>
120+
<dependency>
121+
<groupId>commons-logging</groupId>
122+
<artifactId>commons-logging</artifactId>
123+
</dependency>
124+
125+
<dependency>
126+
<groupId>junit</groupId>
127+
<artifactId>junit</artifactId>
128+
<scope>test</scope>
129+
</dependency>
130+
</dependencies>
131+
118132
<build>
119133
<plugins>
120134
<!--

src/main/java/net/tascalate/concurrent/CompletableTask.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
public class CompletableTask<T> extends AbstractCompletableTask<T> implements RunnableFuture<T> {
2323

24-
public CompletableTask(final Executor executor, Callable<T> callable) {
24+
protected CompletableTask(final Executor executor, Callable<T> callable) {
2525
super(executor, callable);
2626
}
2727

@@ -30,11 +30,15 @@ public void run() {
3030
task.run();
3131
}
3232

33-
public static <T> Promise<T> completedFuture(T value, Executor defaultExecutor) {
33+
public static <T> Promise<T> resolve(T value, Executor defaultExecutor) {
3434
CompletableTask<T> result = new CompletableTask<T>(defaultExecutor, () -> value);
3535
SAME_THREAD_EXECUTOR.execute(result);
3636
return result;
3737
}
38+
39+
public static Promise<Void> asyncOn(Executor defaultExecutor) {
40+
return resolve(null, defaultExecutor);
41+
}
3842

3943
@Override
4044
Runnable setupTransition(Callable<T> code) {
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package net.tascalate.concurrent;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
import org.junit.After;
7+
import org.junit.Assert;
8+
import org.junit.Before;
9+
import org.junit.Test;
10+
11+
public class EitherCompletableTaskTests {
12+
13+
private TaskExecutorService executor;
14+
15+
@Before
16+
public void setup() {
17+
executor = TaskExecutors.newFixedThreadPool(4);
18+
}
19+
20+
@After
21+
public void tearDown() {
22+
executor.shutdown();
23+
}
24+
25+
@Test
26+
public void testAnyTaskIsExecuted1() throws Exception {
27+
CountDownLatch ready = new CountDownLatch(1);
28+
Throwable[] errors = {null, null};
29+
String[] results = {null, null};
30+
AtomicInteger idx = new AtomicInteger(0);
31+
32+
Promise<String> first = executor.submit(() -> resultWithDelay("ABC", 2000));
33+
Promise<String> second = executor.submit(() -> resultWithDelay("XYZ", 100)); // Second wins -- shorter delay
34+
35+
first
36+
.applyToEitherAsync(second, String::toLowerCase)
37+
.whenComplete((r, e) -> {
38+
int i = idx.getAndIncrement();
39+
results[i] = r;
40+
errors[i] = e;
41+
ready.countDown();
42+
});
43+
44+
ready.await();
45+
Assert.assertNull(errors[0]);
46+
Assert.assertEquals(results[0], "xyz");
47+
}
48+
49+
@Test
50+
public void testAnyTaskIsExecuted2() throws Exception {
51+
CountDownLatch ready = new CountDownLatch(1);
52+
Throwable[] errors = {null, null};
53+
String[] results = {null, null};
54+
AtomicInteger idx = new AtomicInteger(0);
55+
56+
Promise<String> first = executor.submit(() -> resultWithDelay("ABC", 2000));
57+
Promise<String> second = executor.submit(() -> resultWithDelay("XYZ", 100)); // Second wins -- shorter delay
58+
59+
second
60+
.applyToEitherAsync(first, String::toLowerCase)
61+
.whenComplete((r, e) -> {
62+
int i = idx.getAndIncrement();
63+
results[i] = r;
64+
errors[i] = e;
65+
ready.countDown();
66+
});
67+
68+
ready.await();
69+
Assert.assertNull(errors[0]);
70+
Assert.assertEquals(results[0], "xyz");
71+
}
72+
73+
74+
<T> T resultWithDelay(T value, long delay) throws InterruptedException {
75+
Thread.sleep(delay);
76+
return value;
77+
}
78+
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package net.tascalate.concurrent;
2+
3+
import java.util.concurrent.CompletionException;
4+
import java.util.concurrent.CountDownLatch;
5+
6+
import org.junit.After;
7+
import org.junit.Assert;
8+
import org.junit.Before;
9+
import org.junit.Test;
10+
11+
public class SimpleCompletableTaskTests {
12+
13+
private TaskExecutorService executor;
14+
15+
@Before
16+
public void setup() {
17+
executor = TaskExecutors.newFixedThreadPool(4);
18+
}
19+
20+
@After
21+
public void tearDown() {
22+
executor.shutdown();
23+
}
24+
25+
@Test
26+
public void testTaskIsExecuted1() throws Exception {
27+
Promise<Integer> promise = executor.submit(() -> 50);
28+
int i = promise.get();
29+
Assert.assertEquals(i, 50);
30+
}
31+
32+
@Test
33+
public void testTaskIsExecuted2() throws Exception {
34+
CountDownLatch ready = new CountDownLatch(1);
35+
Throwable[] errors = {null};
36+
String[] results = {null};
37+
executor.submit(() -> "ABC").whenComplete((r, e) -> {
38+
results[0] = r;
39+
errors[0] = e;
40+
ready.countDown();
41+
});
42+
ready.await();
43+
Assert.assertNull(errors[0]);
44+
Assert.assertEquals(results[0], "ABC");
45+
}
46+
47+
@Test
48+
public void testTaskIsFailed() throws Exception {
49+
CountDownLatch ready = new CountDownLatch(1);
50+
Throwable[] errors = {null};
51+
Integer[] results = {null};
52+
executor.submit(this::failedResult).whenComplete((r, e) -> {
53+
results[0] = r;
54+
errors[0] = e;
55+
ready.countDown();
56+
});
57+
ready.await();
58+
Assert.assertNull(results[0]);
59+
Assert.assertTrue("ArithmeticException was not raised", errors[0] instanceof ArithmeticException);
60+
}
61+
62+
@Test
63+
public void testNestedTaskIsExecuted() throws Exception {
64+
CountDownLatch ready = new CountDownLatch(1);
65+
Throwable[] errors = {null};
66+
Integer[] results = {null};
67+
executor.submit(() -> 50).thenApply(i -> i * 10).whenComplete((r, e) -> {
68+
results[0] = r;
69+
errors[0] = e;
70+
ready.countDown();
71+
});
72+
ready.await();
73+
Assert.assertNull(errors[0]);
74+
Assert.assertEquals(results[0], Integer.valueOf(500));
75+
}
76+
77+
@Test
78+
public void testNestedTaskIsFailed() throws Exception {
79+
CountDownLatch ready = new CountDownLatch(1);
80+
Throwable[] errors = {null};
81+
Integer[] results = {null};
82+
executor.submit(this::failedResult).thenApply(i -> i * 10).whenComplete((r, e) -> {
83+
results[0] = r;
84+
errors[0] = e;
85+
ready.countDown();
86+
});
87+
ready.await();
88+
Assert.assertNull(results[0]);
89+
Assert.assertTrue("CompletionException was not raised", errors[0] instanceof CompletionException);
90+
Assert.assertTrue("CompletionException is not caused by ArithmeticException", errors[0].getCause() instanceof ArithmeticException);
91+
}
92+
93+
94+
int failedResult() {
95+
throw new ArithmeticException();
96+
}
97+
}

0 commit comments

Comments
 (0)