Skip to content

Commit 176c4e8

Browse files
authored
Merge pull request #706 from fjtirado/experimental_listen
Add Until listen predicate
2 parents 10cd323 + 92cbe07 commit 176c4e8

File tree

7 files changed

+129
-19
lines changed

7 files changed

+129
-19
lines changed

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaFuncUtils.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,22 @@
1515
*/
1616
package io.serverlessworkflow.impl.executors.func;
1717

18+
import io.serverlessworkflow.api.types.func.TypedPredicate;
1819
import io.serverlessworkflow.impl.WorkflowModel;
1920
import java.util.Optional;
21+
import java.util.function.Predicate;
2022

2123
public class JavaFuncUtils {
2224

2325
static Object safeObject(Object obj) {
2426
return obj instanceof WorkflowModel model ? model.asJavaObject() : obj;
2527
}
2628

29+
@SuppressWarnings({"unchecked", "rawtypes"})
30+
static Object predObject(Predicate<?> pred, Optional<Class<?>> predClass) {
31+
return predClass.isPresent() ? new TypedPredicate(pred, predClass.orElseThrow()) : pred;
32+
}
33+
2734
static <T> T convertT(WorkflowModel model, Optional<Class<T>> inputClass) {
2835
return inputClass
2936
.map(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.serverlessworkflow.impl.executors.func;
18+
19+
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.predObject;
20+
21+
import io.serverlessworkflow.api.types.ListenTask;
22+
import io.serverlessworkflow.api.types.Until;
23+
import io.serverlessworkflow.api.types.Workflow;
24+
import io.serverlessworkflow.api.types.func.UntilPredicate;
25+
import io.serverlessworkflow.impl.WorkflowApplication;
26+
import io.serverlessworkflow.impl.WorkflowMutablePosition;
27+
import io.serverlessworkflow.impl.WorkflowPredicate;
28+
import io.serverlessworkflow.impl.executors.ListenExecutor.ListenExecutorBuilder;
29+
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
30+
import io.serverlessworkflow.impl.resources.ResourceLoader;
31+
32+
public class JavaListenExecutorBuilder extends ListenExecutorBuilder {
33+
34+
protected JavaListenExecutorBuilder(
35+
WorkflowMutablePosition position,
36+
ListenTask task,
37+
Workflow workflow,
38+
WorkflowApplication application,
39+
ResourceLoader resourceLoader) {
40+
super(position, task, workflow, application, resourceLoader);
41+
}
42+
43+
@Override
44+
protected WorkflowPredicate buildUntilPredicate(Until until) {
45+
return until instanceof UntilPredicate untilPred && untilPred.predicate() != null
46+
? application
47+
.expressionFactory()
48+
.buildPredicate(
49+
ExpressionDescriptor.object(
50+
predObject(untilPred.predicate(), untilPred.predicateClass())))
51+
: super.buildUntilPredicate(until);
52+
}
53+
}

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaSwitchExecutorBuilder.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,19 @@
1616

1717
package io.serverlessworkflow.impl.executors.func;
1818

19+
import static io.serverlessworkflow.impl.executors.func.JavaFuncUtils.predObject;
20+
1921
import io.serverlessworkflow.api.types.SwitchCase;
2022
import io.serverlessworkflow.api.types.SwitchTask;
2123
import io.serverlessworkflow.api.types.Workflow;
2224
import io.serverlessworkflow.api.types.func.SwitchCaseFunction;
23-
import io.serverlessworkflow.api.types.func.TypedPredicate;
2425
import io.serverlessworkflow.impl.WorkflowApplication;
2526
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2627
import io.serverlessworkflow.impl.WorkflowPredicate;
2728
import io.serverlessworkflow.impl.executors.SwitchExecutor.SwitchExecutorBuilder;
2829
import io.serverlessworkflow.impl.expressions.ExpressionDescriptor;
2930
import io.serverlessworkflow.impl.resources.ResourceLoader;
3031
import java.util.Optional;
31-
import java.util.function.Predicate;
3232

3333
public class JavaSwitchExecutorBuilder extends SwitchExecutorBuilder {
3434

@@ -52,9 +52,4 @@ protected Optional<WorkflowPredicate> buildFilter(SwitchCase switchCase) {
5252
predObject(function.predicate(), function.predicateClass()))))
5353
: super.buildFilter(switchCase);
5454
}
55-
56-
@SuppressWarnings({"unchecked", "rawtypes"})
57-
private Object predObject(Predicate<?> pred, Optional<Class<?>> predClass) {
58-
return predClass.isPresent() ? new TypedPredicate(pred, predClass.orElseThrow()) : pred;
59-
}
6055
}

