Skip to content

Commit c7fb8b9

Browse files
authored
Fix take while method (#94)
1 parent ae08fa5 commit c7fb8b9

File tree

2 files changed

+14
-0
lines changed

2 files changed

+14
-0
lines changed

flows/src/main/java/com/softwaremill/jox/flows/Flow.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,6 +1078,11 @@ public Flow<T> takeWhile(Predicate<T> f, boolean includeFirstFailing) {
10781078
throw new BreakException();
10791079
}
10801080
});
1081+
} catch (JoxScopeExecutionException e) {
1082+
if (!(e.getCause() instanceof BreakException)) {
1083+
throw e;
1084+
}
1085+
// done
10811086
} catch (BreakException e) {
10821087
// done
10831088
}

flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,15 @@ void shouldNotTakeIfPredicateFailsForFirstOrMoreElements() throws Exception {
265265
assertEquals(List.of(), flow.runToList());
266266
}
267267

268+
@Test
269+
void shouldTakeWhileFromAsyncFlow() throws Exception {
270+
// given
271+
Flow<Integer> flow = Flows.fromValues(3, 2, 1).buffer().takeWhile(x -> x > 2, false);
272+
273+
// when & then
274+
assertEquals(List.of(3), flow.runToList());
275+
}
276+
268277
@Test
269278
void shouldNotThrottleEmptySource() throws Exception {
270279
// given

0 commit comments

Comments
 (0)