Skip to content

Commit 95efc24

Browse files
author
vsilaev
committed
Refactoring Continuations class, adding parameter to specify whether to return current continuation values from streams/iterators
1 parent eb0e27b commit 95efc24

File tree

5 files changed

+131
-43
lines changed

5 files changed

+131
-43
lines changed

net.tascalate.javaflow.examples/src/main/java/org/apache/commons/javaflow/examples/lambdas/LambdasExampleMinimalIterator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
package org.apache.commons.javaflow.examples.lambdas;
1717

1818
import org.apache.commons.javaflow.api.Continuation;
19-
20-
import org.apache.commons.javaflow.extras.CoIterator;
19+
import org.apache.commons.javaflow.extras.ClosableIterator;
2120
import org.apache.commons.javaflow.extras.Continuations;
2221

2322
public class LambdasExampleMinimalIterator {
@@ -40,7 +39,7 @@ public static void main(String[] argv) throws Exception {
4039
// use try-with-resources to close the coIterator
4140
// (and hence terminate underlying continuation)
4241
// in case of early exit
43-
try (CoIterator<Integer> i = Continuations.iterate(cc)) {
42+
try (ClosableIterator<Integer> i = Continuations.iterate(cc)) {
4443
int c = 0;
4544
while (i.hasNext()) {
4645
System.out.println("Interrupted " + i.next());

net.tascalate.javaflow.examples/src/main/java/org/apache/commons/javaflow/examples/lambdas/LambdasExampleMinimalStream.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@
2424
public class LambdasExampleMinimalStream {
2525

2626
public static void main(String[] argv) throws Exception {
27-
// use try-with-resources to close the stream
28-
// (and hence terminate underlying continuation)
29-
// in case of early exit
30-
try (Stream<Integer> stream = Continuations.stream(() -> {
27+
// Create suspended continuation
28+
Continuation cc = Continuations.create(() -> {
3129
try {
3230
for (int i = 1; i <= 5; i++) {
3331
System.out.println("Exe before suspend");
@@ -37,7 +35,12 @@ public static void main(String[] argv) throws Exception {
3735
} finally {
3836
System.out.println("Continuation gracefully exited");
3937
}
40-
})) {
38+
});
39+
40+
// use try-with-resources to close the stream
41+
// (and hence terminate underlying continuation)
42+
// in case of early exit
43+
try (Stream<Integer> stream = Continuations.stream(cc)) {
4144
Optional<Integer> firstDividableByThree =
4245
stream.peek(v -> System.out.println("Interrupted " + v))
4346
.filter(v -> v% 3 == 0)
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Copyright 2013-2017 Valery Silaev (http://vsilaev.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.apache.commons.javaflow.extras;
17+
18+
import java.util.Iterator;
19+
20+
/**
21+
* Iterator that additionally implements {@link Closable} interface
22+
* to execute clean-up when not fully iterated
23+
*
24+
* @author vsilaev
25+
*
26+
* @param <E>
27+
* Type of objects returned by the iterator
28+
*/
29+
public interface ClosableIterator<E> extends Iterator<E>, AutoCloseable {
30+
@Override
31+
void close();
32+
}

net.tascalate.javaflow.extras/src/main/java/org/apache/commons/javaflow/extras/CoIterator.java

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package org.apache.commons.javaflow.extras;
1717

1818
import java.io.Serializable;
19-
import java.util.Iterator;
2019
import java.util.NoSuchElementException;
2120

2221
import org.apache.commons.javaflow.api.Continuation;
@@ -41,10 +40,10 @@
4140
* it's impossible to pass value back to continuation from client code; the result of
4241
* {@link Continuation#suspend(Object)} in continuation will be always <i>null</i>
4342
*
44-
* @param <T>
43+
* @param <E>
4544
* Type of objects returned by the iterator
4645
*/
47-
public class CoIterator<T> implements Iterator<T>, Serializable, AutoCloseable {
46+
public class CoIterator<E> implements ClosableIterator<E>, Serializable {
4847
private static final long serialVersionUID = 1L;
4948

5049
private boolean advance;
@@ -59,7 +58,7 @@ public class CoIterator<T> implements Iterator<T>, Serializable, AutoCloseable {
5958
*/
6059
public CoIterator(Runnable code) {
6160
cc = Continuation.startSuspendedWith(code);
62-
advance = true;
61+
advance = null != cc;
6362
}
6463

6564
/**
@@ -74,31 +73,47 @@ public CoIterator(ContinuableRunnable code) {
7473
}
7574

7675
/**
77-
* Iterator constructor
76+
* <p>Iterator constructor
77+
* <p>Valued returned by this iterator will be results these are yielded via call to
78+
* {@link Continuation#suspend(Object)}, i.e. cc.value() is not included
7879
*
7980
* @param cc
8081
* Current {@link Continuation} to start iteration from.
81-
* Valued returned by this iterator will be results these are yielded via call to
82-
* {@link Continuation#suspend(Object)}, i.e. cc.value() is not included
8382
*/
8483
public CoIterator(Continuation cc) {
84+
this(cc, false);
85+
}
86+
87+
/**
88+
* <p>Iterator constructor
89+
* <p>Valued returned by this iterator will be results these are yielded via call to
90+
* {@link Continuation#suspend(Object)}, i.e. cc.value() is not included unless
91+
* <code>useCurrentValue</code> is true
92+
*
93+
* @param cc
94+
* Current {@link Continuation} to start iteration from.
95+
*
96+
* @param useCurrentValue
97+
* Should the value of the supplied continuation be used as a first returned value
98+
*/
99+
public CoIterator(Continuation cc, boolean useCurrentValue) {
85100
this.cc = cc;
86-
advance = true;
101+
advance = !useCurrentValue && null != cc;
87102
}
88103

89104
public boolean hasNext() {
90105
advanceIfNecessary();
91106
return cc != null;
92107
}
93108

94-
public T next() {
109+
public E next() {
95110
advanceIfNecessary();
96111

97112
if (cc == null)
98113
throw new NoSuchElementException();
99114

100115
@SuppressWarnings("unchecked")
101-
T result = (T) cc.value();
116+
E result = (E)cc.value();
102117
advance = true;
103118

104119
return result;
@@ -109,21 +124,18 @@ public void remove() {
109124
}
110125

111126
public void close() {
112-
if (null != cc) {
113-
try {
114-
cc.terminate();
115-
} finally {
116-
cc = null;
117-
advance = false;
118-
}
119-
} else {
127+
try {
128+
cc.terminate();
129+
} finally {
130+
cc = null;
120131
advance = false;
121132
}
122133
}
123134

124135
protected void advanceIfNecessary() {
125-
if (advance)
136+
if (advance) {
126137
cc = cc.resume();
138+
}
127139
advance = false;
128140
}
129141
}

net.tascalate.javaflow.extras/src/main/java/org/apache/commons/javaflow/extras/Continuations.java

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,23 +66,32 @@ public static Continuation start(ContinuableRunnable o) {
6666
return Continuation.startWith(toRunnable(o));
6767
}
6868

69-
public static <T> CoIterator<T> iterate(Continuation continuation) {
70-
return new CoIterator<>(continuation);
69+
public static <T> ClosableIterator<T> iterate(Continuation continuation) {
70+
return iterate(continuation, false);
7171
}
7272

73-
public static <T> CoIterator<T> iterate(ContinuableRunnable generator) {
74-
return iterate(create(generator));
73+
public static <T> ClosableIterator<T> iterate(Continuation continuation, boolean useCurrentValue) {
74+
return new CoIterator<>(continuation, useCurrentValue);
75+
}
76+
77+
78+
public static <T> ClosableIterator<T> iterate(ContinuableRunnable generator) {
79+
return iterate(create(generator), false);
7580
}
7681

7782
public static <T> Stream<T> stream(Continuation continuation) {
78-
CoIterator<T> iterator = iterate(continuation);
83+
return stream(continuation, false);
84+
}
85+
86+
public static <T> Stream<T> stream(Continuation continuation, boolean useCurrentValue) {
87+
ClosableIterator<T> iterator = iterate(continuation, useCurrentValue);
7988
return StreamSupport
8089
.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
8190
.onClose(iterator::close);
8291
}
8392

8493
public static <T> Stream<T> stream(ContinuableRunnable generator) {
85-
return stream(create(generator));
94+
return stream(create(generator), false);
8695
}
8796

8897
/**
@@ -92,11 +101,24 @@ public static <T> Stream<T> stream(ContinuableRunnable generator) {
92101
*
93102
* @param <T> a type of values
94103
* @param continuation a continuation to resume a code block that yields multiple results
95-
* @param valueType a type of the values yielded from code block
96-
* @param action a continuable action to perform on the values yielded
104+
* @param action a non-continuable action to perform on the values yielded
97105
*/
98106
public static <T> void execute(Continuation continuation, Consumer<? super T> action) {
99-
try (CoIterator<T> iter = new CoIterator<>(continuation)) {
107+
execute(continuation, false, action);
108+
}
109+
110+
/**
111+
* Executes the suspended continuation from the point specified till the end
112+
* of the corresponding code block and performs a non-suspendable action
113+
* on each value yielded.
114+
*
115+
* @param <T> a type of values
116+
* @param continuation a continuation to resume a code block that yields multiple results
117+
* @param useCurrentValue should the value of the supplied continuation be used as a first value to process
118+
* @param action a non-continuable action to perform on the values yielded
119+
*/
120+
public static <T> void execute(Continuation continuation, boolean useCurrentValue, Consumer<? super T> action) {
121+
try (ClosableIterator<T> iter = iterate(continuation, useCurrentValue)) {
100122
while (iter.hasNext()) {
101123
action.accept(iter.next());
102124
}
@@ -109,8 +131,7 @@ public static <T> void execute(Continuation continuation, Consumer<? super T> ac
109131
*
110132
* @param <T> a type of values
111133
* @param generator a continuable code block that yields multiple results
112-
* @param valueType a type of the values yielded from code block
113-
* @param action a continuable action to perform on the values yielded
134+
* @param action a non-continuable action to perform on the values yielded
114135
*/
115136
public static <T> void execute(ContinuableRunnable generator, Consumer<? super T> action) {
116137
execute(create(generator), action);
@@ -124,24 +145,38 @@ public static <T> void execute(ContinuableRunnable generator, Consumer<? super T
124145
*
125146
* @param <T> a type of values
126147
* @param continuation a continuation to resume a code block that yields multiple results
127-
* @param valueType a type of the values yielded from code block
128148
* @param action a continuable action to perform on the values yielded
129149
*/
130150
public @continuable static <T> void executeContinuable(Continuation continuation, ContinuableConsumer<? super T> action) {
131-
forEach(()-> new CoIterator<>(continuation), action);
151+
executeContinuable(continuation, false, action);
132152
}
133153

154+
/**
155+
* Executes the suspended continuation from the point specified till the end
156+
* of the corresponding code block and performs a potentially suspendable action
157+
* on each value yielded.
158+
*
159+
* @param <T> a type of values
160+
* @param useCurrentValue should the value of the supplied continuation be used as a first value to process
161+
* @param action a continuable action to perform on the values yielded
162+
*/
163+
public @continuable static <T> void executeContinuable(Continuation continuation, boolean useCurrentValue, ContinuableConsumer<? super T> action) {
164+
try (ClosableIterator<T> iter = iterate(continuation, useCurrentValue)) {
165+
forEach(iter, action);
166+
}
167+
}
168+
169+
134170
/**
135171
* Fully executes the continuable code block and performs a potentially suspendable
136172
* action on each value yielded.
137173
*
138174
* @param <T> a type of values
139175
* @param generator a continuable code block that yields multiple results
140-
* @param valueType a type of the values yielded from code block
141176
* @param action a continuable action to perform on the values yielded
142177
*/
143178
public @continuable static <T> void executeContinuable(ContinuableRunnable generator, ContinuableConsumer<? super T> action) {
144-
executeContinuable(create(generator), action);
179+
executeContinuable(create(generator), false, action);
145180
}
146181

147182
/**
@@ -169,7 +204,10 @@ public static <T> void execute(ContinuableRunnable generator, Consumer<? super T
169204
* @param action a continuable action to perform on the elements
170205
*/
171206
public @continuable static <T> void forEach(Iterable<T> iterable, ContinuableConsumer<? super T> action) {
172-
forEach(iterable.iterator(), action);
207+
Iterator<T> iter = iterable.iterator();
208+
try (ClosableIterator<T> closable = asClosable(iter)) {
209+
forEach(iter, action);
210+
}
173211
}
174212

175213
/**
@@ -188,5 +226,9 @@ public static <T> void execute(ContinuableRunnable generator, Consumer<? super T
188226
}
189227
}
190228

229+
@SuppressWarnings("unchecked")
230+
private static <E> ClosableIterator<E> asClosable(Object o) {
231+
return o instanceof ClosableIterator ? (ClosableIterator<E>)o : null;
232+
}
191233

192234
}

0 commit comments

Comments
 (0)