experimental/lambda/src/main/java/io/serverlessworkflow/impl/executors/func/JavaTaskExecutorFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public TaskExecutorBuilder<? extends TaskBase> getTaskExecutor(
3838
} else if (task.getSwitchTask() != null) {
3939
return new JavaSwitchExecutorBuilder(
4040
position, task.getSwitchTask(), workflow, application, resourceLoader);
41+
} else if (task.getListenTask() != null) {
42+
return new JavaListenExecutorBuilder(
43+
position, task.getListenTask(), workflow, application, resourceLoader);
4144
} else {
4245
return super.getTaskExecutor(position, task, workflow, application, resourceLoader);
4346
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.api.types.func;
17+
18+
import io.serverlessworkflow.api.types.Until;
19+
import java.util.Optional;
20+
import java.util.function.Predicate;
21+
22+
public class UntilPredicate extends Until {
23+
24+
private Predicate<?> predicate;
25+
private Optional<Class<?>> predicateClass;
26+
27+
public <T> Until withFunction(Predicate<T> predicate) {
28+
this.predicate = predicate;
29+
this.predicateClass = Optional.empty();
30+
return this;
31+
}
32+
33+
public <T> Until withFunction(Predicate<T> predicate, Class<T> clazz) {
34+
this.predicate = predicate;
35+
this.predicateClass = Optional.ofNullable(clazz);
36+
return this;
37+
}
38+
39+
public Predicate<?> predicate() {
40+
return predicate;
41+
}
42+
43+
public Optional<Class<?>> predicateClass() {
44+
return predicateClass;
45+
}
46+
}

impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,17 +100,17 @@ protected ListenExecutorBuilder(
100100
registrations = anyEvents(any);
101101
Until untilDesc = any.getUntil();
102102
if (untilDesc != null) {
103-
if (untilDesc.getAnyEventUntilCondition() != null) {
104-
until =
105-
WorkflowUtils.buildPredicate(application, untilDesc.getAnyEventUntilCondition());
106-
} else if (untilDesc.getAnyEventUntilConsumed() != null) {
107-
EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed();
108-
if (strategy.getAllEventConsumptionStrategy() != null) {
109-
untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy());
110-
} else if (strategy.getAnyEventConsumptionStrategy() != null) {
111-
untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy());
112-
} else if (strategy.getOneEventConsumptionStrategy() != null) {
113-
untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy());
103+
until = buildUntilPredicate(untilDesc);
104+
if (until == null) {
105+
if (untilDesc.getAnyEventUntilConsumed() != null) {
106+
EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed();
107+
if (strategy.getAllEventConsumptionStrategy() != null) {
108+
untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy());
109+
} else if (strategy.getAnyEventConsumptionStrategy() != null) {
110+
untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy());
111+
} else if (strategy.getOneEventConsumptionStrategy() != null) {
112+
untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy());
113+
}
114114
}
115115
}
116116
}
@@ -136,6 +136,12 @@ protected ListenExecutorBuilder(
136136
}
137137
}
138138

139+
protected WorkflowPredicate buildUntilPredicate(Until until) {
140+
return until.getAnyEventUntilCondition() != null
141+
? WorkflowUtils.buildPredicate(application, until.getAnyEventUntilCondition())
142+
: null;
143+
}
144+
139145
private Collection<EventRegistrationBuilder> registerToAll() {
140146
return application.eventConsumer().listenToAll(application);
141147
}

impl/jackson/src/test/java/io/serverlessworkflow/impl/LifeCycleEventsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ void testSuspendResumeWait()
117117
CompletableFuture<WorkflowModel> future = instance.start();
118118
instance.suspend();
119119
assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING);
120-
Thread.sleep(500);
120+
Thread.sleep(550);
121121
assertThat(instance.status()).isEqualTo(WorkflowStatus.SUSPENDED);
122122
instance.resume();
123123
assertThat(future.get(1, TimeUnit.SECONDS).asMap().orElseThrow())

0 commit comments

Comments
 (0)