Skip to content

Commit 45a0397

Browse files
committed
post review
Signed-off-by: Dmitrii Tikhomirov <[email protected]>
1 parent 8eead7b commit 45a0397

File tree

4 files changed

+139
-81
lines changed

4 files changed

+139
-81
lines changed

experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/FlexibleProcess.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@ public class FlexibleProcess extends TaskBase {
2626
private Predicate<WorkflowModel> exitCondition;
2727
private Activity[] activities;
2828

29-
private int maxAttempts = 1024;
29+
private int maxAttempts;
3030

31-
private FlexibleProcess() {}
31+
private FlexibleProcess(Predicate<WorkflowModel> exitCondition, Activity[] activities) {
32+
this.exitCondition = exitCondition;
33+
this.activities = activities;
34+
}
3235

3336
public Predicate<WorkflowModel> getExitCondition() {
3437
return exitCondition;
@@ -59,7 +62,7 @@ public static FlexibleProcessBuilder builder() {
5962
public static class FlexibleProcessBuilder {
6063
private Predicate<WorkflowModel> exitCondition;
6164
private Activity[] activities;
62-
private Integer maxAttempts;
65+
private int maxAttempts = 1024;
6366

6467
public FlexibleProcessBuilder exitCondition(Predicate<WorkflowModel> exitCondition) {
6568
this.exitCondition = exitCondition;
@@ -71,8 +74,7 @@ public FlexibleProcessBuilder activities(Activity... activities) {
7174
return this;
7275
}
7376

74-
public FlexibleProcessBuilder maxAttempts(Integer maxAttempts) {
75-
Objects.requireNonNull(maxAttempts, "maxAttempts cannot be null");
77+
public FlexibleProcessBuilder maxAttempts(int maxAttempts) {
7678
if (maxAttempts < 1) {
7779
throw new IllegalArgumentException("maxAttempts must be greater than 0");
7880
}
@@ -81,14 +83,11 @@ public FlexibleProcessBuilder maxAttempts(Integer maxAttempts) {
8183
}
8284

8385
public FlexibleProcess build() {
84-
FlexibleProcess flexibleProcess = new FlexibleProcess();
85-
flexibleProcess.exitCondition =
86-
Objects.requireNonNull(this.exitCondition, "Exit condition must be provided");
87-
flexibleProcess.activities =
88-
Objects.requireNonNull(this.activities, "Activities must be provided");
89-
if (this.maxAttempts != null) {
90-
flexibleProcess.maxAttempts = this.maxAttempts;
91-
}
86+
FlexibleProcess flexibleProcess =
87+
new FlexibleProcess(
88+
Objects.requireNonNull(exitCondition, "Exit condition must be provided"),
89+
Objects.requireNonNull(activities, "Activities must be provided"));
90+
flexibleProcess.maxAttempts = maxAttempts;
9291
return flexibleProcess;
9392
}
9493
}

experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutor.java

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,49 +19,29 @@
1919
import io.serverlessworkflow.fluent.processes.FlexibleProcess;
2020
import io.serverlessworkflow.impl.TaskContext;
2121
import io.serverlessworkflow.impl.WorkflowContext;
22-
import io.serverlessworkflow.impl.WorkflowDefinition;
2322
import io.serverlessworkflow.impl.WorkflowModel;
24-
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2523
import io.serverlessworkflow.impl.executors.CallableTask;
2624
import io.serverlessworkflow.impl.executors.TaskExecutor;
27-
import io.serverlessworkflow.impl.executors.TaskExecutorBuilder;
28-
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
29-
import java.util.HashMap;
3025
import java.util.Map;
3126
import java.util.Optional;
32-
import java.util.UUID;
3327
import java.util.concurrent.CompletableFuture;
28+
import java.util.function.Supplier;
3429

3530
public class FlexibleProcessExecutor implements CallableTask {
3631

37-
private final WorkflowDefinition definition;
3832
private final FlexibleProcess flexibleProcess;
33+
private Supplier<Map<Activity, TaskExecutor<?>>> executors;
3934

40-
public FlexibleProcessExecutor(FlexibleProcess flexibleProcess, WorkflowDefinition definition) {
35+
public FlexibleProcessExecutor(
36+
FlexibleProcess flexibleProcess, Supplier<Map<Activity, TaskExecutor<?>>> executors) {
4137
this.flexibleProcess = flexibleProcess;
42-
this.definition = definition;
38+
this.executors = executors;
4339
}
4440

4541
@Override
4642
public CompletableFuture<WorkflowModel> apply(
4743
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
48-
return CompletableFuture.supplyAsync(
49-
() -> {
50-
WorkflowMutablePosition position = definition.application().positionFactory().get();
51-
position.addProperty(UUID.randomUUID().toString());
52-
TaskExecutorFactory factory = definition.application().taskFactory();
53-
Map<Activity, TaskExecutor<?>> executors = new HashMap<>();
54-
55-
for (Activity activity : flexibleProcess.getActivities()) {
56-
TaskExecutorBuilder<?> builder =
57-
factory.getTaskExecutor(position, activity.getTask(), definition);
58-
TaskExecutor<?> executor = builder.build();
59-
executors.put(activity, executor);
60-
}
61-
62-
FlexibleProcessManager manager = new FlexibleProcessManager(flexibleProcess, executors);
63-
manager.run(workflowContext, Optional.of(taskContext), input);
64-
return input;
65-
});
44+
FlexibleProcessManager manager = new FlexibleProcessManager(flexibleProcess, executors.get());
45+
return manager.run(workflowContext, Optional.of(taskContext), input);
6646
}
6747
}

experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutorBuilder.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,51 @@
1616
package io.serverlessworkflow.fluent.processes.internal;
1717

1818
import io.serverlessworkflow.api.types.TaskBase;
19+
import io.serverlessworkflow.fluent.processes.Activity;
1920
import io.serverlessworkflow.fluent.processes.FlexibleProcess;
2021
import io.serverlessworkflow.impl.WorkflowDefinition;
2122
import io.serverlessworkflow.impl.WorkflowMutablePosition;
2223
import io.serverlessworkflow.impl.executors.CallableTask;
2324
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
25+
import io.serverlessworkflow.impl.executors.TaskExecutor;
26+
import io.serverlessworkflow.impl.executors.TaskExecutorBuilder;
27+
import io.serverlessworkflow.impl.executors.TaskExecutorFactory;
28+
import java.util.HashMap;
29+
import java.util.Map;
30+
import java.util.UUID;
2431

2532
public class FlexibleProcessExecutorBuilder implements CallableTaskBuilder<FlexibleProcess> {
2633

27-
private FlexibleProcess task;
34+
private FlexibleProcess flexibleProcess;
2835
private WorkflowDefinition definition;
2936

3037
@Override
3138
public void init(
32-
FlexibleProcess task, WorkflowDefinition definition, WorkflowMutablePosition position) {
33-
this.task = task;
39+
FlexibleProcess flexibleProcess,
40+
WorkflowDefinition definition,
41+
WorkflowMutablePosition position) {
42+
this.flexibleProcess = flexibleProcess;
3443
this.definition = definition;
3544
}
3645

3746
@Override
3847
public CallableTask build() {
39-
return new FlexibleProcessExecutor(task, definition);
48+
WorkflowMutablePosition position = definition.application().positionFactory().get();
49+
position.addProperty(UUID.randomUUID().toString());
50+
TaskExecutorFactory factory = definition.application().taskFactory();
51+
52+
return new FlexibleProcessExecutor(
53+
flexibleProcess,
54+
() -> {
55+
Map<Activity, TaskExecutor<?>> executors = new HashMap<>();
56+
for (Activity activity : flexibleProcess.getActivities()) {
57+
TaskExecutorBuilder<?> builder =
58+
factory.getTaskExecutor(position, activity.getTask(), definition);
59+
TaskExecutor<?> executor = builder.build();
60+
executors.put(activity, executor);
61+
}
62+
return executors;
63+
});
4064
}
4165

4266
@Override

experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessManager.java

Lines changed: 92 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@
2121
import io.serverlessworkflow.impl.WorkflowContext;
2222
import io.serverlessworkflow.impl.WorkflowModel;
2323
import io.serverlessworkflow.impl.executors.TaskExecutor;
24-
import java.util.LinkedList;
2524
import java.util.Map;
2625
import java.util.Optional;
27-
import java.util.Queue;
26+
import java.util.concurrent.CompletableFuture;
2827
import java.util.stream.Collectors;
2928

3029
public class FlexibleProcessManager {
@@ -38,48 +37,104 @@ public FlexibleProcessManager(
3837
this.executors = executors;
3938
}
4039

41-
public void run(
40+
public CompletableFuture<WorkflowModel> run(
4241
WorkflowContext workflowContext, Optional<TaskContext> parentContext, WorkflowModel input) {
43-
boolean exit = flexibleProcess.getExitCondition().test(input);
44-
int counter = flexibleProcess.getMaxAttempts();
45-
while (!exit) {
46-
Map<Activity, TaskExecutor<?>> availableExecutors = getExecutors(input);
47-
if (availableExecutors.isEmpty()) {
48-
return;
49-
}
50-
Queue<Map.Entry<Activity, TaskExecutor<?>>> executorQueue =
51-
new LinkedList<>(availableExecutors.entrySet());
52-
while (!executorQueue.isEmpty()) {
53-
Map.Entry<Activity, TaskExecutor<?>> entry = executorQueue.poll();
54-
Activity activity = entry.getKey();
55-
TaskExecutor<?> executor = entry.getValue();
56-
try {
57-
executor
58-
.apply(workflowContext, parentContext, input)
59-
.join(); // blocking, because we run flexible process one by one
60-
if (activity.getPostAction() != null) {
61-
activity.getPostAction().accept(input);
42+
43+
int maxAttempts = flexibleProcess.getMaxAttempts();
44+
CompletableFuture<WorkflowModel> promise = new CompletableFuture<>();
45+
runAttempt(workflowContext, parentContext, input, maxAttempts, promise);
46+
return promise;
47+
}
48+
49+
private void runAttempt(
50+
WorkflowContext workflowContext,
51+
Optional<TaskContext> parentContext,
52+
WorkflowModel input,
53+
int remainingAttempts,
54+
CompletableFuture<WorkflowModel> promise) {
55+
56+
if (promise.isDone()) {
57+
return;
58+
}
59+
60+
if (flexibleProcess.getExitCondition().test(input)) {
61+
promise.complete(input);
62+
return;
63+
}
64+
65+
if (remainingAttempts <= 0) {
66+
promise.complete(input);
67+
return;
68+
}
69+
70+
Map<Activity, TaskExecutor<?>> availableExecutors = getExecutors(input);
71+
if (availableExecutors.isEmpty()) {
72+
promise.complete(input);
73+
return;
74+
}
75+
76+
CompletableFuture<WorkflowModel> passFuture =
77+
runOnePassSequentially(workflowContext, parentContext, input, availableExecutors);
78+
79+
passFuture.whenComplete(
80+
(updatedInput, ex) -> {
81+
if (ex != null) {
82+
promise.completeExceptionally(ex);
83+
return;
84+
}
85+
86+
if (flexibleProcess.getExitCondition().test(updatedInput)) {
87+
promise.complete(updatedInput);
88+
return;
6289
}
63-
activity.setExecuted();
64-
} catch (Exception e) {
65-
throw new RuntimeException("Error executing activity: " + activity.getName(), e);
66-
}
67-
exit = flexibleProcess.getExitCondition().test(input);
68-
if (exit) {
69-
break;
70-
}
71-
}
72-
counter--;
73-
if (counter <= 0) {
74-
break;
75-
}
90+
runAttempt(workflowContext, parentContext, updatedInput, remainingAttempts - 1, promise);
91+
});
92+
}
93+
94+
private CompletableFuture<WorkflowModel> runOnePassSequentially(
95+
WorkflowContext workflowContext,
96+
Optional<TaskContext> parentContext,
97+
WorkflowModel input,
98+
Map<Activity, TaskExecutor<?>> availableExecutors) {
99+
100+
return runNextExecutor(
101+
workflowContext, parentContext, input, availableExecutors.entrySet().iterator());
102+
}
103+
104+
private CompletableFuture<WorkflowModel> runNextExecutor(
105+
WorkflowContext workflowContext,
106+
Optional<TaskContext> parentContext,
107+
WorkflowModel input,
108+
java.util.Iterator<Map.Entry<Activity, TaskExecutor<?>>> it) {
109+
110+
if (flexibleProcess.getExitCondition().test(input) || !it.hasNext()) {
111+
return CompletableFuture.completedFuture(input);
76112
}
113+
114+
Map.Entry<Activity, TaskExecutor<?>> entry = it.next();
115+
Activity activity = entry.getKey();
116+
TaskExecutor<?> executor = entry.getValue();
117+
118+
return executor
119+
.apply(workflowContext, parentContext, input)
120+
.thenApply(
121+
taskContext -> {
122+
if (activity.getPostAction() != null) {
123+
activity.getPostAction().accept(input);
124+
}
125+
activity.setExecuted();
126+
return input;
127+
})
128+
.thenCompose(
129+
updatedInput -> runNextExecutor(workflowContext, parentContext, updatedInput, it));
77130
}
78131

79132
private Map<Activity, TaskExecutor<?>> getExecutors(WorkflowModel input) {
80133
return executors.entrySet().stream()
81-
.filter(activity -> activity.getKey().isRepeatable() || !activity.getKey().isExecuted())
82-
.filter(e -> e.getKey().getEntryCondition().test(input))
134+
.filter(
135+
activity ->
136+
(activity.getKey().isRepeatable() || !activity.getKey().isExecuted())
137+
&& activity.getKey().getEntryCondition().test(input))
83138
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
84139
}
85140
}

0 commit comments

Comments
 (0)