Skip to content

Commit eb0e27b

Browse files
author
vsilaev
committed
Extensions to Continuations class
1 parent 6a60fbf commit eb0e27b

File tree

5 files changed

+151
-21
lines changed

5 files changed

+151
-21
lines changed

net.tascalate.javaflow.examples/src/main/java/org/apache/commons/javaflow/examples/lambdas/LambdasExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public class LambdasExample {
3030
public static void main(String[] argv) throws Exception {
3131
LambdasExample demo = new LambdasExample();
32-
Continuations.execute(demo::runExamples, s -> System.out.println("Interrupted " + s));
32+
Continuations.execute(demo::runExamples, (String s) -> System.out.println("Interrupted " + s));
3333

3434
System.out.println("===");
3535

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Copyright 2013-2017 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.apache.commons.javaflow.examples.lambdas;
17+
18+
import org.apache.commons.javaflow.api.Continuation;
19+
20+
import org.apache.commons.javaflow.extras.CoIterator;
21+
import org.apache.commons.javaflow.extras.Continuations;
22+
23+
public class LambdasExampleMinimalIterator {
24+
25+
public static void main(String[] argv) throws Exception {
26+
27+
// Create suspended continuation
28+
Continuation cc = Continuations.create(() -> {
29+
try {
30+
for (int i = 1; i <= 5; i++) {
31+
System.out.println("Exe before suspend");
32+
Continuation.suspend(i);
33+
System.out.println("Exe after suspend");
34+
}
35+
} finally {
36+
System.out.println("Continuation gracefully exited");
37+
}
38+
});
39+
40+
// use try-with-resources to close the coIterator
41+
// (and hence terminate underlying continuation)
42+
// in case of early exit
43+
try (CoIterator<Integer> i = Continuations.iterate(cc)) {
44+
int c = 0;
45+
while (i.hasNext()) {
46+
System.out.println("Interrupted " + i.next());
47+
c++;
48+
if (c == 3) {
49+
// Emulate early exit
50+
break;
51+
}
52+
}
53+
}
54+
55+
System.out.println("===");
56+
}
57+
58+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.apache.commons.javaflow.api.Continuation;
1919
import org.apache.commons.javaflow.extras.Continuations;
2020

21-
public class LambdasExampleMinimal {
21+
public class LambdasExampleMinimalLoop {
2222

2323
public static void main(String[] argv) throws Exception {
2424

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright 2013-2017 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.apache.commons.javaflow.examples.lambdas;
17+
18+
import java.util.Optional;
19+
import java.util.stream.Stream;
20+
21+
import org.apache.commons.javaflow.api.Continuation;
22+
import org.apache.commons.javaflow.extras.Continuations;
23+
24+
public class LambdasExampleMinimalStream {
25+
26+
public static void main(String[] argv) throws Exception {
27+
// use try-with-resources to close the stream
28+
// (and hence terminate underlying continuation)
29+
// in case of early exit
30+
try (Stream<Integer> stream = Continuations.stream(() -> {
31+
try {
32+
for (int i = 1; i <= 5; i++) {
33+
System.out.println("Exe before suspend");
34+
Continuation.suspend(i);
35+
System.out.println("Exe after suspend");
36+
}
37+
} finally {
38+
System.out.println("Continuation gracefully exited");
39+
}
40+
})) {
41+
Optional<Integer> firstDividableByThree =
42+
stream.peek(v -> System.out.println("Interrupted " + v))
43+
.filter(v -> v% 3 == 0)
44+
.findFirst()
45+
;
46+
System.out.println(firstDividableByThree);
47+
}
48+
}
49+
50+
}

net.tascalate.javaflow.extras/src/main/java/org/apache/commons/javaflow/extras/Continuations.java

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
import static org.apache.commons.javaflow.extras.ContinuationSupport.toRunnable;
1919

2020
import java.util.Iterator;
21-
import java.util.function.Supplier;
21+
import java.util.Spliterators;
22+
import java.util.function.Consumer;
2223
import java.util.stream.Stream;
24+
import java.util.stream.StreamSupport;
2325

2426
import org.apache.commons.javaflow.api.Continuation;
2527
import org.apache.commons.javaflow.api.continuable;
@@ -64,29 +66,56 @@ public static Continuation start(ContinuableRunnable o) {
6466
return Continuation.startWith(toRunnable(o));
6567
}
6668

69+
public static <T> CoIterator<T> iterate(Continuation continuation) {
70+
return new CoIterator<>(continuation);
71+
}
72+
73+
public static <T> CoIterator<T> iterate(ContinuableRunnable generator) {
74+
return iterate(create(generator));
75+
}
76+
77+
public static <T> Stream<T> stream(Continuation continuation) {
78+
CoIterator<T> iterator = iterate(continuation);
79+
return StreamSupport
80+
.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
81+
.onClose(iterator::close);
82+
}
83+
84+
public static <T> Stream<T> stream(ContinuableRunnable generator) {
85+
return stream(create(generator));
86+
}
87+
6788
/**
6889
* Executes the suspended continuation from the point specified till the end
69-
* of the corresponding code block and performs a potentially suspendable action
90+
* of the corresponding code block and performs a non-suspendable action
7091
* on each value yielded.
7192
*
93+
* @param <T> a type of values
7294
* @param continuation a continuation to resume a code block that yields multiple results
95+
* @param valueType a type of the values yielded from code block
7396
* @param action a continuable action to perform on the values yielded
74-
*/
75-
public @continuable static void execute(Continuation continuation, ContinuableConsumer<? super Object> action) {
76-
execute(continuation, Object.class, action);
97+
*/
98+
public static <T> void execute(Continuation continuation, Consumer<? super T> action) {
99+
try (CoIterator<T> iter = new CoIterator<>(continuation)) {
100+
while (iter.hasNext()) {
101+
action.accept(iter.next());
102+
}
103+
}
77104
}
78105

79106
/**
80-
* Fully executes the continuable code block and performs a potentially suspendable
107+
* Fully executes the continuable code block and performs a non-suspendable
81108
* action on each value yielded.
82109
*
110+
* @param <T> a type of values
83111
* @param generator a continuable code block that yields multiple results
112+
* @param valueType a type of the values yielded from code block
84113
* @param action a continuable action to perform on the values yielded
85114
*/
86-
public @continuable static void execute(ContinuableRunnable generator, ContinuableConsumer<? super Object> action) {
87-
execute(generator, Object.class, action);
115+
public static <T> void execute(ContinuableRunnable generator, Consumer<? super T> action) {
116+
execute(create(generator), action);
88117
}
89-
118+
90119

91120
/**
92121
* Executes the suspended continuation from the point specified till the end
@@ -98,7 +127,7 @@ public static Continuation start(ContinuableRunnable o) {
98127
* @param valueType a type of the values yielded from code block
99128
* @param action a continuable action to perform on the values yielded
100129
*/
101-
public @continuable static <T> void execute(Continuation continuation, Class<T> valueType, ContinuableConsumer<? super T> action) {
130+
public @continuable static <T> void executeContinuable(Continuation continuation, ContinuableConsumer<? super T> action) {
102131
forEach(()-> new CoIterator<>(continuation), action);
103132
}
104133

@@ -111,17 +140,10 @@ public static Continuation start(ContinuableRunnable o) {
111140
* @param valueType a type of the values yielded from code block
112141
* @param action a continuable action to perform on the values yielded
113142
*/
114-
public @continuable static <T> void execute(ContinuableRunnable generator, Class<T> valueType, ContinuableConsumer<? super T> action) {
115-
forEach(()-> new CoIterator<>(generator), action);
143+
public @continuable static <T> void executeContinuable(ContinuableRunnable generator, ContinuableConsumer<? super T> action) {
144+
executeContinuable(create(generator), action);
116145
}
117146

118-
private @continuable static <T> void execute(Supplier<CoIterator<T>> iteratorProvider, Class<T> valueType, ContinuableConsumer<? super T> action) {
119-
try (CoIterator<T> iterator = iteratorProvider.get()) {
120-
forEach(iterator, action);
121-
}
122-
}
123-
124-
125147
/**
126148
* Performs an continuable action for each element of the {@link Stream} supplied.
127149
*

0 commit comments

Comments
 (0)