Skip to content

Commit be83be3

Browse files
authored
Merge pull request #720 from fjtirado/Fix_#719
[Fix #719] Fixing until condition
2 parents 4512fa7 + 576c0d3 commit be83be3

File tree

3 files changed

+51
-1
lines changed

3 files changed

+51
-1
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,11 @@ protected void internalProcessCe(
215215
TaskContext taskContext,
216216
CompletableFuture<WorkflowModel> future) {
217217
arrayNode.add(node);
218-
if ((until.isEmpty() || until.map(u -> u.test(workflow, taskContext, arrayNode)).isPresent())
218+
if (until.map(u -> u.test(workflow, taskContext, arrayNode)).orElse(true)
219219
&& untilRegBuilders == null) {
220220
future.complete(node);
221+
} else {
222+
((WorkflowMutableInstance) workflow.instance()).status(WorkflowStatus.WAITING);
221223
}
222224
}
223225
}

impl/jackson/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424
import io.serverlessworkflow.api.WorkflowReader;
2525
import io.serverlessworkflow.impl.jackson.JsonUtils;
2626
import java.io.IOException;
27+
import java.time.Instant;
28+
import java.time.temporal.ChronoUnit;
2729
import java.util.Map;
2830
import java.util.concurrent.CompletableFuture;
2931
import java.util.stream.Stream;
3032
import org.junit.jupiter.api.BeforeAll;
33+
import org.junit.jupiter.api.Test;
3134
import org.junit.jupiter.params.ParameterizedTest;
3235
import org.junit.jupiter.params.provider.Arguments;
3336
import org.junit.jupiter.params.provider.MethodSource;
@@ -81,6 +84,31 @@ void testEventsListened(String listen, String emit1, String emit2, JsonNode expe
8184
assertThat(waitingInstance.outputAs(JsonNode.class)).isEqualTo(expectedResult);
8285
}
8386

87+
@Test
88+
void testForEachInAnyIsExecutedAsEventArrive() throws IOException, InterruptedException {
89+
WorkflowDefinition listenDefinition =
90+
appl.workflowDefinition(
91+
WorkflowReader.readWorkflowFromClasspath("listen-to-any-until.yaml"));
92+
WorkflowDefinition emitDoctorDefinition =
93+
appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("emit-doctor.yaml"));
94+
WorkflowInstance waitingInstance = listenDefinition.instance(Map.of());
95+
CompletableFuture<WorkflowModel> future = waitingInstance.start();
96+
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING);
97+
emitDoctorDefinition.instance(Map.of("temperature", 35)).start().join();
98+
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING);
99+
Thread.sleep(1100);
100+
emitDoctorDefinition.instance(Map.of("temperature", 39)).start().join();
101+
assertThat(future).isCompleted();
102+
assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED);
103+
ArrayNode result = waitingInstance.outputAs(ArrayNode.class);
104+
assertThat(ChronoUnit.SECONDS.between(getInstant(result, 0), getInstant(result, 1)))
105+
.isGreaterThanOrEqualTo(1L);
106+
}
107+
108+
private static Instant getInstant(ArrayNode result, int index) {
109+
return Instant.ofEpochSecond(result.get(index).get("time").asLong());
110+
}
111+
84112
private static Stream<Arguments> eventListenerParameters() {
85113
return Stream.of(
86114
Arguments.of("listen-to-any.yaml", "emit.yaml", array(cruellaDeVil()), Map.of()),
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
document:
2+
dsl: '1.0.0-alpha5'
3+
namespace: test
4+
name: listen-to-any-until
5+
version: '0.1.0'
6+
do:
7+
- callDoctor:
8+
listen:
9+
to:
10+
any:
11+
- with:
12+
type: com.fake-hospital.vitals.measurements.temperature
13+
until: . | any (.temperature > 38)
14+
foreach:
15+
item: event
16+
do:
17+
- measure:
18+
set:
19+
temperature: ${$event.temperature}
20+
time: ${ now}

0 commit comments

Comments
 (0)