diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java index 347bda9a..3579fcc5 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java @@ -215,9 +215,11 @@ protected void internalProcessCe( TaskContext taskContext, CompletableFuture future) { arrayNode.add(node); - if ((until.isEmpty() || until.map(u -> u.test(workflow, taskContext, arrayNode)).isPresent()) + if (until.map(u -> u.test(workflow, taskContext, arrayNode)).orElse(true) && untilRegBuilders == null) { future.complete(node); + } else { + ((WorkflowMutableInstance) workflow.instance()).status(WorkflowStatus.WAITING); } } } diff --git a/impl/jackson/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java b/impl/jackson/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java index 353b777b..b7483943 100644 --- a/impl/jackson/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java +++ b/impl/jackson/src/test/java/io/serverlessworkflow/impl/EventDefinitionTest.java @@ -24,10 +24,13 @@ import io.serverlessworkflow.api.WorkflowReader; import io.serverlessworkflow.impl.jackson.JsonUtils; import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -81,6 +84,31 @@ void testEventsListened(String listen, String emit1, String emit2, JsonNode expe assertThat(waitingInstance.outputAs(JsonNode.class)).isEqualTo(expectedResult); } + @Test + void testForEachInAnyIsExecutedAsEventArrive() throws IOException, InterruptedException { + WorkflowDefinition listenDefinition = + appl.workflowDefinition( + WorkflowReader.readWorkflowFromClasspath("listen-to-any-until.yaml")); + WorkflowDefinition emitDoctorDefinition = + appl.workflowDefinition(WorkflowReader.readWorkflowFromClasspath("emit-doctor.yaml")); + WorkflowInstance waitingInstance = listenDefinition.instance(Map.of()); + CompletableFuture future = waitingInstance.start(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING); + emitDoctorDefinition.instance(Map.of("temperature", 35)).start().join(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.WAITING); + Thread.sleep(1100); + emitDoctorDefinition.instance(Map.of("temperature", 39)).start().join(); + assertThat(future).isCompleted(); + assertThat(waitingInstance.status()).isEqualTo(WorkflowStatus.COMPLETED); + ArrayNode result = waitingInstance.outputAs(ArrayNode.class); + assertThat(ChronoUnit.SECONDS.between(getInstant(result, 0), getInstant(result, 1))) + .isGreaterThanOrEqualTo(1L); + } + + private static Instant getInstant(ArrayNode result, int index) { + return Instant.ofEpochSecond(result.get(index).get("time").asLong()); + } + private static Stream eventListenerParameters() { return Stream.of( Arguments.of("listen-to-any.yaml", "emit.yaml", array(cruellaDeVil()), Map.of()), diff --git a/impl/jackson/src/test/resources/listen-to-any-until.yaml b/impl/jackson/src/test/resources/listen-to-any-until.yaml new file mode 100644 index 00000000..42e814b6 --- /dev/null +++ b/impl/jackson/src/test/resources/listen-to-any-until.yaml @@ -0,0 +1,20 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: listen-to-any-until + version: '0.1.0' +do: + - callDoctor: + listen: + to: + any: + - with: + type: com.fake-hospital.vitals.measurements.temperature + until: . | any (.temperature > 38) + foreach: + item: event + do: + - measure: + set: + temperature: ${$event.temperature} + time: ${ now}